Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-2806 : Receiving an empty list when using RecordFilterStrategy on batch messages #3216

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2021 the original author or authors.
* Copyright 2016-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -36,6 +36,7 @@
* @param <V> the value type.
*
* @author Gary Russell
* @author Sanghyeok An
*
*/
public class FilteringBatchMessageListenerAdapter<K, V>
Expand All @@ -44,16 +45,16 @@ public class FilteringBatchMessageListenerAdapter<K, V>

private final boolean ackDiscarded;

private final boolean consumerAware;

/**
* Create an instance with the supplied strategy and delegate listener.
* @param delegate the delegate.
* @param recordFilterStrategy the filter.
*/
public FilteringBatchMessageListenerAdapter(BatchMessageListener<K, V> delegate,
RecordFilterStrategy<K, V> recordFilterStrategy) {

super(delegate, recordFilterStrategy);
this.ackDiscarded = false;
this(delegate, recordFilterStrategy, false);
}

/**
Expand All @@ -71,22 +72,24 @@ public FilteringBatchMessageListenerAdapter(BatchMessageListener<K, V> delegate,

super(delegate, recordFilterStrategy);
this.ackDiscarded = ackDiscarded;
this.consumerAware = this.delegateType.equals(ListenerType.ACKNOWLEDGING_CONSUMER_AWARE) || this.delegateType.equals(ListenerType.CONSUMER_AWARE);
artembilan marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public void onMessage(List<ConsumerRecord<K, V>> records, @Nullable Acknowledgment acknowledgment,
Consumer<?, ?> consumer) {

List<ConsumerRecord<K, V>> consumerRecords = getRecordFilterStrategy().filterBatch(records);
final RecordFilterStrategy<K, V> recordFilterStrategy = getRecordFilterStrategy();
final List<ConsumerRecord<K, V>> consumerRecords = recordFilterStrategy.filterBatch(records);
Assert.state(consumerRecords != null, "filter returned null from filterBatch");
boolean consumerAware = this.delegateType.equals(ListenerType.ACKNOWLEDGING_CONSUMER_AWARE)
|| this.delegateType.equals(ListenerType.CONSUMER_AWARE);
/*
* An empty list goes to the listener if ackDiscarded is false and the listener can ack
* either through the acknowledgment
*/
if (consumerRecords.size() > 0 || consumerAware
|| (!this.ackDiscarded && this.delegateType.equals(ListenerType.ACKNOWLEDGING))) {

if (recordFilterStrategy.ignoreEmptyBatch() &&
consumerRecords.isEmpty() &&
acknowledgment != null) {
acknowledgment.acknowledge();
}
else if (consumerRecords.size() > 0 || this.consumerAware
|| (!this.ackDiscarded && this.delegateType.equals(ListenerType.ACKNOWLEDGING))) {
invokeDelegate(consumerRecords, acknowledgment, consumer);
}
else {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2021 the original author or authors.
* Copyright 2016-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -21,6 +21,8 @@

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.springframework.kafka.listener.BatchMessageListener;

/**
* Implementations of this interface can signal that a record about
* to be delivered to a message listener should be discarded instead
Expand All @@ -30,7 +32,7 @@
* @param <V> the value type.
*
* @author Gary Russell
*
* @author Sanghyeok An
*/
public interface RecordFilterStrategy<K, V> {

Expand Down Expand Up @@ -58,4 +60,14 @@ default List<ConsumerRecord<K, V>> filterBatch(List<ConsumerRecord<K, V>> record
return records;
}

/**
* Returns a boolean value that determines whether {@link FilteringBatchMessageListenerAdapter} invoke the {@link BatchMessageListener} when all {@link ConsumerRecord}
artembilan marked this conversation as resolved.
Show resolved Hide resolved
* have been filtered and return an EmptyList. The default is not to invoke the {@link BatchMessageListener} (false).
artembilan marked this conversation as resolved.
Show resolved Hide resolved
* @return If true is returned, the {@link FilteringBatchMessageListenerAdapter} will not invoke the {@link BatchMessageListener}
* @since 3.2.0
*/
default boolean ignoreEmptyBatch() {
artembilan marked this conversation as resolved.
Show resolved Hide resolved
return false;
}

}