Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-2806 : Receiving an empty list when using RecordFilterStrategy on batch messages #3216

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -3390,18 +3390,30 @@ private final class ConsumerBatchAcknowledgment implements Acknowledgment {
private final ConsumerRecords<K, V> records;

private final List<ConsumerRecord<K, V>> recordList;
private boolean isAnyManualAck = false;

private volatile boolean acked;

private volatile int partial = -1;

ConsumerBatchAcknowledgment(ConsumerRecords<K, V> records,
@Nullable List<ConsumerRecord<K, V>> recordList) {

@Nullable List<ConsumerRecord<K, V>> recordList) {
this.records = records;
this.recordList = recordList;
}

ConsumerBatchAcknowledgment(ConsumerRecords<K, V> records,
@Nullable List<ConsumerRecord<K, V>> recordList,
boolean isAnyManualAck) {
this(records, recordList);
this.isAnyManualAck = isAnyManualAck;
}

@Override
public boolean isAnyManualAck() {
return this.isAnyManualAck;
}

@Override
public void acknowledge() {
if (this.partial >= 0) {
Expand All @@ -3410,7 +3422,8 @@ public void acknowledge() {
}
if (!this.acked) {
Map<TopicPartition, List<Long>> offs = ListenerConsumer.this.offsetsInThisBatch;
Map<TopicPartition, List<ConsumerRecord<K, V>>> deferred = ListenerConsumer.this.deferredOffsets;
Map<TopicPartition, List<ConsumerRecord<K, V>>> deferred =
ListenerConsumer.this.deferredOffsets;
for (TopicPartition topicPartition : this.records.partitions()) {
if (offs != null) {
offs.remove(topicPartition);
Expand All @@ -3425,22 +3438,23 @@ public void acknowledge() {
@Override
public void acknowledge(int index) {
Assert.isTrue(index > this.partial,
() -> String.format("index (%d) must be greater than the previous partial commit (%d)", index,
this.partial));
() -> String.format(
"index (%d) must be greater than the previous partial commit (%d)", index,
this.partial));
Assert.state(ListenerConsumer.this.isManualImmediateAck,
"Partial batch acknowledgment is only supported with AckMode.MANUAL_IMMEDIATE");
"Partial batch acknowledgment is only supported with AckMode.MANUAL_IMMEDIATE");
Assert.state(this.recordList != null,
"Listener must receive a List of records to use partial batch acknowledgment");
"Listener must receive a List of records to use partial batch acknowledgment");
Assert.isTrue(index >= 0 && index < this.recordList.size(),
() -> String.format("index (%d) is out of range (%d-%d)", index, 0,
this.recordList.size() - 1));
() -> String.format("index (%d) is out of range (%d-%d)", index, 0,
this.recordList.size() - 1));
Assert.state(Thread.currentThread().equals(ListenerConsumer.this.consumerThread),
"Partial batch acknowledgment is only supported on the consumer thread");
"Partial batch acknowledgment is only supported on the consumer thread");
Map<TopicPartition, List<ConsumerRecord<K, V>>> offsetsToCommit = new LinkedHashMap<>();
for (int i = this.partial + 1; i <= index; i++) {
ConsumerRecord<K, V> record = this.recordList.get(i);
offsetsToCommit.computeIfAbsent(new TopicPartition(record.topic(), record.partition()),
tp -> new ArrayList<>()).add(record);
tp -> new ArrayList<>()).add(record);
}
if (!offsetsToCommit.isEmpty()) {
processAcks(new ConsumerRecords<>(offsetsToCommit));
Expand All @@ -3451,9 +3465,9 @@ public void acknowledge(int index) {
@Override
public void nack(int index, Duration sleep) {
Assert.state(Thread.currentThread().equals(ListenerConsumer.this.consumerThread),
"nack() can only be called on the consumer thread");
"nack() can only be called on the consumer thread");
Assert.state(!ListenerConsumer.this.asyncReplies,
"nack() is not supported with out-of-order commits");
"nack() is not supported with out-of-order commits");
Assert.isTrue(!sleep.isNegative(), "sleep cannot be negative");
Assert.isTrue(index >= 0 && index < this.records.count(), "index out of bounds");
ListenerConsumer.this.nackIndex = index;
Expand All @@ -3463,15 +3477,14 @@ public void nack(int index, Duration sleep) {
for (ConsumerRecord<K, V> cRecord : this.records) {
if (i++ < index) {
toAck.add(cRecord);
}
else {
} else {
break;
}
}
Map<TopicPartition, List<ConsumerRecord<K, V>>> newRecords = new HashMap<>();
for (ConsumerRecord<K, V> cRecord : toAck) {
newRecords.computeIfAbsent(new TopicPartition(cRecord.topic(), cRecord.partition()),
tp -> new LinkedList<>()).add(cRecord);
tp -> new LinkedList<>()).add(cRecord);
artembilan marked this conversation as resolved.
Show resolved Hide resolved
}
processAcks(new ConsumerRecords<K, V>(newRecords));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class FilteringBatchMessageListenerAdapter<K, V>
implements BatchAcknowledgingConsumerAwareMessageListener<K, V> {

private final boolean ackDiscarded;
private final boolean consumerAware;

/**
* Create an instance with the supplied strategy and delegate listener.
Expand All @@ -54,6 +55,8 @@ public FilteringBatchMessageListenerAdapter(BatchMessageListener<K, V> delegate,

super(delegate, recordFilterStrategy);
this.ackDiscarded = false;
this.consumerAware = this.delegateType.equals(ListenerType.ACKNOWLEDGING_CONSUMER_AWARE)
|| this.delegateType.equals(ListenerType.CONSUMER_AWARE);
}

/**
Expand All @@ -79,19 +82,26 @@ public void onMessage(List<ConsumerRecord<K, V>> records, @Nullable Acknowledgme

List<ConsumerRecord<K, V>> consumerRecords = getRecordFilterStrategy().filterBatch(records);
Assert.state(consumerRecords != null, "filter returned null from filterBatch");
boolean consumerAware = this.delegateType.equals(ListenerType.ACKNOWLEDGING_CONSUMER_AWARE)
|| this.delegateType.equals(ListenerType.CONSUMER_AWARE);
/*
* An empty list goes to the listener if ackDiscarded is false and the listener can ack
* either through the acknowledgment
*/
if (consumerRecords.size() > 0 || consumerAware
|| (!this.ackDiscarded && this.delegateType.equals(ListenerType.ACKNOWLEDGING))) {
invokeDelegate(consumerRecords, acknowledgment, consumer);

if (consumerRecords.isEmpty()) {
if (acknowledgment != null && acknowledgment.isAnyManualAck()) {
invokeDelegate(consumerRecords, acknowledgment, consumer);
chickenchickenlove marked this conversation as resolved.
Show resolved Hide resolved
}
else {
if (this.ackDiscarded && acknowledgment != null) {
acknowledgment.acknowledge();
}
}
}
else {
if (this.ackDiscarded && acknowledgment != null) {
acknowledgment.acknowledge();
if (this.consumerAware
|| (!this.ackDiscarded && this.delegateType.equals(ListenerType.ACKNOWLEDGING))) {
invokeDelegate(consumerRecords, acknowledgment, consumer);
}
else {
if (this.ackDiscarded && acknowledgment != null) {
acknowledgment.acknowledge();
artembilan marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,7 @@ default boolean isOutOfOrderCommit() {
return false;
}

default boolean isAnyManualAck() {
artembilan marked this conversation as resolved.
Show resolved Hide resolved
throw new UnsupportedOperationException("You should implement your isAnyManualAck() method.");
}
}