Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-2806 : Receiving an empty list when using RecordFilterStrategy on batch messages #3216

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,62 @@ public void listen(Thing thing) {
}
----

### Ignore empty batch when you use `batch` mode.
artembilan marked this conversation as resolved.
Show resolved Hide resolved
Starting with version `3.3.0`, we support ignoring empty batch that result from filtering by `RecordFilterStrategy`.
artembilan marked this conversation as resolved.
Show resolved Hide resolved
When implementing `RecordFilterStrategy`, you can configure it through `ignoreEmptyBatch()`. default is `false`, that means `KafkaListener` will be invoked even if all `ConsumerRecord` are filtered out.
artembilan marked this conversation as resolved.
Show resolved Hide resolved

If `true` is returned, the `KafkaListener` will not be invoked when all `ConsumerRecord` s are filtered out. However, commit to broker, will still be executed.
If `false` is returned, the `KafkaListener` will be invoked when all `ConsumerRecord` s are filtered out.

Let's look some examples.
artembilan marked this conversation as resolved.
Show resolved Hide resolved

[source,java]
----
public class IgnoreEmptyBatchRecordFilterStrategy implements RecordFilterStrategy {
...
@Override
public List<ConsumerRecord<String, String>> filterBatch(
List<ConsumerRecord<String, String>> consumerRecords) {
return List.of();
}

@Override
public boolean ignoreEmptyBatch() {
return true;
}
};

// NOTE: ignoreEmptyBatchRecordFilterStrategy is bean name of IgnoreEmptyBatchRecordFilterStrategy instance.
@KafkaListener(id = "filtered", topics = "topic", filter = "ignoreEmptyBatchRecordFilterStrategy")
public void listen(Thing thing) {
...
}
----
In this case, `IgnoreEmptyBatchRecordFilterStrategy` always returns empty list and return `true` as result of `ignoreEmptyBatch()`.
thus `KafkaListener#listen(...)` never will be invoked at all.


[source,java]
----
public class NotIgnoreEmptyBatchRecordFilterStrategy implements RecordFilterStrategy {
...
@Override
public List<ConsumerRecord<String, String>> filterBatch(
List<ConsumerRecord<String, String>> consumerRecords) {
return List.of();
}

@Override
public boolean ignoreEmptyBatch() {
return false;
}
};

// NOTE: notIgnoreEmptyBatchRecordFilterStrategy is bean name of NotIgnoreEmptyBatchRecordFilterStrategy instance.
@KafkaListener(id = "filtered", topics = "topic", filter = "notIgnoreEmptyBatchRecordFilterStrategy")
public void listen(Thing thing) {
chickenchickenlove marked this conversation as resolved.
Show resolved Hide resolved
...
}
----
However, in this case, `IgnoreEmptyBatchRecordFilterStrategy` always returns empty list and return `false` as result of `ignoreEmptyBatch()`.
thus `KafkaListener#listen(...)` always will be invoked.
artembilan marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2021 the original author or authors.
* Copyright 2016-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -36,6 +36,7 @@
* @param <V> the value type.
*
* @author Gary Russell
* @author Sanghyeok An
*
*/
public class FilteringBatchMessageListenerAdapter<K, V>
Expand All @@ -44,16 +45,16 @@ public class FilteringBatchMessageListenerAdapter<K, V>

private final boolean ackDiscarded;

private final boolean consumerAware;

/**
* Create an instance with the supplied strategy and delegate listener.
* @param delegate the delegate.
* @param recordFilterStrategy the filter.
*/
public FilteringBatchMessageListenerAdapter(BatchMessageListener<K, V> delegate,
RecordFilterStrategy<K, V> recordFilterStrategy) {

super(delegate, recordFilterStrategy);
this.ackDiscarded = false;
this(delegate, recordFilterStrategy, false);
}

/**
Expand All @@ -71,22 +72,24 @@ public FilteringBatchMessageListenerAdapter(BatchMessageListener<K, V> delegate,

super(delegate, recordFilterStrategy);
this.ackDiscarded = ackDiscarded;
this.consumerAware = this.delegateType.equals(ListenerType.ACKNOWLEDGING_CONSUMER_AWARE) || this.delegateType.equals(ListenerType.CONSUMER_AWARE);
artembilan marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public void onMessage(List<ConsumerRecord<K, V>> records, @Nullable Acknowledgment acknowledgment,
Consumer<?, ?> consumer) {

List<ConsumerRecord<K, V>> consumerRecords = getRecordFilterStrategy().filterBatch(records);
final RecordFilterStrategy<K, V> recordFilterStrategy = getRecordFilterStrategy();
final List<ConsumerRecord<K, V>> consumerRecords = recordFilterStrategy.filterBatch(records);
Assert.state(consumerRecords != null, "filter returned null from filterBatch");
boolean consumerAware = this.delegateType.equals(ListenerType.ACKNOWLEDGING_CONSUMER_AWARE)
|| this.delegateType.equals(ListenerType.CONSUMER_AWARE);
/*
* An empty list goes to the listener if ackDiscarded is false and the listener can ack
* either through the acknowledgment
*/
if (consumerRecords.size() > 0 || consumerAware
|| (!this.ackDiscarded && this.delegateType.equals(ListenerType.ACKNOWLEDGING))) {

if (recordFilterStrategy.ignoreEmptyBatch() &&
consumerRecords.isEmpty() &&
acknowledgment != null) {
acknowledgment.acknowledge();
}
else if (consumerRecords.size() > 0 || this.consumerAware
|| (!this.ackDiscarded && this.delegateType.equals(ListenerType.ACKNOWLEDGING))) {
invokeDelegate(consumerRecords, acknowledgment, consumer);
}
else {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2021 the original author or authors.
* Copyright 2016-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -21,6 +21,8 @@

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.springframework.kafka.listener.BatchMessageListener;

/**
* Implementations of this interface can signal that a record about
* to be delivered to a message listener should be discarded instead
Expand All @@ -30,7 +32,7 @@
* @param <V> the value type.
*
* @author Gary Russell
*
* @author Sanghyeok An
*/
public interface RecordFilterStrategy<K, V> {

Expand Down Expand Up @@ -58,4 +60,14 @@ default List<ConsumerRecord<K, V>> filterBatch(List<ConsumerRecord<K, V>> record
return records;
}

/**
* Returns a boolean value that determines whether {@link FilteringBatchMessageListenerAdapter} invoke the {@link BatchMessageListener} when all {@link ConsumerRecord}
artembilan marked this conversation as resolved.
Show resolved Hide resolved
* have been filtered and return an EmptyList. The default is not to invoke the {@link BatchMessageListener} (false).
artembilan marked this conversation as resolved.
Show resolved Hide resolved
* @return If true is returned, the {@link FilteringBatchMessageListenerAdapter} will not invoke the {@link BatchMessageListener}
* @since 3.3
*/
default boolean ignoreEmptyBatch() {
artembilan marked this conversation as resolved.
Show resolved Hide resolved
return false;
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2019 the original author or authors.
* Copyright 2017-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -21,6 +21,7 @@
import static org.mockito.BDDMockito.willAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.only;
import static org.mockito.Mockito.verify;

import java.util.ArrayList;
Expand All @@ -29,13 +30,15 @@
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.junit.jupiter.api.Test;

import org.springframework.kafka.listener.BatchAcknowledgingMessageListener;
import org.springframework.kafka.support.Acknowledgment;

/**
* @author Gary Russell
* @author Sanghyeok An
* @since 2.0
*
*/
Expand Down Expand Up @@ -72,4 +75,173 @@ public void testBatchFilterAckDiscard() throws Exception {
verify(listener, never()).onMessage(any(List.class), any(Acknowledgment.class));
}

@Test
public void listener_should_not_be_invoked_on_emptyList_and_ignoreEmptyBatch_true() throws Exception {
// Given :
final RecordFilterStrategy<String, String> filter = new RecordFilterStrategy<>() {
@Override
public boolean filter(ConsumerRecord<String, String> consumerRecord) {
return true;
}

@Override
public List<ConsumerRecord<String, String>> filterBatch(
List<ConsumerRecord<String, String>> consumerRecords) {
// SUT
return List.of();
}

@Override
public boolean ignoreEmptyBatch() {
// SUT
artembilan marked this conversation as resolved.
Show resolved Hide resolved
return true;
}
};

final BatchAcknowledgingMessageListener<String, String> listener = mock(BatchAcknowledgingMessageListener.class);
final FilteringBatchMessageListenerAdapter<String, String> adapter =
new FilteringBatchMessageListenerAdapter<String, String>(listener, filter);
final List<ConsumerRecord<String, String>> consumerRecords = new ArrayList<>();
final Acknowledgment ack = mock(Acknowledgment.class);

// When :
adapter.onMessage(consumerRecords, ack, null);

// Then
verify(ack, only()).acknowledge();
verify(listener, never()).onMessage(any(List.class), any(Acknowledgment.class), any(KafkaConsumer.class));
verify(listener, never()).onMessage(any(List.class), any(Acknowledgment.class));
verify(listener, never()).onMessage(any(List.class), any(KafkaConsumer.class));
verify(listener, never()).onMessage(any(List.class));
}

@Test
public void listener_should_be_invoked_on_notEmptyList_and_ignoreEmptyBatch_true() throws Exception {
// Given :
final RecordFilterStrategy<String, String> filter = new RecordFilterStrategy<>() {
@Override
public boolean filter(ConsumerRecord<String, String> consumerRecord) {
return true;
}

@Override
public List<ConsumerRecord<String, String>> filterBatch(
List<ConsumerRecord<String, String>> consumerRecords) {
// SUT
return consumerRecords;
}

@Override
public boolean ignoreEmptyBatch() {
// SUT
return true;
}
};

final BatchAcknowledgingMessageListener<String, String> listener = mock(BatchAcknowledgingMessageListener.class);
final FilteringBatchMessageListenerAdapter<String, String> adapter =
new FilteringBatchMessageListenerAdapter<String, String>(listener, filter);
final List<ConsumerRecord<String, String>> consumerRecords = List.of(new ConsumerRecord<>("hello-topic", 1, 1, "hello-key", "hello-value"));
final Acknowledgment ack = mock(Acknowledgment.class);

final CountDownLatch latch = new CountDownLatch(1);
willAnswer(i -> {
latch.countDown();
return null;
}).given(listener).onMessage(any(List.class), any(Acknowledgment.class));

// When :
adapter.onMessage(consumerRecords, ack, null);

// Then
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
verify(ack, never()).acknowledge();
}

@Test
public void listener_should_be_invoked_on_emptyList_and_ignoreEmptyBatch_false() throws Exception {
// Given :
final RecordFilterStrategy<String, String> filter = new RecordFilterStrategy<>() {
@Override
public boolean filter(ConsumerRecord<String, String> consumerRecord) {
return true;
}

@Override
public List<ConsumerRecord<String, String>> filterBatch(
List<ConsumerRecord<String, String>> consumerRecords) {
// SUT
return List.of();
}

@Override
public boolean ignoreEmptyBatch() {
// SUT
return false;
artembilan marked this conversation as resolved.
Show resolved Hide resolved
}
};

final BatchAcknowledgingMessageListener<String, String> listener = mock(BatchAcknowledgingMessageListener.class);
final FilteringBatchMessageListenerAdapter<String, String> adapter =
new FilteringBatchMessageListenerAdapter<String, String>(listener, filter);
final List<ConsumerRecord<String, String>> consumerRecords = new ArrayList<>();
final Acknowledgment ack = mock(Acknowledgment.class);

final CountDownLatch latch = new CountDownLatch(1);
willAnswer(i -> {
latch.countDown();
return null;
}).given(listener).onMessage(any(List.class), any(Acknowledgment.class));

// When :
adapter.onMessage(consumerRecords, ack, null);

// Then
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
verify(ack, never()).acknowledge();
}

@Test
public void listener_should_be_invoked_on_notEmptyList_and_ignoreEmptyBatch_false() throws Exception {
// Given :
final RecordFilterStrategy<String, String> filter = new RecordFilterStrategy<>() {
@Override
public boolean filter(ConsumerRecord<String, String> consumerRecord) {

return true;
}

@Override
public List<ConsumerRecord<String, String>> filterBatch(
// SUT
List<ConsumerRecord<String, String>> consumerRecords) {
return consumerRecords;
}

@Override
public boolean ignoreEmptyBatch() {
// SUT
return false;
artembilan marked this conversation as resolved.
Show resolved Hide resolved
}
};

final BatchAcknowledgingMessageListener<String, String> listener = mock(BatchAcknowledgingMessageListener.class);
final FilteringBatchMessageListenerAdapter<String, String> adapter =
new FilteringBatchMessageListenerAdapter<String, String>(listener, filter);
final List<ConsumerRecord<String, String>> consumerRecords = List.of(new ConsumerRecord<>("hello-topic", 1, 1, "hello-key", "hello-value"));
final Acknowledgment ack = mock(Acknowledgment.class);

final CountDownLatch latch = new CountDownLatch(1);
willAnswer(i -> {
latch.countDown();
return null;
}).given(listener).onMessage(any(List.class), any(Acknowledgment.class));

// When :
adapter.onMessage(consumerRecords, ack, null);

// Then
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
verify(ack, never()).acknowledge();
}
artembilan marked this conversation as resolved.
Show resolved Hide resolved
}