From 8182df8fea2a085ad6fca2339568fc7354518117 Mon Sep 17 00:00:00 2001
From: Gary Russell
Date: Mon, 27 Jul 2020 13:24:01 -0400
Subject: [PATCH] GH-1547: Reset BackOff State on Recovery Failure

Resolves https://github.com/spring-projects/spring-kafka/issues/1547

It generally makes sense to repeat the back offs after a recovery failure
instead of attempting recovery immediately on the next delivery.

Add an option to revert to the previous behavior.

* Fix code typos in docs

# Conflicts:
#	src/reference/asciidoc/whats-new.adoc
---
 .../kafka/listener/FailedRecordProcessor.java | 10 +++
 .../kafka/listener/FailedRecordTracker.java   | 28 +++++++-
 ...ringBatchErrorHandlerIntegrationTests.java |  1 +
 .../listener/SeekToCurrentRecovererTests.java | 67 +++++++++++++++++++
 .../listener/TransactionalContainerTests.java |  1 +
 src/reference/asciidoc/kafka.adoc             | 29 ++++++--
 src/reference/asciidoc/whats-new.adoc         |  4 ++
 7 files changed, 132 insertions(+), 8 deletions(-)

diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordProcessor.java
index d8b8093eec..6029ae3971 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordProcessor.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordProcessor.java
@@ -107,6 +107,16 @@ public void setCommitRecovered(boolean commitRecovered) {
 		this.commitRecovered = commitRecovered;
 	}
 
+	/**
+	 * Set to false to immediately attempt to recover on the next delivery instead
+	 * of repeating the BackOff cycle when recovery fails.
+	 * @param resetStateOnRecoveryFailure false to retain state.
+	 * @since 2.5.5
+	 */
+	public void setResetStateOnRecoveryFailure(boolean resetStateOnRecoveryFailure) {
+		this.failureTracker.setResetStateOnRecoveryFailure(resetStateOnRecoveryFailure);
+	}
+
 	@Override
 	public int deliveryAttempt(TopicPartitionOffset topicPartitionOffset) {
 		return this.failureTracker.deliveryAttempt(topicPartitionOffset);
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java
index f558951aee..7405123cfe 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java
@@ -48,6 +48,8 @@ class FailedRecordTracker {
 
 	private final BackOff backOff;
 
+	private boolean resetStateOnRecoveryFailure = true;
+
 	FailedRecordTracker(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer, BackOff backOff,
 			LogAccessor logger) {
 
@@ -73,9 +75,19 @@ class FailedRecordTracker {
 		this.backOff = backOff;
 	}
 
+	/**
+	 * Set to false to immediately attempt to recover on the next delivery instead
+	 * of repeating the BackOff cycle when recovery fails.
+	 * @param resetStateOnRecoveryFailure false to retain state.
+	 * @since 2.5.5
+	 */
+	public void setResetStateOnRecoveryFailure(boolean resetStateOnRecoveryFailure) {
+		this.resetStateOnRecoveryFailure = resetStateOnRecoveryFailure;
+	}
+
 	boolean skip(ConsumerRecord<?, ?> record, Exception exception) {
 		if (this.noRetries) {
-			this.recoverer.accept(record, exception);
+			attemptRecovery(record, exception, null);
 			return true;
 		}
 		Map<TopicPartition, FailedRecord> map = this.failures.get();
@@ -103,7 +115,7 @@ boolean skip(ConsumerRecord<?, ?> record, Exception exception) {
 			return false;
 		}
 		else {
-			this.recoverer.accept(record, exception);
+			attemptRecovery(record, exception, topicPartition);
 			map.remove(topicPartition);
 			if (map.isEmpty()) {
 				this.failures.remove();
@@ -112,6 +124,18 @@ boolean skip(ConsumerRecord<?, ?> record, Exception exception) {
 		}
 	}
 
+	private void attemptRecovery(ConsumerRecord<?, ?> record, Exception exception, @Nullable TopicPartition tp) {
+		try {
+			this.recoverer.accept(record, exception);
+		}
+		catch (RuntimeException e) {
+			if (tp != null && this.resetStateOnRecoveryFailure) {
+				this.failures.get().remove(tp);
+			}
+			throw e;
+		}
+	}
+
 	void clearThreadState() {
 		this.failures.remove();
 	}
diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/RecoveringBatchErrorHandlerIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/RecoveringBatchErrorHandlerIntegrationTests.java
index cd9c915b74..88c6372b9c 100644
--- a/spring-kafka/src/test/java/org/springframework/kafka/listener/RecoveringBatchErrorHandlerIntegrationTests.java
+++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/RecoveringBatchErrorHandlerIntegrationTests.java
@@ -179,6 +179,7 @@ public void accept(ConsumerRecord<?, ?> record, Exception exception) {
 		};
 		RecoveringBatchErrorHandler errorHandler =
 				new RecoveringBatchErrorHandler(recoverer, new FixedBackOff(0L, 1));
+		errorHandler.setResetStateOnRecoveryFailure(false);
 		container.setBatchErrorHandler(errorHandler);
 		final CountDownLatch stopLatch = new CountDownLatch(1);
 		container.setApplicationEventPublisher(e -> {
diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentRecovererTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentRecovererTests.java
index 0d35a52d94..c662893f06 100644
--- a/spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentRecovererTests.java
+++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentRecovererTests.java
@@ -17,10 +17,12 @@
 package org.springframework.kafka.listener;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
 import static org.assertj.core.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.BDDMockito.given;
+import static org.mockito.BDDMockito.willAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
@@ -36,6 +38,7 @@
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
 
@@ -194,6 +197,70 @@ public void seekToCurrentErrorHandlerRecovers() {
 		verify(recoverer).accept(eq(records.get(0)), any());
 	}
 
+	@Test
+	public void seekToCurrentErrorHandlerRecovererFailsBackOffReset() {
+		@SuppressWarnings("unchecked")
+		BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer = mock(BiConsumer.class);
+		AtomicBoolean fail = new AtomicBoolean(true);
+		willAnswer(invocation -> {
+			if (fail.getAndSet(false)) {
+				throw new RuntimeException("recovery failed");
+			}
+			return null;
+		}).given(recoverer).accept(any(), any());
+		SeekToCurrentErrorHandler eh = new SeekToCurrentErrorHandler(recoverer, new FixedBackOff(0L, 1));
+		List<ConsumerRecord<?, ?>> records = new ArrayList<>();
+		records.add(new ConsumerRecord<>("foo", 0, 0, null, "foo"));
+		records.add(new ConsumerRecord<>("foo", 0, 1, null, "bar"));
+		Consumer<?, ?> consumer = mock(Consumer.class);
+		assertThatExceptionOfType(KafkaException.class).isThrownBy(
+				() -> eh.handle(new RuntimeException(), records, consumer, null));
+		verify(consumer).seek(new TopicPartition("foo", 0), 0L);
+		verifyNoMoreInteractions(consumer);
+		assertThatExceptionOfType(KafkaException.class).isThrownBy(
+				() -> eh.handle(new RuntimeException(), records, consumer, null));
+		verify(consumer, times(2)).seek(new TopicPartition("foo", 0), 0L);
+		assertThatExceptionOfType(KafkaException.class).isThrownBy(
+				() -> eh.handle(new RuntimeException(), records, consumer, null));
+		verify(consumer, times(3)).seek(new TopicPartition("foo", 0), 0L);
+		eh.handle(new RuntimeException(), records, consumer, null);
+		verify(consumer, times(3)).seek(new TopicPartition("foo", 0), 0L);
+		verify(consumer).seek(new TopicPartition("foo", 0), 1L);
+		verifyNoMoreInteractions(consumer);
+		verify(recoverer, times(2)).accept(eq(records.get(0)), any());
+	}
+
+	@Test
+	public void seekToCurrentErrorHandlerRecovererFailsBackOffNotReset() {
+		@SuppressWarnings("unchecked")
+		BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer = mock(BiConsumer.class);
+		AtomicBoolean fail = new AtomicBoolean(true);
+		willAnswer(invocation -> {
+			if (fail.getAndSet(false)) {
+				throw new RuntimeException("recovery failed");
+			}
+			return null;
+		}).given(recoverer).accept(any(), any());
+		SeekToCurrentErrorHandler eh = new SeekToCurrentErrorHandler(recoverer, new FixedBackOff(0L, 1));
+		eh.setResetStateOnRecoveryFailure(false);
+		List<ConsumerRecord<?, ?>> records = new ArrayList<>();
+		records.add(new ConsumerRecord<>("foo", 0, 0, null, "foo"));
+		records.add(new ConsumerRecord<>("foo", 0, 1, null, "bar"));
+		Consumer<?, ?> consumer = mock(Consumer.class);
+		assertThatExceptionOfType(KafkaException.class).isThrownBy(
+				() -> eh.handle(new RuntimeException(), records, consumer, null));
+		verify(consumer).seek(new TopicPartition("foo", 0), 0L);
+		verifyNoMoreInteractions(consumer);
+		assertThatExceptionOfType(KafkaException.class).isThrownBy(
+				() -> eh.handle(new RuntimeException(), records, consumer, null));
+		verify(consumer, times(2)).seek(new TopicPartition("foo", 0), 0L);
+		eh.handle(new RuntimeException(), records, consumer, null); // recovery is re-attempted immediately
+		verify(consumer, times(2)).seek(new TopicPartition("foo", 0), 0L);
+		verify(consumer).seek(new TopicPartition("foo", 0), 1L);
+		verifyNoMoreInteractions(consumer);
+		verify(recoverer, times(2)).accept(eq(records.get(0)), any());
+	}
+
 	@Test
 	public void seekToCurrentErrorHandlerRecoversManualAcksAsync() {
 		seekToCurrentErrorHandlerRecoversManualAcks(false);
diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java
index 4dd1760380..00b3e632da 100644
--- a/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java
+++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java
@@ -622,6 +622,7 @@ public void accept(ConsumerRecord<?, ?> record, Exception exception) {
 		};
 		DefaultAfterRollbackProcessor afterRollbackProcessor = spy(new DefaultAfterRollbackProcessor<>(recoverer,
 				new FixedBackOff(0L, 2L), dlTemplate, true));
+		afterRollbackProcessor.setResetStateOnRecoveryFailure(false);
 		container.setAfterRollbackProcessor(afterRollbackProcessor);
 		final CountDownLatch stopLatch = new CountDownLatch(1);
 		container.setApplicationEventPublisher(e -> {
diff --git a/src/reference/asciidoc/kafka.adoc b/src/reference/asciidoc/kafka.adoc
index 3ee656c046..629bfeb69d 100644
--- a/src/reference/asciidoc/kafka.adoc
+++ b/src/reference/asciidoc/kafka.adoc
@@ -2128,7 +2128,10 @@ public SeekToCurrentErrorHandler eh() {
 
 However, see the note at the beginning of this section; you can avoid using the `RetryTemplate` altogether.
 
-IMPORTANT: If the recoverer fails (throws an exception), the record will be included in the seeks and recovery will be attempted again during the next delivery.
+IMPORTANT: If the recoverer fails (throws an exception), the failed record will be included in the seeks.
+Starting with version 2.5.5, if the recoverer fails, the `BackOff` will be reset by default and redeliveries will again go through the back offs before recovery is attempted again.
+With earlier versions, the `BackOff` was not reset and recovery was re-attempted on the next failure.
+To revert to the previous behavior, set the error handler's `resetStateOnRecoveryFailure` to `false`.
 
 [[container-props]]
 ==== Listener Container Properties
@@ -4587,7 +4590,7 @@ Here is an example that adds `IllegalArgumentException` to the not-retryable exc
 [source, java]
 ----
 @Bean
-public SeekToCurrentErrorHandler errorHandler(BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer) {
+public SeekToCurrentErrorHandler errorHandler(ConsumerRecordRecoverer recoverer) {
     SeekToCurrentErrorHandler handler = new SeekToCurrentErrorHandler(recoverer);
     handler.addNotRetryableException(IllegalArgumentException.class);
     return handler;
@@ -4618,7 +4621,10 @@ However, since this error handler has no mechanism to "recover" after retries ar
 Again, the maximum delay must be less than the `max.poll.interval.ms` consumer property.
 Also see <>.
 
-IMPORTANT: If the recoverer fails (throws an exception), the record will be included in the seeks and recovery will be attempted again during the next delivery.
+IMPORTANT: If the recoverer fails (throws an exception), the failed record will be included in the seeks.
+Starting with version 2.5.5, if the recoverer fails, the `BackOff` will be reset by default and redeliveries will again go through the back offs before recovery is attempted again.
+With earlier versions, the `BackOff` was not reset and recovery was re-attempted on the next failure.
+To revert to the previous behavior, set the error handler's `resetStateOnRecoveryFailure` to `false`.
 
 Starting with version 2.3.2, after a record has been recovered, its offset will be committed (if one of the container `AckMode` s is configured).
 To revert to the previous behavior, set the error handler's `ackAfterHandle` property to false.
@@ -4721,7 +4727,12 @@ public void listen(List things) {
 ----
 ====
 
-IMPORTANT: This error handler cannot be used with transactions.
+IMPORTANT: This error handler cannot be used with transactions.
+
+IMPORTANT: If the recoverer fails (throws an exception), the failed record will be included in the seeks.
+Starting with version 2.5.5, if the recoverer fails, the `BackOff` will be reset by default and redeliveries will again go through the back offs before recovery is attempted again. +With earlier versions, the `BackOff` was not reset and recovery was re-attempted on the next failure. +To revert to the previous behavior, set the error handler's `resetStateOnRecoveryFailure` to `false`. ===== Container Stopping Error Handlers @@ -4773,7 +4784,10 @@ Starting with version 2.2.5, the `DefaultAfterRollbackProcessor` can be invoked Then, if you are using the `DeadLetterPublishingRecoverer` to publish a failed record, the processor will send the recovered record's offset in the original topic/partition to the transaction. To enable this feature, set the `commitRecovered` and `kafkaTemplate` properties on the `DefaultAfterRollbackProcessor`. -IMPORTANT: If the recoverer fails (throws an exception), the record will be included in the seeks and recovery will be attempted again during the next delivery. +IMPORTANT: If the recoverer fails (throws an exception), the failed record will be included in the seeks. +Starting with version 2.5.5, if the recoverer fails, the `BackOff` will be reset by default and redeliveries will again go through the back offs before recovery is attempted again. +With earlier versions, the `BackOff` was not reset and recovery was re-attempted on the next failure. +To revert to the previous behavior, set the processor's `resetStateOnRecoveryFailure` property to `false`. Starting with version 2.3.1, similar to the `SeekToCurrentErrorHandler`, the `DefaultAfterRollbackProcessor` considers certain exceptions to be fatal, and retries are skipped for such exceptions; the recoverer is invoked on the first failure. The exceptions that are considered fatal, by default, are: @@ -4905,7 +4919,10 @@ A `LinkedHashMap` is recommended so that the keys are examined in order. When publishing `null` values, when there are multiple templates, the recoverer will look for a template for the `Void` class; if none is present, the first template from the `values().iterator()` will be used. -IMPORTANT: If the recoverer fails (throws an exception), the record will be included in the seeks and recovery will be attempted again during the next delivery. +IMPORTANT: If the recoverer fails (throws an exception), the failed record will be included in the seeks. +Starting with version 2.5.5, if the recoverer fails, the `BackOff` will be reset by default and redeliveries will again go through the back offs before recovery is attempted again. +With earlier versions, the `BackOff` was not reset and recovery was re-attempted on the next failure. +To revert to the previous behavior, set the error handler's `resetStateOnRecoveryFailure` property to `false`. Starting with version 2.3, the recoverer can also be used with Kafka Streams - see <> for more information. diff --git a/src/reference/asciidoc/whats-new.adoc b/src/reference/asciidoc/whats-new.adoc index 52348c69d6..27abe17605 100644 --- a/src/reference/asciidoc/whats-new.adoc +++ b/src/reference/asciidoc/whats-new.adoc @@ -77,6 +77,10 @@ See <> for more information. You can now suppress logging entire `ConsumerRecord` s in error, debug logs etc. See `onlyLogRecordMetadata` in <>. +Various error handlers (that extend `FailedRecordProcessor`) and the `DefaultAfterRollbackProcessor` now reset the `BackOff` if recovery fails. +See <>, <>, <> and <> for more information. + + [[x25-template]] ==== KafkaTemplate Changes
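
For illustration only (this example is not part of the patch above): a minimal sketch, in the style of the reference documentation examples in the patch, of how an application could configure the new option. It assumes a `ConsumerRecordRecoverer` bean (such as a `DeadLetterPublishingRecoverer`) is defined elsewhere; the bean method name and the `FixedBackOff` values are arbitrary.

[source, java]
----
@Bean
public SeekToCurrentErrorHandler errorHandler(ConsumerRecordRecoverer recoverer) {
    // Retry a failed record twice, 1 second apart, before invoking the recoverer.
    // (The back off values here are illustrative, not from the patch.)
    SeekToCurrentErrorHandler handler =
            new SeekToCurrentErrorHandler(recoverer, new FixedBackOff(1000L, 2));
    // Since 2.5.5 the default is true: if the recoverer itself throws, the BackOff
    // state is reset and redeliveries go through the back offs again before
    // recovery is re-attempted. Setting false restores the pre-2.5.5 behavior:
    // recovery is re-attempted immediately on the next delivery.
    handler.setResetStateOnRecoveryFailure(false);
    return handler;
}
----

`setResetStateOnRecoveryFailure` is the setter this patch adds to `FailedRecordProcessor`; the tests above exercise the same option on `RecoveringBatchErrorHandler` and `DefaultAfterRollbackProcessor` as well.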