Skip to content

Commit

Permalink
GH-2806: Receiving an empty list with RecordFilterStrategy
Browse files Browse the repository at this point in the history
Fixes: #2806

Motivation:

    Receiving an empty list when using `RecordFilterStrategy` on batch messages
    In the current batch mode, even if the `RecordFilterStrategy` filters all records resulting in an Empty List being returned, the KafkaListener is still invoked.
    In contrast, in single record mode, if record are filtered, the `KafkaListener` is not called.
    This difference in behavior between the two modes can cause confusion for users.

Modifications:

    Add public method `isAnyManualAck()` to `Acknowledgment` to verify that `manualAck` is needed on `FilteringBatchMessageListenerAdapter`.
    Modify `FilteringBatchMessageListenerAdapter`.
        add field `consumerAware` as final (IMHO, we don't need to calculate it every single call `onMessage()`).
        add logic (if empty list and manual Ack == true, KafkaListener will be invoked.
        If empty list and manual Ack == false, `KafkaListener` will not be invoked even if listener is kind of ConsumerAware.
        In detail, See Discussion section below.

Result:

    Receiving an empty list when using RecordFilterStrategy on batch messages #2806
    When the RecordFilterStrategy filters all records and returns an Empty List, the KafkaListener is invoked only if it is in manual ACK mode.

Discussion:

    When using a ConsumerAware Listener, commits can be made using `Consumer.commitSync()` and `Consumer.commitAsync()`.
    However, when using a `ConsumerAwareAckListener`, it seems possible that commits using the Consumer and commits using Ack could be processed simultaneously.
    That situation seems quite ambiguous.
  • Loading branch information
chickenchickenlove authored and artembilan committed Jul 11, 2024
1 parent 3c2bd93 commit f91f8a9
Show file tree
Hide file tree
Showing 5 changed files with 269 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,63 @@ public void listen(Thing thing) {
}
----

Starting with version 3.3, Ignoring empty batches that result from filtering by `RecordFilterStrategy` is supported.
When implementing `RecordFilterStrategy`, it can be configured through `ignoreEmptyBatch()`.
The default setting is `false`, indicating `KafkaListener` will be invoked even if all `ConsumerRecord` s are filtered out.

If `true` is returned, the `KafkaListener` [underline]#will not be invoked# when all `ConsumerRecord` are filtered out.
However, commit to broker, will still be executed.

If `false` is returned, the `KafkaListener` [underline]#will be invoked# when all `ConsumerRecord` are filtered out.

Here are some examples.

[source,java]
----
public class IgnoreEmptyBatchRecordFilterStrategy implements RecordFilterStrategy {
...
@Override
public List<ConsumerRecord<String, String>> filterBatch(
List<ConsumerRecord<String, String>> consumerRecords) {
return List.of();
}
@Override
public boolean ignoreEmptyBatch() {
return true;
}
};
// NOTE: ignoreEmptyBatchRecordFilterStrategy is bean name of IgnoreEmptyBatchRecordFilterStrategy instance.
@KafkaListener(id = "filtered", topics = "topic", filter = "ignoreEmptyBatchRecordFilterStrategy")
public void listen(List<Thing> things) {
...
}
----
In this case, `IgnoreEmptyBatchRecordFilterStrategy` always returns empty list and return `true` as result of `ignoreEmptyBatch()`.
Thus `KafkaListener#listen(...)` never will be invoked at all.

[source,java]
----
public class NotIgnoreEmptyBatchRecordFilterStrategy implements RecordFilterStrategy {
...
@Override
public List<ConsumerRecord<String, String>> filterBatch(
List<ConsumerRecord<String, String>> consumerRecords) {
return List.of();
}
@Override
public boolean ignoreEmptyBatch() {
return false;
}
};
// NOTE: notIgnoreEmptyBatchRecordFilterStrategy is bean name of NotIgnoreEmptyBatchRecordFilterStrategy instance.
@KafkaListener(id = "filtered", topics = "topic", filter = "notIgnoreEmptyBatchRecordFilterStrategy")
public void listen(List<Thing> things) {
...
}
----
However, in this case, `IgnoreEmptyBatchRecordFilterStrategy` always returns empty list and return `false` as result of `ignoreEmptyBatch()`.
Thus `KafkaListener#listen(...)` always will be invoked.
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,9 @@ A new method, `getGroupId()`, has been added to the `ConsumerSeekCallback` inter
This method allows for more selective seek operations by targeting only the desired consumer group.
For more details, see xref:kafka/seek.adoc#seek[Seek API Docs].

[[x33-new-option-ignore-empty-batch]]
=== Configurable Handling of Empty Batches in Kafka Listener with RecordFilterStrategy

`RecordFilterStrategy` now supports ignoring empty batches that result from filtering.
This can be configured through overriding default method `ignoreEmptyBatch()`, which defaults to false, ensuring `KafkaListener` is invoked even if all `ConsumerRecords` are filtered out.
For more details, see xref:kafka/receiving-messages/filtering.adoc[Message receive filtering Docs].
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2021 the original author or authors.
* Copyright 2016-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -36,6 +36,7 @@
* @param <V> the value type.
*
* @author Gary Russell
* @author Sanghyeok An
*
*/
public class FilteringBatchMessageListenerAdapter<K, V>
Expand All @@ -44,16 +45,16 @@ public class FilteringBatchMessageListenerAdapter<K, V>

private final boolean ackDiscarded;

private final boolean consumerAware;

/**
* Create an instance with the supplied strategy and delegate listener.
* @param delegate the delegate.
* @param recordFilterStrategy the filter.
*/
public FilteringBatchMessageListenerAdapter(BatchMessageListener<K, V> delegate,
RecordFilterStrategy<K, V> recordFilterStrategy) {

super(delegate, recordFilterStrategy);
this.ackDiscarded = false;
this(delegate, recordFilterStrategy, false);
}

/**
Expand All @@ -71,22 +72,25 @@ public FilteringBatchMessageListenerAdapter(BatchMessageListener<K, V> delegate,

super(delegate, recordFilterStrategy);
this.ackDiscarded = ackDiscarded;
this.consumerAware = this.delegateType.equals(ListenerType.ACKNOWLEDGING_CONSUMER_AWARE) ||
this.delegateType.equals(ListenerType.CONSUMER_AWARE);
}

@Override
public void onMessage(List<ConsumerRecord<K, V>> records, @Nullable Acknowledgment acknowledgment,
Consumer<?, ?> consumer) {

List<ConsumerRecord<K, V>> consumerRecords = getRecordFilterStrategy().filterBatch(records);
final RecordFilterStrategy<K, V> recordFilterStrategy = getRecordFilterStrategy();
final List<ConsumerRecord<K, V>> consumerRecords = recordFilterStrategy.filterBatch(records);
Assert.state(consumerRecords != null, "filter returned null from filterBatch");
boolean consumerAware = this.delegateType.equals(ListenerType.ACKNOWLEDGING_CONSUMER_AWARE)
|| this.delegateType.equals(ListenerType.CONSUMER_AWARE);
/*
* An empty list goes to the listener if ackDiscarded is false and the listener can ack
* either through the acknowledgment
*/
if (consumerRecords.size() > 0 || consumerAware
|| (!this.ackDiscarded && this.delegateType.equals(ListenerType.ACKNOWLEDGING))) {

if (recordFilterStrategy.ignoreEmptyBatch() &&
consumerRecords.isEmpty() &&
acknowledgment != null) {
acknowledgment.acknowledge();
}
else if (!consumerRecords.isEmpty() || this.consumerAware
|| (!this.ackDiscarded && this.delegateType.equals(ListenerType.ACKNOWLEDGING))) {
invokeDelegate(consumerRecords, acknowledgment, consumer);
}
else {
Expand All @@ -98,6 +102,7 @@ public void onMessage(List<ConsumerRecord<K, V>> records, @Nullable Acknowledgme

private void invokeDelegate(List<ConsumerRecord<K, V>> consumerRecords, Acknowledgment acknowledgment,
Consumer<?, ?> consumer) {

switch (this.delegateType) {
case ACKNOWLEDGING_CONSUMER_AWARE:
this.delegate.onMessage(consumerRecords, acknowledgment, consumer);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2021 the original author or authors.
* Copyright 2016-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,11 +16,12 @@

package org.springframework.kafka.listener.adapter;

import java.util.Iterator;
import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.springframework.kafka.listener.BatchMessageListener;

/**
* Implementations of this interface can signal that a record about
* to be delivered to a message listener should be discarded instead
Expand All @@ -30,7 +31,7 @@
* @param <V> the value type.
*
* @author Gary Russell
*
* @author Sanghyeok An
*/
public interface RecordFilterStrategy<K, V> {

Expand All @@ -49,13 +50,20 @@ public interface RecordFilterStrategy<K, V> {
* @since 2.8
*/
default List<ConsumerRecord<K, V>> filterBatch(List<ConsumerRecord<K, V>> records) {
Iterator<ConsumerRecord<K, V>> iterator = records.iterator();
while (iterator.hasNext()) {
if (filter(iterator.next())) {
iterator.remove();
}
}
records.removeIf(this::filter);
return records;
}

/**
* Determine whether {@link FilteringBatchMessageListenerAdapter} should invoke
* the {@link BatchMessageListener} when all {@link ConsumerRecord}s in a batch have been filtered out
* resulting in empty list. By default, do invoke the {@link BatchMessageListener} (return false).
* @return true for {@link FilteringBatchMessageListenerAdapter} to {@link BatchMessageListener}
* when all {@link ConsumerRecord} in a batch filtered out
* @since 3.3
*/
default boolean ignoreEmptyBatch() {
return false;
}

}
Loading

0 comments on commit f91f8a9

Please sign in to comment.