diff --git a/src/main/java/com/rabbitmq/stream/ConsumerFlowStrategy.java b/src/main/java/com/rabbitmq/stream/ConsumerFlowStrategy.java index aa8863d057..786f0ef0c9 100644 --- a/src/main/java/com/rabbitmq/stream/ConsumerFlowStrategy.java +++ b/src/main/java/com/rabbitmq/stream/ConsumerFlowStrategy.java @@ -118,6 +118,9 @@ static ConsumerFlowStrategy creditOnChunkArrival() { * *
Calls to {@link MessageHandler.Context#processed()} are ignored. * + *
Consider using {@link #creditEveryNthChunk(int, int)} instead as it generates less network
+ * traffic.
+ *
* @param initialCredits number of initial credits
* @return flow strategy
* @see com.rabbitmq.stream.ConsumerBuilder.FlowConfiguration#initialCredits(int)
@@ -168,12 +171,79 @@ static ConsumerFlowStrategy creditOnProcessedMessageCount(int initialCredits, do
return new MessageCountConsumerFlowStrategy(initialCredits, ratio);
}
+ /**
+ * Strategy that provides the specified number of initial credits and n
credits every
+ * n
chunks.
+ *
+ *
This strategy generates less network traffic than {@link
+ * com.rabbitmq.stream.ConsumerFlowStrategy.CreditOnChunkArrivalConsumerFlowStrategy} and should
+ * be used instead, unless n
is equal to 1.
+ *
+ *
A rule of thumb is to set n
to half the value of initial credits.
+ *
+ *
Calls to {@link MessageHandler.Context#processed()} are ignored.
+ *
+ * @param initialCredits number of initial credits
+ * @param n number of chunks and number of credits
+ * @return flow strategy
+ */
+ static ConsumerFlowStrategy creditEveryNthChunk(int initialCredits, int n) {
+ return new CreditEveryNthChunkConsumerFlowStrategy(initialCredits, n);
+ }
+
+ /**
+ * Strategy that provides the specified number of initial credits and n
credits every
+ * n
chunks.
+ *
+ *
This strategy generates less network traffic than {@link
+ * com.rabbitmq.stream.ConsumerFlowStrategy.CreditOnChunkArrivalConsumerFlowStrategy} and should
+ * be used instead, unless n
is equal to 1.
+ *
+ *
Calls to {@link MessageHandler.Context#processed()} are ignored. + */ + final class CreditEveryNthChunkConsumerFlowStrategy implements ConsumerFlowStrategy { + + private static final MessageProcessedCallback CALLBACK = v -> {}; + + private final int initialCredits; + private final AtomicLong chunkCount = new AtomicLong(0); + private final int n; + + private CreditEveryNthChunkConsumerFlowStrategy(int initialCredits, int n) { + if (n <= 0) { + throw new IllegalArgumentException("The n argument must be greater than 0"); + } + if (initialCredits <= n) { + throw new IllegalArgumentException( + "The number of initial credits must be greater than the limit"); + } + this.initialCredits = initialCredits; + this.n = n; + } + + @Override + public int initialCredits() { + this.chunkCount.set(0); + return this.initialCredits; + } + + @Override + public MessageProcessedCallback start(Context context) { + if (chunkCount.incrementAndGet() % n == 0) { + context.credits(n); + } + return CALLBACK; + } + } + /** * Strategy that provides the specified number of initial credits and a credit on each new chunk. * *
Calls to {@link MessageHandler.Context#processed()} are ignored. */ - class CreditOnChunkArrivalConsumerFlowStrategy implements ConsumerFlowStrategy { + final class CreditOnChunkArrivalConsumerFlowStrategy implements ConsumerFlowStrategy { + + private static final MessageProcessedCallback CALLBACK = v -> {}; private final int initialCredits; @@ -189,7 +259,7 @@ public int initialCredits() { @Override public MessageProcessedCallback start(Context context) { context.credits(1); - return value -> {}; + return CALLBACK; } } @@ -200,7 +270,7 @@ public MessageProcessedCallback start(Context context) { *
Make sure to call {@link MessageHandler.Context#processed()} on every message when using * this strategy, otherwise the broker may stop sending messages to the consumer. */ - class MessageCountConsumerFlowStrategy implements ConsumerFlowStrategy { + final class MessageCountConsumerFlowStrategy implements ConsumerFlowStrategy { private final int initialCredits; private final double ratio; diff --git a/src/test/java/com/rabbitmq/stream/impl/CreditEveryNthChunkConsumerFlowStrategyTest.java b/src/test/java/com/rabbitmq/stream/impl/CreditEveryNthChunkConsumerFlowStrategyTest.java new file mode 100644 index 0000000000..8263843a13 --- /dev/null +++ b/src/test/java/com/rabbitmq/stream/impl/CreditEveryNthChunkConsumerFlowStrategyTest.java @@ -0,0 +1,72 @@ +// Copyright (c) 2025 Broadcom. All Rights Reserved. +// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. +// +// This software, the RabbitMQ Stream Java client library, is dual-licensed under the +// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). +// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. +package com.rabbitmq.stream.impl; + +import static com.rabbitmq.stream.ConsumerFlowStrategy.creditEveryNthChunk; +import static java.util.stream.IntStream.range; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import com.rabbitmq.stream.ConsumerFlowStrategy; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; + +public class CreditEveryNthChunkConsumerFlowStrategyTest { + + AtomicInteger requestedCredits = new AtomicInteger(); + + @Test + void invalidArguments() { + assertThatThrownBy(() -> build(1, 1)).isInstanceOf(IllegalArgumentException.class); + assertThatThrownBy(() -> build(1, 0)).isInstanceOf(IllegalArgumentException.class); + assertThatThrownBy(() -> build(10, 0)).isInstanceOf(IllegalArgumentException.class); + } + + @ParameterizedTest + @CsvSource({"10,5", "5,2", "2,1"}) + void test(int initialCredits, int limit) { + ConsumerFlowStrategy strategy = build(initialCredits, limit); + + range(0, limit - 1) + .forEach( + ignored -> { + strategy.start(context()); + assertThat(requestedCredits).hasValue(0); + }); + strategy.start(context()); + assertThat(requestedCredits).hasValue(limit); + } + + ConsumerFlowStrategy build(int initial, int limit) { + return creditEveryNthChunk(initial, limit); + } + + ConsumerFlowStrategy.Context context() { + requestedCredits.set(0); + return new ConsumerFlowStrategy.Context() { + @Override + public void credits(int credits) { + requestedCredits.addAndGet(credits); + } + + @Override + public long messageCount() { + throw new UnsupportedOperationException(); + } + }; + } +} diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java index 153af5fd20..b37c9a7ae6 100644 --- a/src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java @@ -14,6 +14,7 @@ // info@rabbitmq.com. package com.rabbitmq.stream.impl; +import static com.rabbitmq.stream.ConsumerFlowStrategy.creditEveryNthChunk; import static com.rabbitmq.stream.ConsumerFlowStrategy.creditWhenHalfMessagesProcessed; import static com.rabbitmq.stream.impl.Assertions.assertThat; import static com.rabbitmq.stream.impl.TestUtils.*; @@ -53,6 +54,7 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -160,8 +162,9 @@ void committedOffsetShouldBeSet() throws Exception { consumer.close(); } - @Test - void consume() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void consume(boolean creditEveryNthChunk) throws Exception { int messageCount = 100_000; CountDownLatch publishLatch = new CountDownLatch(messageCount); Client client = @@ -183,15 +186,18 @@ void consume() throws Exception { CountDownLatch consumeLatch = new CountDownLatch(messageCount); AtomicLong chunkTimestamp = new AtomicLong(); - Consumer consumer = + ConsumerBuilder builder = environment.consumerBuilder().stream(stream) .offset(OffsetSpecification.first()) .messageHandler( (context, message) -> { chunkTimestamp.set(context.timestamp()); consumeLatch.countDown(); - }) - .build(); + }); + if (creditEveryNthChunk) { + builder.flow().strategy(creditEveryNthChunk(10, 5)); + } + Consumer consumer = builder.build(); org.assertj.core.api.Assertions.assertThat(consumeLatch.await(10, TimeUnit.SECONDS)).isTrue(); org.assertj.core.api.Assertions.assertThat(chunkTimestamp.get()).isNotZero();