diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordProcessor.java index 42c8e9294a..4f46260485 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordProcessor.java @@ -20,6 +20,7 @@ import java.util.List; import java.util.Map; import java.util.function.BiConsumer; +import java.util.function.BiFunction; import java.util.function.BiPredicate; import org.apache.commons.logging.LogFactory; @@ -108,6 +109,17 @@ public void setCommitRecovered(boolean commitRecovered) { this.commitRecovered = commitRecovered; } + /** + * Set a function to dynamically determine the {@link BackOff} to use, based on the + * consumer record and/or exception. If null is returned, the default BackOff will be + * used. + * @param backOffFunction the function. + * @since 2.6 + */ + public void setBackOffFunction(BiFunction<ConsumerRecord<?, ?>, Exception, BackOff> backOffFunction) { + this.failureTracker.setBackOffFunction(backOffFunction); + } + /** * Set to false to immediately attempt to recover on the next attempt instead * of repeating the BackOff cycle when recovery fails. 
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java index ee6a564790..d4594a82d3 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java @@ -20,6 +20,7 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; +import java.util.function.BiFunction; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.TopicPartition; @@ -48,6 +49,8 @@ class FailedRecordTracker { private final BackOff backOff; + private BiFunction<ConsumerRecord<?, ?>, Exception, BackOff> backOffFunction; + private boolean resetStateOnRecoveryFailure = true; FailedRecordTracker(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer, BackOff backOff, @@ -75,6 +78,17 @@ class FailedRecordTracker { this.backOff = backOff; } + /** + * Set a function to dynamically determine the {@link BackOff} to use, based on the + * consumer record and/or exception. If null is returned, the default BackOff will be + * used. + * @param backOffFunction the function. + * @since 2.6 + */ + public void setBackOffFunction(BiFunction<ConsumerRecord<?, ?>, Exception, BackOff> backOffFunction) { + this.backOffFunction = backOffFunction; + } + /** * Set to false to immediately attempt to recover on the next attempt instead * of repeating the BackOff cycle when recovery fails. 
@@ -98,7 +112,7 @@ boolean skip(ConsumerRecord<?, ?> record, Exception exception) { TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition()); FailedRecord failedRecord = map.get(topicPartition); if (failedRecord == null || failedRecord.getOffset() != record.offset()) { - failedRecord = new FailedRecord(record.offset(), this.backOff.start()); + failedRecord = new FailedRecord(record.offset(), determineBackOff(record, exception).start()); map.put(topicPartition, failedRecord); } else { @@ -124,6 +138,14 @@ boolean skip(ConsumerRecord<?, ?> record, Exception exception) { } } + private BackOff determineBackOff(ConsumerRecord<?, ?> record, Exception exception) { + if (this.backOffFunction == null) { + return this.backOff; + } + BackOff backOff = this.backOffFunction.apply(record, exception); + return backOff != null ? backOff : this.backOff; + } + private void attemptRecovery(ConsumerRecord<?, ?> record, Exception exception, @Nullable TopicPartition tp) { try { this.recoverer.accept(record, exception); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/FailedRecordTrackerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/FailedRecordTrackerTests.java index 106599c01b..9c3b19ad7c 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/FailedRecordTrackerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/FailedRecordTrackerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 the original author or authors. + * Copyright 2019-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -17,16 +17,22 @@ package org.springframework.kafka.listener; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.BDDMockito.given; import static org.mockito.Mockito.mock; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.TopicPartition; import org.junit.jupiter.api.Test; import org.springframework.core.log.LogAccessor; +import org.springframework.kafka.test.utils.KafkaTestUtils; +import org.springframework.util.backoff.BackOff; +import org.springframework.util.backoff.BackOffExecution; import org.springframework.util.backoff.FixedBackOff; /** @@ -37,7 +43,7 @@ public class FailedRecordTrackerTests { @Test - public void testNoRetries() { + void testNoRetries() { AtomicBoolean recovered = new AtomicBoolean(); FailedRecordTracker tracker = new FailedRecordTracker((r, e) -> { recovered.set(true); @@ -48,7 +54,7 @@ public void testThreeRetries() { + void testThreeRetries() { AtomicBoolean recovered = new AtomicBoolean(); FailedRecordTracker tracker = new FailedRecordTracker((r, e) -> { recovered.set(true); @@ -62,7 +68,7 @@ public void testSuccessAfterFailure() { + void testSuccessAfterFailure() { FailedRecordTracker tracker = new FailedRecordTracker(null, new FixedBackOff(0L, 1L), mock(LogAccessor.class)); ConsumerRecord<String, String> record = new ConsumerRecord<>("foo", 0, 0L, "bar", "baz"); assertThat(tracker.skip(record, new RuntimeException())).isFalse(); @@ -77,7 +83,7 @@ record = new ConsumerRecord<>("bar", 1, 1L, "bar", "baz"); } @Test - public void testDifferentOrder() { + void testDifferentOrder() { List<ConsumerRecord<?, ?>> records = new ArrayList<>(); FailedRecordTracker tracker = new FailedRecordTracker((rec, ex) -> { records.add(rec); @@ -93,4 +99,36 @@ public void testDifferentOrder() { 
assertThat(records).hasSize(2); } + @Test + void multiBackOffs() { + BackOff bo1 = mock(BackOff.class); + BackOffExecution be1 = mock(BackOffExecution.class); + given(bo1.start()).willReturn(be1); + BackOff bo2 = mock(BackOff.class); + BackOffExecution be2 = mock(BackOffExecution.class); + given(bo2.start()).willReturn(be2); + FailedRecordTracker tracker = new FailedRecordTracker((rec, ex) -> { }, bo1, mock(LogAccessor.class)); + tracker.setBackOffFunction((record, ex) -> { + if (record.topic().equals("foo")) { + return bo2; + } + else { + return null; + } + }); + @SuppressWarnings("unchecked") + ThreadLocal<Map<TopicPartition, FailedRecord>> failures = (ThreadLocal<Map<TopicPartition, FailedRecord>>) KafkaTestUtils + .getPropertyValue(tracker, "failures"); + ConsumerRecord<String, String> record1 = new ConsumerRecord<>("foo", 0, 0L, "bar", "baz"); + tracker.skip(record1, new RuntimeException()); + assertThat(KafkaTestUtils.getPropertyValue(failures.get() + .get(new TopicPartition("foo", 0)), "backOffExecution")) + .isSameAs(be2); + ConsumerRecord<String, String> record2 = new ConsumerRecord<>("bar", 0, 0L, "bar", "baz"); + tracker.skip(record2, new RuntimeException()); + assertThat(KafkaTestUtils.getPropertyValue(failures.get() + .get(new TopicPartition("bar", 0)), "backOffExecution")) + .isSameAs(be1); + } + } diff --git a/src/reference/asciidoc/kafka.adoc b/src/reference/asciidoc/kafka.adoc index e164a39d02..6472f7874a 100644 --- a/src/reference/asciidoc/kafka.adoc +++ b/src/reference/asciidoc/kafka.adoc @@ -2159,6 +2159,17 @@ Starting with version 2.5.5, if the recoverer fails, the `BackOff` will be reset With earlier versions, the `BackOff` was not reset and recovery was re-attempted on the next failure. To revert to the previous behavior, set the error handler's `resetStateOnRecoveryFailure` to `false`. 
+Starting with version 2.6, you can now provide the error handler with a `BiFunction<ConsumerRecord<?, ?>, Exception, BackOff>` to determine the `BackOff` to use, based on the failed record and/or the exception: + +==== +[source, java] +---- +handler.setBackOffFunction((record, ex) -> { ... }); +---- +==== + +If the function returns `null`, the handler's default `BackOff` will be used. + [[container-props]] ==== Listener Container Properties @@ -4654,6 +4665,17 @@ To revert to the previous behavior, set the error handler's `resetStateOnRecover Starting with version 2.3.2, after a record has been recovered, its offset will be committed (if one of the container `AckMode` s is configured). To revert to the previous behavior, set the error handler's `ackAfterHandle` property to false. +Starting with version 2.6, you can now provide the error handler with a `BiFunction<ConsumerRecord<?, ?>, Exception, BackOff>` to determine the `BackOff` to use, based on the failed record and/or the exception: + +==== +[source, java] +---- +handler.setBackOffFunction((record, ex) -> { ... }); +---- +==== + +If the function returns `null`, the handler's default `BackOff` will be used. + Also see <>. [[retrying-batch-eh]] @@ -4759,6 +4781,17 @@ Starting with version 2.5.5, if the recoverer fails, the `BackOff` will be reset With earlier versions, the `BackOff` was not reset and recovery was re-attempted on the next failure. To revert to the previous behavior, set the error handler's `resetStateOnRecoveryFailure` to `false`. +Starting with version 2.6, you can now provide the error handler with a `BiFunction<ConsumerRecord<?, ?>, Exception, BackOff>` to determine the `BackOff` to use, based on the failed record and/or the exception: + +==== +[source, java] +---- +handler.setBackOffFunction((record, ex) -> { ... }); +---- +==== + +If the function returns `null`, the handler's default `BackOff` will be used. 
+ ===== Container Stopping Error Handlers The `ContainerStoppingErrorHandler` (used with record listeners) stops the container if the listener throws an exception. @@ -4814,6 +4847,17 @@ Starting with version 2.5.5, if the recoverer fails, the `BackOff` will be reset With earlier versions, the `BackOff` was not reset and recovery was re-attempted on the next failure. To revert to the previous behavior, set the processor's `resetStateOnRecoveryFailure` property to `false`. +Starting with version 2.6, you can now provide the processor with a `BiFunction<ConsumerRecord<?, ?>, Exception, BackOff>` to determine the `BackOff` to use, based on the failed record and/or the exception: + +==== +[source, java] +---- +handler.setBackOffFunction((record, ex) -> { ... }); +---- +==== + +If the function returns `null`, the processor's default `BackOff` will be used. + Starting with version 2.3.1, similar to the `SeekToCurrentErrorHandler`, the `DefaultAfterRollbackProcessor` considers certain exceptions to be fatal, and retries are skipped for such exceptions; the recoverer is invoked on the first failure. The exceptions that are considered fatal, by default, are: diff --git a/src/reference/asciidoc/whats-new.adoc b/src/reference/asciidoc/whats-new.adoc index b0733f9a65..152f6b5c56 100644 --- a/src/reference/asciidoc/whats-new.adoc +++ b/src/reference/asciidoc/whats-new.adoc @@ -14,6 +14,7 @@ The default `EOSMode` is now `BETA`. See <> for more information. Various error handlers (that extend `FailedRecordProcessor`) and the `DefaultAfterRollbackProcessor` now reset the `BackOff` if recovery fails. +In addition, you can now select the `BackOff` to use based on the failed record and/or exception. See <>, <>, <> and <> for more information. ==== @KafkaListener Changes