diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessor.java index 45e1313f6b..32aae467ad 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessor.java @@ -59,6 +59,16 @@ public DefaultAfterRollbackProcessor() { this(null, SeekUtils.DEFAULT_MAX_FAILURES); } + /** + * Construct an instance with the default recoverer which simply logs the record after + * 'maxFailures' have occurred for a topic/partition/offset. + * @param maxFailures the maxFailures; a negative value is treated as infinity. + * @since 2.2.1 + */ + public DefaultAfterRollbackProcessor(int maxFailures) { + this(null, maxFailures); + } + /** * Construct an instance with the provided recoverer which will be called after * {@value SeekUtils#DEFAULT_MAX_FAILURES} (maxFailures) have occurred for a @@ -74,7 +84,7 @@ public DefaultAfterRollbackProcessor(BiConsumer, Exception> * Construct an instance with the provided recoverer which will be called after * maxFailures have occurred for a topic/partition/offset. * @param recoverer the recoverer; if null, the default (logging) recoverer is used. - * @param maxFailures the maxFailures. + * @param maxFailures the maxFailures; a negative value is treated as infinity. * @since 2.2 */ public DefaultAfterRollbackProcessor(@Nullable BiConsumer, Exception> recoverer, diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java index bd1ed5ec90..ff27f15c62 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java @@ -56,7 +56,7 @@ boolean skip(ConsumerRecord record, Exception exception) { return false; } else { - if (failedRecord.incrementAndGet() >= this.maxFailures) { + if (this.maxFailures >= 0 && failedRecord.incrementAndGet() >= this.maxFailures) { this.recoverer.accept(record, exception); return true; } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/SeekToCurrentErrorHandler.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/SeekToCurrentErrorHandler.java index 01c1d6848b..ccea758399 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/SeekToCurrentErrorHandler.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/SeekToCurrentErrorHandler.java @@ -53,6 +53,16 @@ public SeekToCurrentErrorHandler() { this(null, SeekUtils.DEFAULT_MAX_FAILURES); } + /** + * Construct an instance with the default recoverer which simply logs the record after + * 'maxFailures' have occurred for a topic/partition/offset. + * @param maxFailures the maxFailures; a negative value is treated as infinity. + * @since 2.2.1 + */ + public SeekToCurrentErrorHandler(int maxFailures) { + this(null, maxFailures); + } + /** * Construct an instance with the provided recoverer which will be called after * {@value SeekUtils#DEFAULT_MAX_FAILURES} (maxFailures) have occurred for a @@ -68,7 +78,7 @@ public SeekToCurrentErrorHandler(BiConsumer, Exception> rec * Construct an instance with the provided recoverer which will be called after * maxFailures have occurred for a topic/partition/offset. * @param recoverer the recoverer; if null, the default (logging) recoverer is used. - * @param maxFailures the maxFailures. + * @param maxFailures the maxFailures; a negative value is treated as infinity. * @since 2.2 */ public SeekToCurrentErrorHandler(@Nullable BiConsumer, Exception> recoverer, int maxFailures) { diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentRecovererTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentRecovererTests.java index e24202b9e4..653c6dc86e 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentRecovererTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentRecovererTests.java @@ -18,8 +18,12 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; @@ -29,6 +33,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -129,7 +134,9 @@ public void accept(ConsumerRecord record, Exception exception) { @Test public void seekToCurrentErrorHandlerRecovers() { - SeekToCurrentErrorHandler eh = new SeekToCurrentErrorHandler((r, e) -> { }, 2); + @SuppressWarnings("unchecked") + BiConsumer, Exception> recoverer = mock(BiConsumer.class); + SeekToCurrentErrorHandler eh = new SeekToCurrentErrorHandler(recoverer, 2); List> records = new ArrayList<>(); records.add(new ConsumerRecord<>("foo", 0, 0, null, "foo")); records.add(new ConsumerRecord<>("foo", 0, 1, null, "bar")); @@ -146,6 +153,30 @@ public void seekToCurrentErrorHandlerRecovers() { eh.handle(new RuntimeException(), records, consumer, null); verify(consumer).seek(new TopicPartition("foo", 0), 1L); verifyNoMoreInteractions(consumer); + verify(recoverer).accept(eq(records.get(0)), any()); + } + + @Test + public void testNeverRecover() { + @SuppressWarnings("unchecked") + BiConsumer, Exception> recoverer = mock(BiConsumer.class); + SeekToCurrentErrorHandler eh = new SeekToCurrentErrorHandler(recoverer, -1); + List> records = new ArrayList<>(); + records.add(new ConsumerRecord<>("foo", 0, 0, null, "foo")); + records.add(new ConsumerRecord<>("foo", 0, 1, null, "bar")); + Consumer consumer = mock(Consumer.class); + for (int i = 0; i < 20; i++) { + try { + eh.handle(new RuntimeException(), records, consumer, null); + fail("Expected exception"); + } + catch (KafkaException e) { + // NOSONAR + } + } + verify(consumer, times(20)).seek(new TopicPartition("foo", 0), 0L); + verifyNoMoreInteractions(consumer); + verify(recoverer, never()).accept(any(), any()); } } diff --git a/src/reference/asciidoc/kafka.adoc b/src/reference/asciidoc/kafka.adoc index 8a38cbf4c1..34fac8dc9c 100644 --- a/src/reference/asciidoc/kafka.adoc +++ b/src/reference/asciidoc/kafka.adoc @@ -2436,6 +2436,7 @@ If the `AckMode` was `BATCH`, the container commits the offsets for the first 2 Starting with version 2.2, the `SeekToCurrentErrorHandler` can now recover (skip) a record that keeps failing. By default, after 10 failures, the failed record will be logged (ERROR). You can configure the handler with a custom recoverer (`BiConsumer`) and/or max failures. +Setting the `maxFailures` property to a negative number will cause infinite retries. ==== [source, java] @@ -2482,6 +2483,7 @@ For example, with a record-based listener, you might want to keep track of the f Starting with version 2.2, the `DefaultAfterRollbackProcessor` can now recover (skip) a record that keeps failing. By default, after 10 failures, the failed record will be logged (ERROR). You can configure the processor with a custom recoverer (`BiConsumer`) and/or max failures. +Setting the `maxFailures` property to a negative number will cause infinite retries. ==== [source, java]