From 04de5a51db0c7f5038ad7019bffff3a1056e1d2f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Tue, 25 Nov 2025 15:44:25 +0100 Subject: [PATCH] Add chunk ID to flow strategy context --- .../com/rabbitmq/stream/ConsumerFlowStrategy.java | 10 +++++++++- .../rabbitmq/stream/impl/ConsumersCoordinator.java | 14 +++++++++++--- ...reditEveryNthChunkConsumerFlowStrategyTest.java | 5 +++++ .../impl/MessageCountConsumerFlowStrategyTest.java | 5 +++++ 4 files changed, 30 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/rabbitmq/stream/ConsumerFlowStrategy.java b/src/main/java/com/rabbitmq/stream/ConsumerFlowStrategy.java index 8e2763103c..804dca3ad4 100644 --- a/src/main/java/com/rabbitmq/stream/ConsumerFlowStrategy.java +++ b/src/main/java/com/rabbitmq/stream/ConsumerFlowStrategy.java @@ -67,7 +67,8 @@ interface Context { /** * Provide credits for the subscription. * - *

{@link ConsumerFlowStrategy} implementation should always provide 1 credit a given chunk. + *

{@link ConsumerFlowStrategy} implementation should always provide 1 credit for a given + * chunk. * * @param credits the number of credits provided, usually 1 */ @@ -79,6 +80,13 @@ interface Context { * @return number of messages in the chunk */ long messageCount(); + + /** + * The offset of the first message in the chunk, aka chunk ID. + * + * @return offset of the first message in the chunk (chunk ID) + */ + long chunkId(); } /** Behavior for {@link MessageHandler.Context#processed()} calls. */ diff --git a/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java b/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java index 07fdb59959..5d8f9b0c2f 100644 --- a/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java +++ b/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java @@ -665,7 +665,8 @@ private ClientSubscriptionsManager( if (subscriptionTracker != null && subscriptionTracker.consumer.isOpen()) { processCallback = subscriptionTracker.flowStrategy.start( - new DefaultConsumerFlowStrategyContext(subscriptionId, client, messageCount)); + new DefaultConsumerFlowStrategyContext( + subscriptionId, client, messageCount, offset)); } else { LOGGER.debug( "Could not find stream subscription {} or subscription closing, not providing credits", @@ -1391,12 +1392,14 @@ private static class DefaultConsumerFlowStrategyContext implements ConsumerFlowS private final byte subscriptionId; private final Client client; private final long messageCount; + private final long chunkId; private DefaultConsumerFlowStrategyContext( - byte subscriptionId, Client client, long messageCount) { + byte subscriptionId, Client client, long messageCount, long chunkId) { this.subscriptionId = subscriptionId; this.client = client; this.messageCount = messageCount; + this.chunkId = chunkId; } @Override @@ -1414,7 +1417,12 @@ public void credits(int credits) { @Override public long messageCount() { - return messageCount; + return this.messageCount; + } + + @Override + public long chunkId() { + return this.chunkId; } } diff --git a/src/test/java/com/rabbitmq/stream/impl/CreditEveryNthChunkConsumerFlowStrategyTest.java b/src/test/java/com/rabbitmq/stream/impl/CreditEveryNthChunkConsumerFlowStrategyTest.java index ca1b6faec7..7841944501 100644 --- a/src/test/java/com/rabbitmq/stream/impl/CreditEveryNthChunkConsumerFlowStrategyTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/CreditEveryNthChunkConsumerFlowStrategyTest.java @@ -66,6 +66,11 @@ public void credits(int credits) { public long messageCount() { throw new UnsupportedOperationException(); } + + @Override + public long chunkId() { + throw new UnsupportedOperationException(); + } }; } } diff --git a/src/test/java/com/rabbitmq/stream/impl/MessageCountConsumerFlowStrategyTest.java b/src/test/java/com/rabbitmq/stream/impl/MessageCountConsumerFlowStrategyTest.java index ea959a1903..bb4d8f3c12 100644 --- a/src/test/java/com/rabbitmq/stream/impl/MessageCountConsumerFlowStrategyTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/MessageCountConsumerFlowStrategyTest.java @@ -70,6 +70,11 @@ public void credits(int credits) { public long messageCount() { return messageCount; } + + @Override + public long chunkId() { + return 0; + } }; } }