Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 73 additions & 3 deletions src/main/java/com/rabbitmq/stream/ConsumerFlowStrategy.java
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ static ConsumerFlowStrategy creditOnChunkArrival() {
*
* <p>Calls to {@link MessageHandler.Context#processed()} are ignored.
*
* <p>Consider using {@link #creditEveryNthChunk(int, int)} instead as it generates less network
* traffic.
*
* @param initialCredits number of initial credits
* @return flow strategy
* @see com.rabbitmq.stream.ConsumerBuilder.FlowConfiguration#initialCredits(int)
Expand Down Expand Up @@ -168,12 +171,79 @@ static ConsumerFlowStrategy creditOnProcessedMessageCount(int initialCredits, do
return new MessageCountConsumerFlowStrategy(initialCredits, ratio);
}

/**
* Strategy that provides the specified number of initial credits and <code>n</code> credits every
* <code>n</code> chunks.
*
* <p>This strategy generates less network traffic than {@link
* com.rabbitmq.stream.ConsumerFlowStrategy.CreditOnChunkArrivalConsumerFlowStrategy} and should
* be used instead, unless <code>n</code> is equal to 1.
*
* <p>A rule of thumb is to set <code>n</code> to half the value of initial credits.
*
* <p>Calls to {@link MessageHandler.Context#processed()} are ignored.
*
* @param initialCredits number of initial credits
* @param n number of chunks and number of credits
* @return flow strategy
*/
static ConsumerFlowStrategy creditEveryNthChunk(int initialCredits, int n) {
return new CreditEveryNthChunkConsumerFlowStrategy(initialCredits, n);
}

/**
* Strategy that provides the specified number of initial credits and <code>n</code> credits every
* <code>n</code> chunks.
*
* <p>This strategy generates less network traffic than {@link
* com.rabbitmq.stream.ConsumerFlowStrategy.CreditOnChunkArrivalConsumerFlowStrategy} and should
* be used instead, unless <code>n</code> is equal to 1.
*
* <p>Calls to {@link MessageHandler.Context#processed()} are ignored.
*/
final class CreditEveryNthChunkConsumerFlowStrategy implements ConsumerFlowStrategy {

private static final MessageProcessedCallback CALLBACK = v -> {};

private final int initialCredits;
private final AtomicLong chunkCount = new AtomicLong(0);
private final int n;

private CreditEveryNthChunkConsumerFlowStrategy(int initialCredits, int n) {
if (n <= 0) {
throw new IllegalArgumentException("The n argument must be greater than 0");
}
if (initialCredits <= n) {
throw new IllegalArgumentException(
"The number of initial credits must be greater than the limit");
}
this.initialCredits = initialCredits;
this.n = n;
}

@Override
public int initialCredits() {
this.chunkCount.set(0);
return this.initialCredits;
}

@Override
public MessageProcessedCallback start(Context context) {
if (chunkCount.incrementAndGet() % n == 0) {
context.credits(n);
}
return CALLBACK;
}
}

/**
* Strategy that provides the specified number of initial credits and a credit on each new chunk.
*
* <p>Calls to {@link MessageHandler.Context#processed()} are ignored.
*/
class CreditOnChunkArrivalConsumerFlowStrategy implements ConsumerFlowStrategy {
final class CreditOnChunkArrivalConsumerFlowStrategy implements ConsumerFlowStrategy {

private static final MessageProcessedCallback CALLBACK = v -> {};

private final int initialCredits;

Expand All @@ -189,7 +259,7 @@ public int initialCredits() {
@Override
public MessageProcessedCallback start(Context context) {
context.credits(1);
return value -> {};
return CALLBACK;
}
}

Expand All @@ -200,7 +270,7 @@ public MessageProcessedCallback start(Context context) {
* <p>Make sure to call {@link MessageHandler.Context#processed()} on every message when using
* this strategy, otherwise the broker may stop sending messages to the consumer.
*/
class MessageCountConsumerFlowStrategy implements ConsumerFlowStrategy {
final class MessageCountConsumerFlowStrategy implements ConsumerFlowStrategy {

private final int initialCredits;
private final double ratio;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright (c) 2025 Broadcom. All Rights Reserved.
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
//
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
// please see LICENSE-APACHE2.
//
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
// either express or implied. See the LICENSE file for specific language governing
// rights and limitations of this software.
//
// If you have any questions regarding licensing, please contact us at
// info@rabbitmq.com.
package com.rabbitmq.stream.impl;

import static com.rabbitmq.stream.ConsumerFlowStrategy.creditEveryNthChunk;
import static java.util.stream.IntStream.range;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import com.rabbitmq.stream.ConsumerFlowStrategy;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;

public class CreditEveryNthChunkConsumerFlowStrategyTest {

AtomicInteger requestedCredits = new AtomicInteger();

@Test
void invalidArguments() {
assertThatThrownBy(() -> build(1, 1)).isInstanceOf(IllegalArgumentException.class);
assertThatThrownBy(() -> build(1, 0)).isInstanceOf(IllegalArgumentException.class);
assertThatThrownBy(() -> build(10, 0)).isInstanceOf(IllegalArgumentException.class);
}

@ParameterizedTest
@CsvSource({"10,5", "5,2", "2,1"})
void test(int initialCredits, int limit) {
ConsumerFlowStrategy strategy = build(initialCredits, limit);

range(0, limit - 1)
.forEach(
ignored -> {
strategy.start(context());
assertThat(requestedCredits).hasValue(0);
});
strategy.start(context());
assertThat(requestedCredits).hasValue(limit);
}

ConsumerFlowStrategy build(int initial, int limit) {
return creditEveryNthChunk(initial, limit);
}

ConsumerFlowStrategy.Context context() {
requestedCredits.set(0);
return new ConsumerFlowStrategy.Context() {
@Override
public void credits(int credits) {
requestedCredits.addAndGet(credits);
}

@Override
public long messageCount() {
throw new UnsupportedOperationException();
}
};
}
}
16 changes: 11 additions & 5 deletions src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
// info@rabbitmq.com.
package com.rabbitmq.stream.impl;

import static com.rabbitmq.stream.ConsumerFlowStrategy.creditEveryNthChunk;
import static com.rabbitmq.stream.ConsumerFlowStrategy.creditWhenHalfMessagesProcessed;
import static com.rabbitmq.stream.impl.Assertions.assertThat;
import static com.rabbitmq.stream.impl.TestUtils.*;
Expand Down Expand Up @@ -53,6 +54,7 @@
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -160,8 +162,9 @@ void committedOffsetShouldBeSet() throws Exception {
consumer.close();
}

@Test
void consume() throws Exception {
@ParameterizedTest
@ValueSource(booleans = {true, false})
void consume(boolean creditEveryNthChunk) throws Exception {
int messageCount = 100_000;
CountDownLatch publishLatch = new CountDownLatch(messageCount);
Client client =
Expand All @@ -183,15 +186,18 @@ void consume() throws Exception {
CountDownLatch consumeLatch = new CountDownLatch(messageCount);

AtomicLong chunkTimestamp = new AtomicLong();
Consumer consumer =
ConsumerBuilder builder =
environment.consumerBuilder().stream(stream)
.offset(OffsetSpecification.first())
.messageHandler(
(context, message) -> {
chunkTimestamp.set(context.timestamp());
consumeLatch.countDown();
})
.build();
});
if (creditEveryNthChunk) {
builder.flow().strategy(creditEveryNthChunk(10, 5));
}
Consumer consumer = builder.build();

org.assertj.core.api.Assertions.assertThat(consumeLatch.await(10, TimeUnit.SECONDS)).isTrue();
org.assertj.core.api.Assertions.assertThat(chunkTimestamp.get()).isNotZero();
Expand Down
Loading