Spring Pulsar Partitioned Topic Batch Ack Bug Report
Affected Versions
- Initially confirmed on Spring Pulsar 1.1.4
- Later reproduced, after updating all dependencies, on:
  - Spring Boot: 3.5.8
  - Spring Boot Starter Pulsar: 3.5.8
  - Spring Pulsar: 1.2.12
  - Spring: 6.2.14
  - Apache Pulsar Client: 4.0.7
  - Pulsar Docker image: apachepulsar/pulsar:4.1.1
Description
When using @PulsarListener on a partitioned topic with the default AckMode.BATCH (or an explicitly configured BATCH) and an Exclusive subscription type, messages from most partitions remain unacknowledged even though they were processed successfully. This leaves a persistent backlog on the broker. When the consumer restarts, the unacknowledged messages are redelivered and processed again as duplicates, and some of them may once again be left unacknowledged.
Root Cause Analysis
The issue is in DefaultPulsarMessageListenerContainer, specifically in its batch acknowledgment handling (handleBatchAcks).
When consuming from a partitioned topic, the underlying Pulsar client uses a MultiTopicsConsumer. The Messages<T> container returned by the client contains a multiplexed list of messages from various partitions (e.g., [Partition-0-Msg1, Partition-1-Msg1, Partition-0-Msg2, Partition-7-Msg1]).
The current implementation for non-shared subscriptions reduces this list to the last message and performs a cumulative acknowledgement on that single message:
```java
// In DefaultPulsarMessageListenerContainer.java handleBatchAcks
Stream<Message<T>> stream = StreamSupport.stream(messages.spliterator(), true);
Message<T> last = stream.reduce((a, b) -> b)
        .orElseThrow(() -> new RuntimeException("Failed to determine last message"));
AckUtils.handleAckCumulative(this.consumer, last, txn);
```

In Pulsar, acknowledgeCumulative on a multi-topic consumer is routed to the internal consumer for the partition that owns the message, so it only advances that partition's cursor. By acknowledging only the last message in the batch (e.g., Partition-7-Msg1), the listener leaves all messages from the other partitions (e.g., Partition-0 and Partition-1) unacknowledged.
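To make the gap concrete, the following stdlib-only Java model replays the interleaved batch from the example above. It does not touch the Pulsar client: Msg, LastMessageAckGap and its methods are hypothetical names, and the "acked" set simply encodes the assumption described above, namely that one cumulative ack advances only the partition of the message it is called on.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class LastMessageAckGap {

    // Hypothetical stand-in for a Pulsar message: just the owning partition and a sequence number.
    record Msg(int partition, long sequence) {}

    // Every partition that contributed at least one message to the polled batch.
    static Set<Integer> partitionsInBatch(List<Msg> batch) {
        Set<Integer> partitions = new LinkedHashSet<>();
        for (Msg m : batch) {
            partitions.add(m.partition());
        }
        return partitions;
    }

    // Mirrors reduce((a, b) -> b) followed by a single cumulative ack: assuming the ack only
    // advances the cursor of the last message's partition, that is the only partition covered.
    static Set<Integer> partitionsAckedByLastOnly(List<Msg> batch) {
        if (batch.isEmpty()) {
            return Set.of();
        }
        return Set.of(batch.get(batch.size() - 1).partition());
    }

    // Partitions whose processed messages are never acknowledged in this poll cycle.
    static Set<Integer> partitionsLeftUnacked(List<Msg> batch) {
        Set<Integer> unacked = new LinkedHashSet<>(partitionsInBatch(batch));
        unacked.removeAll(partitionsAckedByLastOnly(batch));
        return unacked;
    }

    public static void main(String[] args) {
        // The interleaved batch from the example: [P0-M1, P1-M1, P0-M2, P7-M1]
        List<Msg> batch = List.of(new Msg(0, 1), new Msg(1, 1), new Msg(0, 2), new Msg(7, 1));
        System.out.println("in batch:  " + partitionsInBatch(batch));         // [0, 1, 7]
        System.out.println("acked:     " + partitionsAckedByLastOnly(batch)); // [7]
        System.out.println("left open: " + partitionsLeftUnacked(batch));     // [0, 1]
    }
}
```

Three partitions contribute to the batch, but only partition 7 is covered by the single cumulative ack; partitions 0 and 1 keep their backlog.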
Steps to Reproduce
- Start a standalone Pulsar broker.
- Create a persistent partitioned topic (e.g., 4 or 8 partitions):
  ```shell
  bin/pulsar-admin topics create-partitioned-topic persistent://public/default/my-partitioned-topic --partitions 8
  ```
- Publish ~500 messages rapidly, distributed across all partitions.
- Create a Spring Boot application with a standard listener:
  ```java
  @PulsarListener(topics = "my-partitioned-topic", subscriptionName = "sub-1")
  public void listen(String payload) {
      // processing logic
  }
  ```
- Ensure ackMode is the default (BATCH) or explicitly set to BATCH.
- Observe:
  - Application logs show that all messages are processed.
  - Broker stats (bin/pulsar-admin topics stats ...) show that only one partition, the one owning the last message processed in the batch, has an advanced cursor. The other partitions show lastAckedTimestamp: 0 and a full backlog.
Expected Behavior
When AckMode.BATCH is used with a MultiTopicsConsumer, the container should either:
- Group messages by partition and issue a cumulative ack for the last message of each partition, or
- Fall back to individual acknowledgments for all messages if tracking per-partition tails is too complex.
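The first option can be sketched as a grouping step that keeps the last message seen for each partition. This is a minimal, stdlib-only sketch and not Spring Pulsar's actual fix: PerPartitionTails and lastPerKey are hypothetical names, and messages are modeled as plain strings so the example runs without a broker. It relies on messages from a single partition appearing in order within the polled batch.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class PerPartitionTails {

    // Returns the last element seen for each key, with keys in first-seen order.
    // For a multiplexed batch, the key would identify the partition a message came from.
    static <M> List<M> lastPerKey(Iterable<M> batch, Function<M, String> keyFn) {
        Map<String, M> tails = new LinkedHashMap<>();
        for (M m : batch) {
            // Re-inserting an existing key replaces the value but keeps the key's original position.
            tails.put(keyFn.apply(m), m);
        }
        return new ArrayList<>(tails.values());
    }

    public static void main(String[] args) {
        // Messages encoded as "<partition>#<seq>" purely for illustration.
        List<String> batch = List.of("p-0#1", "p-1#1", "p-0#2", "p-7#1");
        List<String> tails = lastPerKey(batch, s -> s.substring(0, s.indexOf('#')));
        System.out.println(tails); // [p-0#2, p-1#1, p-7#1]
    }
}
```

Assuming Message.getTopicName() returns the partition topic name (for example a name ending in -partition-0) for messages received through a multi-topic consumer, a container could pass Message::getTopicName as the key function and call consumer.acknowledgeCumulative(...) once per returned message. The second option maps onto the client's Consumer.acknowledge(Messages<?>) overload, which acknowledges every message in the batch individually.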
Actual Behavior
Only the partition belonging to the very last message in the polled batch is acknowledged. Messages processed from other partitions in that same poll cycle are never acknowledged. When the consumer restarts, unacknowledged messages are redelivered.
Workaround
Switching to AckMode.MANUAL or AckMode.RECORD resolves the issue, as these modes trigger individual acknowledge(message) calls, which correctly route to the internal consumer for the specific partition.
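```java
@PulsarListener(
        topics = "my-partitioned-topic",
        ackMode = AckMode.MANUAL
)
public void listen(Message<String> msg, Consumer<String> consumer) throws PulsarClientException {
    // process...
    // Individual ack: the multi-topic consumer routes it to the partition that owns msg
    consumer.acknowledge(msg);
}
```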