1 change: 1 addition & 0 deletions go.mod
@@ -32,5 +32,6 @@ require (
golang.org/x/net v0.46.0 // indirect
golang.org/x/sys v0.37.0 // indirect
golang.org/x/tools v0.38.0 // indirect
google.golang.org/protobuf v1.36.7 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
)
12 changes: 11 additions & 1 deletion pkg/ha/ha_consumer.go
@@ -156,20 +156,30 @@ func (c *ReliableConsumer) GetInfo() string {
return c.getInfo()
}

// Deprecated: see consumer.GetLastStoredOffset()
// use QueryOffset() instead
func (c *ReliableConsumer) GetLastStoredOffset() int64 {
c.mutexConnection.Lock()
defer c.mutexConnection.Unlock()

return c.consumer.GetLastStoredOffset()
}

func (c *ReliableConsumer) StoreOffset() error {
// QueryOffset returns the last stored offset for this consumer given its name and stream
func (c *ReliableConsumer) QueryOffset() (int64, error) {
c.mutexConnection.Lock()
defer c.mutexConnection.Unlock()
return c.consumer.QueryOffset()
}

// StoreOffset stores the current offset for this consumer given its name and stream
func (c *ReliableConsumer) StoreOffset() error {
c.mutexConnection.Lock()
defer c.mutexConnection.Unlock()
return c.consumer.StoreOffset()
}

// StoreCustomOffset stores a custom offset for this consumer given its name and stream
func (c *ReliableConsumer) StoreCustomOffset(offset int64) error {
c.mutexConnection.Lock()
defer c.mutexConnection.Unlock()
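A minimal usage sketch of the ReliableConsumer offset helpers above (the rc variable, the helper name and the logging are illustrative; only QueryOffset, StoreOffset and StoreCustomOffset come from this diff):

package example

import (
	"log"

	"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/ha"
)

// trackOffsets is an illustrative sketch of the ReliableConsumer offset API:
// QueryOffset always asks the broker for the last stored offset, while
// StoreOffset/StoreCustomOffset persist the current or an arbitrary offset.
func trackOffsets(rc *ha.ReliableConsumer) {
	offset, err := rc.QueryOffset()
	if err != nil {
		// e.g. nothing stored yet for this consumer name and stream
		log.Printf("query offset: %v", err)
		return
	}
	// store one message earlier so the last message is redelivered on restart
	if err := rc.StoreCustomOffset(offset - 1); err != nil {
		log.Printf("store custom offset: %v", err)
	}
}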
41 changes: 16 additions & 25 deletions pkg/ha/ha_consumer_test.go
@@ -166,9 +166,10 @@ var _ = Describe("Reliable Consumer", func() {
SetConsumerName(clientProvidedName).
SetClientProvidedName(clientProvidedName),
func(ctx ConsumerContext, _ *amqp.Message) {
defer GinkgoRecover()
// called on every message to test the re-connection.
offset := ctx.Consumer.GetOffset()
_ = ctx.Consumer.StoreCustomOffset(offset - 1) // commit all except the last one
Expect(ctx.Consumer.StoreCustomOffset(offset - 1)).To(BeNil()) // commit all except the last one

// wait for the connection drop to ensure correct offset tracking on re-connection
if offset == messageToSend/2 {
@@ -179,36 +180,26 @@ var _ = Describe("Reliable Consumer", func() {
Expect(err).NotTo(HaveOccurred())
Expect(consumer).NotTo(BeNil())

connectionToDrop := ""
Eventually(func() bool {
connections, err := test_helper.Connections("15672")
if err != nil {
return false
}
for _, connection := range connections {
if connection.ClientProperties.Connection_name == clientProvidedName {
connectionToDrop = connection.Name
return true
}
}
return false
}, time.Second*5).
Should(BeTrue())

Expect(connectionToDrop).NotTo(BeEmpty())
// kill the connection
errDrop := test_helper.DropConnection(connectionToDrop, "15672")
Expect(errDrop).NotTo(HaveOccurred())
dropSignal <- struct{}{}
Eventually(func() (bool, error) { return test_helper.IsConnectionAlive(clientProvidedName, "15672") }, 10*time.Second).WithPolling(500*time.Millisecond).
Should(BeTrue(), "check if the connection is alive")

errDrop := test_helper.DropConnectionAndWait(clientProvidedName, "15672", 10*time.Second)
Expect(errDrop).NotTo(HaveOccurred())
Expect(err).NotTo(HaveOccurred())
Eventually(func() int64 { return consumer.GetLastStoredOffset() }, 10*time.Second).
Should(Equal(int64(98)), "Offset should be 99")

Eventually(func() (bool, error) { return test_helper.IsConnectionAlive(clientProvidedName, "15672") }, 20*time.Second).
WithPolling(500*time.Millisecond).
Should(BeTrue(), "check if the connection is alive")

Eventually(func() (int64, error) { return consumer.QueryOffset() }, 10*time.Second).WithPolling(500*time.Millisecond).
Should(Equal(int64(98)), "Offset should be 98")

// set a custom offset
Expect(consumer.StoreCustomOffset(99)).NotTo(HaveOccurred())
Eventually(func() int64 { return consumer.GetLastStoredOffset() }, 1*time.Second).
Should(Equal(int64(99)), "Offset should be 99")
Expect(consumer.StoreCustomOffset(33)).NotTo(HaveOccurred())
Eventually(func() (int64, error) { return consumer.QueryOffset() }, 1*time.Second).
Should(Equal(int64(33)), "Offset should be 33 due to custom commit")

Expect(consumer.Close()).NotTo(HaveOccurred())
})
2 changes: 1 addition & 1 deletion pkg/integration_test/stream_integration_test.go
@@ -20,7 +20,7 @@ var _ = Describe("StreamIntegration", func() {
var (
addresses = []string{
"rabbitmq-stream://guest:guest@localhost:5552/"}
streamName = "test-next"
streamName = fmt.Sprintf("test-next-%d", time.Now().UnixNano())
streamEnv *stream.Environment
producer *stream.Producer
totalInitialMessages int
39 changes: 36 additions & 3 deletions pkg/stream/client.go
@@ -677,6 +677,10 @@ func (c *Client) queryPublisherSequence(publisherReference string, stream string
}

func (c *Client) BrokerLeader(stream string) (*Broker, error) {
return c.BrokerLeaderWithResolver(stream, nil)
}

func (c *Client) BrokerLeaderWithResolver(stream string, resolver *AddressResolver) (*Broker, error) {
streamsMetadata := c.metaData(stream)
if streamsMetadata == nil {
return nil, fmt.Errorf("leader error for stream for stream: %s", stream)
@@ -693,6 +697,13 @@ func (c *Client) BrokerLeader(stream string) (*Broker, error) {
streamMetadata.Leader.advPort = streamMetadata.Leader.Port
streamMetadata.Leader.advHost = streamMetadata.Leader.Host

// If AddressResolver is configured, use it directly and skip DNS lookup
if resolver != nil {
streamMetadata.Leader.Host = resolver.Host
streamMetadata.Leader.Port = strconv.Itoa(resolver.Port)
return streamMetadata.Leader, nil
}

res := net.Resolver{}
// see: https://github.com/rabbitmq/rabbitmq-stream-go-client/pull/317
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
@@ -738,12 +749,30 @@ func (c *Client) BrokerForConsumer(stream string) (*Broker, error) {
}

brokers := make([]*Broker, 0, 1+len(streamMetadata.Replicas))
brokers = append(brokers, streamMetadata.Leader)

// Count available replicas
availableReplicas := 0
for _, replica := range streamMetadata.Replicas {
if replica != nil {
availableReplicas++
}
}

// Only add leader if no replicas are available
if availableReplicas == 0 {
streamMetadata.Leader.advPort = streamMetadata.Leader.Port
streamMetadata.Leader.advHost = streamMetadata.Leader.Host
brokers = append(brokers, streamMetadata.Leader)
}

// Add all available replicas
for idx, replica := range streamMetadata.Replicas {
if replica == nil {
logs.LogWarn("Stream %s replica not ready: %d", stream, idx)
continue
}
replica.advPort = replica.Port
replica.advHost = replica.Host
brokers = append(brokers, replica)
}

@@ -875,14 +904,18 @@ func (c *Client) declareSubscriber(streamName string,
return nil, fmt.Errorf("specify a valid Offset")
}

if options.autoCommitStrategy.flushInterval < 1*time.Second {
if (options.autoCommitStrategy != nil) && (options.autoCommitStrategy.flushInterval < 1*time.Second) && options.autocommit {
return nil, fmt.Errorf("flush internal must be bigger than one second")
}

if options.autoCommitStrategy.messageCountBeforeStorage < 1 {
if (options.autoCommitStrategy != nil) && options.autoCommitStrategy.messageCountBeforeStorage < 1 && options.autocommit {
return nil, fmt.Errorf("message count before storage must be bigger than one")
}

if (options.autoCommitStrategy != nil) && options.ConsumerName == "" && options.autocommit {
return nil, fmt.Errorf("consumer name must be set when autocommit is enabled")
}

if messagesHandler == nil {
return nil, fmt.Errorf("messages Handler must be set")
}
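The BrokerLeaderWithResolver change above skips the DNS lookup when an AddressResolver is provided. A minimal sketch of pointing an environment at a load balancer through a resolver; SetHost, SetPort and SetAddressResolver come from the library's environment options, not from this diff, and the host/port values are assumptions:

package example

import (
	"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
)

// newEnvWithResolver is an illustrative sketch: behind a load balancer, the
// AddressResolver makes metadata lookups return the balancer address instead
// of resolving each broker host via DNS.
func newEnvWithResolver() (*stream.Environment, error) {
	addressResolver := stream.AddressResolver{
		Host: "my-load-balancer", // assumption: the balancer fronting the brokers
		Port: 5552,
	}
	return stream.NewEnvironment(stream.NewEnvironmentOptions().
		SetHost(addressResolver.Host).
		SetPort(addressResolver.Port).
		SetAddressResolver(addressResolver))
}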
30 changes: 30 additions & 0 deletions pkg/stream/consumer.go
@@ -101,6 +101,22 @@ func (consumer *Consumer) setPromotedAsActive(promoted bool) {
consumer.isPromotedAsActive = promoted
}

// Deprecated: The method name may be misleading.
// The method does not return the last offset stored on the server, but the last one stored in memory.
// The method was added to avoid querying the offset from the server, but it created confusion.
// Use `QueryOffset` instead:
//
// offset, err := consumer.QueryOffset()
// // or:
// offset, err := env.QueryOffset(consumerName, streamName)
// // check the error
// ....
// SetOffset(stream.OffsetSpecification{}.Offset(offset)).
//
// There is an edge case in which multiple clients use the same consumer name,
// and the last stored offset in memory is not the one the user expects.
// So, to avoid confusion, it is better to use QueryOffset, which always gets the value from the server.

func (consumer *Consumer) GetLastStoredOffset() int64 {
consumer.mutex.Lock()
defer consumer.mutex.Unlock()
@@ -312,6 +328,13 @@ func (c *ConsumerOptions) SetClientProvidedName(clientProvidedName string) *Cons
return c
}

func (c *ConsumerOptions) GetClientProvidedName(defaultClientProvidedName string) string {
if c == nil {
return defaultClientProvidedName
}
return c.ClientProvidedName
}

func (c *ConsumerOptions) SetFilter(filter *ConsumerFilter) *ConsumerOptions {
c.Filter = filter
return c
@@ -452,9 +475,12 @@ func (consumer *Consumer) getLastAutoCommitStored() time.Time {
return consumer.lastAutoCommitStored
}

// StoreOffset stores the current offset for this consumer given its name and stream
func (consumer *Consumer) StoreOffset() error {
return consumer.internalStoreOffset()
}

// StoreCustomOffset stores a custom offset for this consumer given its name and stream
func (consumer *Consumer) StoreCustomOffset(offset int64) error {
consumer.mutex.Lock()
defer consumer.mutex.Unlock()
@@ -510,7 +536,11 @@ func (consumer *Consumer) writeConsumeUpdateOffsetToSocket(correlationID uint32,
return consumer.options.client.socket.writeAndFlush(b.Bytes())
}

// QueryOffset returns the last stored offset for this consumer given its name and stream
func (consumer *Consumer) QueryOffset() (int64, error) {
if (consumer.options == nil) || (consumer.options.client == nil) || (consumer.options.ConsumerName == "") || (consumer.options.streamName == "") {
return -1, fmt.Errorf("offset query error: consumer not properly initialized")
}
return consumer.options.client.queryOffset(consumer.options.ConsumerName, consumer.options.streamName)
}

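A sketch of the pattern the deprecation note above recommends: query the server-side offset instead of relying on the in-memory GetLastStoredOffset, then restart the consumer from it (the function name and the fallback to First() are illustrative assumptions):

package example

import (
	"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
)

// restartFromStoredOffset is an illustrative sketch: it asks the server for
// the last offset stored under consumerName and resumes from it, falling back
// to the start of the stream when nothing was stored yet.
func restartFromStoredOffset(env *stream.Environment, consumerName, streamName string,
	handler stream.MessagesHandler) (*stream.Consumer, error) {
	offsetSpec := stream.OffsetSpecification{}.First()
	if offset, err := env.QueryOffset(consumerName, streamName); err == nil {
		offsetSpec = stream.OffsetSpecification{}.Offset(offset)
	}
	return env.NewConsumer(streamName, handler, stream.NewConsumerOptions().
		SetConsumerName(consumerName).
		SetOffset(offsetSpec))
}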
67 changes: 40 additions & 27 deletions pkg/stream/consumer_test.go
@@ -219,7 +219,7 @@ var _ = Describe("Streaming Consumers", func() {
SetManualCommit().
SetCRCCheck(false))
Expect(err).NotTo(HaveOccurred())
Eventually(func() int64 { return consumer.GetLastStoredOffset() }, 5*time.Second).Should(Equal(int64(99)),
Eventually(func() (int64, error) { return consumer.QueryOffset() }, 5*time.Second).Should(Equal(int64(99)),
"Offset should be 99")
Expect(consumer.Close()).NotTo(HaveOccurred())
})
@@ -236,18 +236,18 @@
SetCountBeforeStorage(100).
SetFlushInterval(50*time.Second))) // here we set a high value so the flush timer does not trigger
Expect(err).NotTo(HaveOccurred())
Eventually(func() int64 {
return consumer.GetLastStoredOffset()
time.Sleep(500 * time.Millisecond)
Eventually(func() (int64, error) {
v, err := consumer.QueryOffset()
// we can ignore the offset not found error here
if err != nil {
return 0, nil
}
return v, err
// 99 is the offset since it starts from 0
}, 5*time.Second).Should(Equal(int64(99)),
}, 5*time.Second).WithPolling(500*time.Millisecond).Should(Equal(int64(99)),
"Offset should be 99")
Expect(consumer.Close()).NotTo(HaveOccurred())
/// When the consumer is closed, it has to save the offset
// so the last offset has to be 104
Eventually(func() int64 {
return consumer.GetLastStoredOffset()
}, 5*time.Second).Should(Equal(int64(104)),
"Offset should be 104")

consumerTimer, errTimer := env.NewConsumer(streamName,
func(_ ConsumerContext, _ *amqp.Message) {
@@ -259,19 +256,16 @@ var _ = Describe("Streaming Consumers", func() {
SetCountBeforeStorage(10000000). // set high so the count threshold never triggers the storage
SetFlushInterval(1*time.Second)))
Expect(errTimer).NotTo(HaveOccurred())
time.Sleep(2 * time.Second)
Eventually(func() int64 {
return consumerTimer.GetLastStoredOffset()
}, 5*time.Second).Should(Equal(int64(104)),
Eventually(func() (int64, error) {
v, err := consumerTimer.QueryOffset()
// we can ignore the offset not found error here
if err != nil {
return 0, nil
}
return v, err
}, 5*time.Second).WithPolling(500*time.Millisecond).Should(Equal(int64(104)),
"Offset should be 104")
Expect(consumerTimer.Close()).NotTo(HaveOccurred())
/// When the consumer is closed, it has to save the offset
// so the last offset has to be 104
Eventually(func() int64 {
return consumerTimer.GetLastStoredOffset()
}, 5*time.Second).Should(Equal(int64(104)),
"Offset should be 104")

})

})
@@ -285,6 +282,7 @@ var _ = Describe("Streaming Consumers", func() {
func(_ ConsumerContext, _ *amqp.Message) {
atomic.AddInt32(&messagesReceived, 1)
}, NewConsumerOptions().
SetConsumerName("autoCommitStrategy").
SetAutoCommit(NewAutoCommitStrategy().
SetCountBeforeStorage(10000000).
SetFlushInterval(time.Second)))
Expand All @@ -294,14 +292,17 @@ var _ = Describe("Streaming Consumers", func() {
for i := 0; i < maxMessages; i++ {
Expect(producer.Send(CreateMessageForTesting("", i))).NotTo(HaveOccurred())
// emit message before the flush interval has elapsed
time.Sleep(time.Millisecond * 100)
time.Sleep(time.Millisecond * 1100)

if consumer.GetLastStoredOffset() > 0 {
v, err := consumer.QueryOffset()
Expect(err).NotTo(HaveOccurred())
if v > 0 {
break
}

}

Expect(messagesReceived > 5 && messagesReceived < int32(maxMessages)).To(BeTrueBecause("%d messages received", messagesReceived))
Expect(messagesReceived > 0 && messagesReceived < int32(maxMessages)).To(BeTrueBecause("%d messages received", messagesReceived))
Expect(producer.Close()).NotTo(HaveOccurred())
Expect(consumer.Close()).NotTo(HaveOccurred())
})
@@ -404,8 +405,8 @@ var _ = Describe("Streaming Consumers", func() {
}, 5*time.Second).Should(Equal(int32(107)),
"consumer should receive same messages Send by producer")

Eventually(func() int64 {
return consumer.GetLastStoredOffset()
Eventually(func() (int64, error) {
return consumer.QueryOffset()
// 106 is the offset since it starts from 0
}, 5*time.Second).Should(Equal(int64(106)),
"Offset should be 106")
@@ -710,13 +711,25 @@ var _ = Describe("Streaming Consumers", func() {
NewAutoCommitStrategy().SetFlushInterval(10*time.Millisecond)))
Expect(err).To(HaveOccurred())

// message handler must be set
// must specify a valid offset
_, err = env.NewConsumer(streamName,
nil, &ConsumerOptions{
Offset: OffsetSpecification{},
})
Expect(err).To(HaveOccurred())

// handler is nil
_, err = env.NewConsumer(streamName,
nil, &ConsumerOptions{
Offset: OffsetSpecification{
typeOfs: typeFirst},
})
Expect(err).To(HaveOccurred())

_, err = env.NewConsumer(streamName,
nil, NewConsumerOptions().SetAutoCommit(NewAutoCommitStrategy()))
Expect(err).To(HaveOccurred())

})

It("Sub Batch consumer with different publishers GZIP and Not", func() {
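The cases above exercise the validation added in declareSubscriber; for contrast, a configuration sketch that passes it (the consumer name, handler body and intervals are illustrative):

package example

import (
	"time"

	"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
	"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
)

// newValidAutoCommitConsumer is an illustrative sketch of options that satisfy
// the new checks: non-nil handler, a valid offset, a consumer name, a flush
// interval of at least one second and a positive count before storage.
func newValidAutoCommitConsumer(env *stream.Environment, streamName string) (*stream.Consumer, error) {
	handler := func(_ stream.ConsumerContext, _ *amqp.Message) {
		// process the message here
	}
	options := stream.NewConsumerOptions().
		SetConsumerName("my-consumer"). // required when autocommit is enabled
		SetOffset(stream.OffsetSpecification{}.First()).
		SetAutoCommit(stream.NewAutoCommitStrategy().
			SetCountBeforeStorage(100).        // must be >= 1
			SetFlushInterval(2 * time.Second)) // must be >= 1 second
	return env.NewConsumer(streamName, handler, options)
}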