-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Closed
Description
Version 2.8.9
Description
Call of Acknowledgment.nack() leads to infinite consumer pause when container property asyncAck set to true.
The problem seems to be with nack() implementation, as it only clears offsetsInThisBatch
values but not map itself:
@Override
public void nack(Duration sleep) {
ListenerConsumer.this.nackSleepDurationMillis = sleep.toMillis();
synchronized (ListenerConsumer.this) {
if (ListenerConsumer.this.offsetsInThisBatch != null) {
ListenerConsumer.this.offsetsInThisBatch.forEach((part, recs) -> recs.clear());
ListenerConsumer.this.deferredOffsets.forEach((part, recs) -> recs.clear());
}
}
}
Subsequent call of private void doResumeConsumerIfNeccessary() {
does not resume consumer, since offsetsInThisBatch.size() > 0
.
To Reproduce
Try to call nack() with asyncAck set to true
Expected behavior
Consumer continues to read messages after specified sleep time out.
Sample