Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion src/main/java/com/rabbitmq/stream/ConsumerFlowStrategy.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ interface Context {
/**
* Provide credits for the subscription.
*
* <p>{@link ConsumerFlowStrategy} implementation should always provide 1 credit a given chunk.
* <p>{@link ConsumerFlowStrategy} implementation should always provide 1 credit for a given
* chunk.
*
* @param credits the number of credits provided, usually 1
*/
Expand All @@ -79,6 +80,13 @@ interface Context {
* @return number of messages in the chunk
*/
long messageCount();

/**
* The offset of the first message in the chunk, aka chunk ID.
*
* @return offset of the first message in the chunk (chunk ID)
*/
long chunkId();
}

/** Behavior for {@link MessageHandler.Context#processed()} calls. */
Expand Down
14 changes: 11 additions & 3 deletions src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -665,7 +665,8 @@ private ClientSubscriptionsManager(
if (subscriptionTracker != null && subscriptionTracker.consumer.isOpen()) {
processCallback =
subscriptionTracker.flowStrategy.start(
new DefaultConsumerFlowStrategyContext(subscriptionId, client, messageCount));
new DefaultConsumerFlowStrategyContext(
subscriptionId, client, messageCount, offset));
} else {
LOGGER.debug(
"Could not find stream subscription {} or subscription closing, not providing credits",
Expand Down Expand Up @@ -1391,12 +1392,14 @@ private static class DefaultConsumerFlowStrategyContext implements ConsumerFlowS
private final byte subscriptionId;
private final Client client;
private final long messageCount;
private final long chunkId;

private DefaultConsumerFlowStrategyContext(
byte subscriptionId, Client client, long messageCount) {
byte subscriptionId, Client client, long messageCount, long chunkId) {
this.subscriptionId = subscriptionId;
this.client = client;
this.messageCount = messageCount;
this.chunkId = chunkId;
}

@Override
Expand All @@ -1414,7 +1417,12 @@ public void credits(int credits) {

@Override
public long messageCount() {
return messageCount;
return this.messageCount;
}

@Override
public long chunkId() {
return this.chunkId;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ public void credits(int credits) {
public long messageCount() {
throw new UnsupportedOperationException();
}

@Override
public long chunkId() {
throw new UnsupportedOperationException();
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ public void credits(int credits) {
public long messageCount() {
return messageCount;
}

@Override
public long chunkId() {
return 0;
}
};
}
}