Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-2806 : Receiving an empty list when using RecordFilterStrategy on batch messages #3216

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2021 the original author or authors.
* Copyright 2016-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -44,16 +44,16 @@ public class FilteringBatchMessageListenerAdapter<K, V>

private final boolean ackDiscarded;

private final boolean consumerAware;

/**
* Create an instance with the supplied strategy and delegate listener.
* @param delegate the delegate.
* @param recordFilterStrategy the filter.
*/
public FilteringBatchMessageListenerAdapter(BatchMessageListener<K, V> delegate,
RecordFilterStrategy<K, V> recordFilterStrategy) {

super(delegate, recordFilterStrategy);
this.ackDiscarded = false;
this(delegate, recordFilterStrategy, false);
}

/**
Expand All @@ -71,27 +71,31 @@ public FilteringBatchMessageListenerAdapter(BatchMessageListener<K, V> delegate,

super(delegate, recordFilterStrategy);
this.ackDiscarded = ackDiscarded;
this.consumerAware = this.delegateType.equals(ListenerType.ACKNOWLEDGING_CONSUMER_AWARE) || this.delegateType.equals(ListenerType.CONSUMER_AWARE);
artembilan marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public void onMessage(List<ConsumerRecord<K, V>> records, @Nullable Acknowledgment acknowledgment,
Consumer<?, ?> consumer) {

List<ConsumerRecord<K, V>> consumerRecords = getRecordFilterStrategy().filterBatch(records);
final RecordFilterStrategy<K, V> recordFilterStrategy = getRecordFilterStrategy();
final List<ConsumerRecord<K, V>> consumerRecords = recordFilterStrategy.filterBatch(records);
Assert.state(consumerRecords != null, "filter returned null from filterBatch");
boolean consumerAware = this.delegateType.equals(ListenerType.ACKNOWLEDGING_CONSUMER_AWARE)
|| this.delegateType.equals(ListenerType.CONSUMER_AWARE);
/*
* An empty list goes to the listener if ackDiscarded is false and the listener can ack
* either through the acknowledgment
*/
if (consumerRecords.size() > 0 || consumerAware
|| (!this.ackDiscarded && this.delegateType.equals(ListenerType.ACKNOWLEDGING))) {
invokeDelegate(consumerRecords, acknowledgment, consumer);

if (recordFilterStrategy.ignoreEmptyBatch()) {
if (acknowledgment != null) {
invokeDelegate(consumerRecords, acknowledgment, consumer);
chickenchickenlove marked this conversation as resolved.
Show resolved Hide resolved
}
}
else {
if (this.ackDiscarded && acknowledgment != null) {
acknowledgment.acknowledge();
if (this.consumerAware
|| (!this.ackDiscarded && this.delegateType.equals(ListenerType.ACKNOWLEDGING))) {
invokeDelegate(consumerRecords, acknowledgment, consumer);
}
else {
if (this.ackDiscarded && acknowledgment != null) {
acknowledgment.acknowledge();
artembilan marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,8 @@ default List<ConsumerRecord<K, V>> filterBatch(List<ConsumerRecord<K, V>> record
return records;
}

default boolean ignoreEmptyBatch() {
artembilan marked this conversation as resolved.
Show resolved Hide resolved
return false;
}

}