diff --git a/pkg/stream/consumer_test.go b/pkg/stream/consumer_test.go index b77d2bdb..d609e5f6 100644 --- a/pkg/stream/consumer_test.go +++ b/pkg/stream/consumer_test.go @@ -642,6 +642,51 @@ var _ = Describe("Streaming Consumers", func() { Expect(consumer.Close()).NotTo(HaveOccurred()) }) + It("Should not skip chunks on slow consumer", func() { + // see: https://github.com/rabbitmq/rabbitmq-stream-go-client/pull/412 + const credits = 2 + const numMessages = 10_000 + producer, err := env.NewProducer(streamName, nil) + Expect(err).NotTo(HaveOccurred()) + err = producer.BatchSend(CreateArrayMessagesForTesting(numMessages)) + Expect(err).NotTo(HaveOccurred()) + + defer func(producer *Producer) { + err := producer.Close() + Expect(err).NotTo(HaveOccurred()) + }(producer) + + lastOffset := atomic.Int64{} + lastOffset.Store(-1) + consumer, err := env.NewConsumer(streamName, + func(ctx ConsumerContext, _ *amqp.Message) { + offset := ctx.Consumer.GetOffset() + Expect(offset).To(Equal(lastOffset.Load() + 1)) + lastOffset.Store(offset) + + if offset%1_000 == 0 { + // simulating an intensive operation + time.Sleep(500 * time.Millisecond) + } + }, NewConsumerOptions(). + SetInitialCredits(credits). + SetOffset(OffsetSpecification{}.First()). + SetConsumerName("consumer_test")) + Expect(err).NotTo(HaveOccurred()) + + // it subtract 1 because the offset starts at 0 + //nolint:gocritic + Eventually(func() int64 { return lastOffset.Load() }, 10*time.Second). + Should( + Equal(int64(numMessages-1)), + "not all the messages has been consumed %d out of %d", + lastOffset.Load(), + numMessages-1, + ) + + Expect(consumer.Close()).NotTo(HaveOccurred()) + }) + It("Validation", func() { _, err := env.NewConsumer(streamName, func(_ ConsumerContext, _ *amqp.Message) {