GH-1547: Reset BackOff State on Recovery Failure
Resolves #1547

It generally makes sense to repeat the back offs after a recovery failure
instead of attempting recovery immediately on the next delivery.

Add an option to revert to the previous behavior.

* Fix code typos in docs
# Conflicts:
#	src/reference/asciidoc/whats-new.adoc
garyrussell authored and artembilan committed Jul 27, 2020
1 parent aca09ba commit 8182df8
Showing 7 changed files with 132 additions and 8 deletions.
@@ -107,6 +107,16 @@ public void setCommitRecovered(boolean commitRecovered) {
this.commitRecovered = commitRecovered;
}

/**
* Set to false to immediately attempt to recover on the next attempt instead
* of repeating the BackOff cycle when recovery fails.
* @param resetStateOnRecoveryFailure false to retain state.
* @since 2.5.5
*/
public void setResetStateOnRecoveryFailure(boolean resetStateOnRecoveryFailure) {
this.failureTracker.setResetStateOnRecoveryFailure(resetStateOnRecoveryFailure);
}

@Override
public int deliveryAttempt(TopicPartitionOffset topicPartitionOffset) {
return this.failureTracker.deliveryAttempt(topicPartitionOffset);
@@ -48,6 +48,8 @@ class FailedRecordTracker {

private final BackOff backOff;

private boolean resetStateOnRecoveryFailure = true;

FailedRecordTracker(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer, BackOff backOff,
LogAccessor logger) {

@@ -73,9 +75,19 @@ class FailedRecordTracker {
this.backOff = backOff;
}

/**
* Set to false to immediately attempt to recover on the next attempt instead
* of repeating the BackOff cycle when recovery fails.
* @param resetStateOnRecoveryFailure false to retain state.
* @since 2.5.5
*/
public void setResetStateOnRecoveryFailure(boolean resetStateOnRecoveryFailure) {
this.resetStateOnRecoveryFailure = resetStateOnRecoveryFailure;
}

boolean skip(ConsumerRecord<?, ?> record, Exception exception) {
if (this.noRetries) {
this.recoverer.accept(record, exception);
attemptRecovery(record, exception, null);
return true;
}
Map<TopicPartition, FailedRecord> map = this.failures.get();
@@ -103,7 +115,7 @@ boolean skip(ConsumerRecord<?, ?> record, Exception exception) {
return false;
}
else {
this.recoverer.accept(record, exception);
attemptRecovery(record, exception, topicPartition);
map.remove(topicPartition);
if (map.isEmpty()) {
this.failures.remove();
@@ -112,6 +124,18 @@ boolean skip(ConsumerRecord<?, ?> record, Exception exception) {
}
}

private void attemptRecovery(ConsumerRecord<?, ?> record, Exception exception, @Nullable TopicPartition tp) {
try {
this.recoverer.accept(record, exception);
}
catch (RuntimeException e) {
if (tp != null && this.resetStateOnRecoveryFailure) {
this.failures.get().remove(tp);
}
throw e;
}
}

void clearThreadState() {
this.failures.remove();
}
@@ -179,6 +179,7 @@ public void accept(ConsumerRecord<?, ?> record, Exception exception) {

};
RecoveringBatchErrorHandler errorHandler = new RecoveringBatchErrorHandler(recoverer, new FixedBackOff(0L, 1));
errorHandler.setResetStateOnRecoveryFailure(false);
container.setBatchErrorHandler(errorHandler);
final CountDownLatch stopLatch = new CountDownLatch(1);
container.setApplicationEventPublisher(e -> {
@@ -17,10 +17,12 @@
package org.springframework.kafka.listener;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.assertj.core.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.willAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
@@ -36,6 +38,7 @@
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;

@@ -194,6 +197,70 @@ public void seekToCurrentErrorHandlerRecovers() {
verify(recoverer).accept(eq(records.get(0)), any());
}

@Test
public void seekToCurrentErrorHandlerRecovererFailsBackOffReset() {
@SuppressWarnings("unchecked")
BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer = mock(BiConsumer.class);
AtomicBoolean fail = new AtomicBoolean(true);
willAnswer(invocation -> {
if (fail.getAndSet(false)) {
throw new RuntimeException("recovery failed");
}
return null;
}).given(recoverer).accept(any(), any());
SeekToCurrentErrorHandler eh = new SeekToCurrentErrorHandler(recoverer, new FixedBackOff(0L, 1));
List<ConsumerRecord<?, ?>> records = new ArrayList<>();
records.add(new ConsumerRecord<>("foo", 0, 0, null, "foo"));
records.add(new ConsumerRecord<>("foo", 0, 1, null, "bar"));
Consumer<?, ?> consumer = mock(Consumer.class);
assertThatExceptionOfType(KafkaException.class).isThrownBy(
() -> eh.handle(new RuntimeException(), records, consumer, null));
verify(consumer).seek(new TopicPartition("foo", 0), 0L);
verifyNoMoreInteractions(consumer);
assertThatExceptionOfType(KafkaException.class).isThrownBy(
() -> eh.handle(new RuntimeException(), records, consumer, null));
verify(consumer, times(2)).seek(new TopicPartition("foo", 0), 0L);
assertThatExceptionOfType(KafkaException.class).isThrownBy(
() -> eh.handle(new RuntimeException(), records, consumer, null));
verify(consumer, times(3)).seek(new TopicPartition("foo", 0), 0L);
eh.handle(new RuntimeException(), records, consumer, null);
verify(consumer, times(3)).seek(new TopicPartition("foo", 0), 0L);
verify(consumer).seek(new TopicPartition("foo", 0), 1L);
verifyNoMoreInteractions(consumer);
verify(recoverer, times(2)).accept(eq(records.get(0)), any());
}

@Test
public void seekToCurrentErrorHandlerRecovererFailsBackOffNotReset() {
@SuppressWarnings("unchecked")
BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer = mock(BiConsumer.class);
AtomicBoolean fail = new AtomicBoolean(true);
willAnswer(invocation -> {
if (fail.getAndSet(false)) {
throw new RuntimeException("recovery failed");
}
return null;
}).given(recoverer).accept(any(), any());
SeekToCurrentErrorHandler eh = new SeekToCurrentErrorHandler(recoverer, new FixedBackOff(0L, 1));
eh.setResetStateOnRecoveryFailure(false);
List<ConsumerRecord<?, ?>> records = new ArrayList<>();
records.add(new ConsumerRecord<>("foo", 0, 0, null, "foo"));
records.add(new ConsumerRecord<>("foo", 0, 1, null, "bar"));
Consumer<?, ?> consumer = mock(Consumer.class);
assertThatExceptionOfType(KafkaException.class).isThrownBy(
() -> eh.handle(new RuntimeException(), records, consumer, null));
verify(consumer).seek(new TopicPartition("foo", 0), 0L);
verifyNoMoreInteractions(consumer);
assertThatExceptionOfType(KafkaException.class).isThrownBy(
() -> eh.handle(new RuntimeException(), records, consumer, null));
verify(consumer, times(2)).seek(new TopicPartition("foo", 0), 0L);
eh.handle(new RuntimeException(), records, consumer, null); // recovery is immediately re-attempted
verify(consumer, times(2)).seek(new TopicPartition("foo", 0), 0L);
verify(consumer).seek(new TopicPartition("foo", 0), 1L);
verifyNoMoreInteractions(consumer);
verify(recoverer, times(2)).accept(eq(records.get(0)), any());
}

@Test
public void seekToCurrentErrorHandlerRecoversManualAcksAsync() {
seekToCurrentErrorHandlerRecoversManualAcks(false);
@@ -622,6 +622,7 @@ public void accept(ConsumerRecord<?, ?> record, Exception exception) {
};
DefaultAfterRollbackProcessor<Object, Object> afterRollbackProcessor =
spy(new DefaultAfterRollbackProcessor<>(recoverer, new FixedBackOff(0L, 2L), dlTemplate, true));
afterRollbackProcessor.setResetStateOnRecoveryFailure(false);
container.setAfterRollbackProcessor(afterRollbackProcessor);
final CountDownLatch stopLatch = new CountDownLatch(1);
container.setApplicationEventPublisher(e -> {
29 changes: 23 additions & 6 deletions src/reference/asciidoc/kafka.adoc
@@ -2128,7 +2128,10 @@ public SeekToCurrentErrorHandler eh() {

However, see the note at the beginning of this section; you can avoid using the `RetryTemplate` altogether.

IMPORTANT: If the recoverer fails (throws an exception), the record will be included in the seeks and recovery will be attempted again during the next delivery.
IMPORTANT: If the recoverer fails (throws an exception), the failed record will be included in the seeks.
Starting with version 2.5.5, if the recoverer fails, the `BackOff` will be reset by default and redeliveries will again go through the back offs before recovery is attempted again.
With earlier versions, the `BackOff` was not reset and recovery was re-attempted on the next failure.
To revert to the previous behavior, set the error handler's `resetStateOnRecoveryFailure` to `false`.

[[container-props]]
==== Listener Container Properties
@@ -4587,7 +4590,7 @@ Here is an example that adds `IllegalArgumentException` to the not-retryable exceptions:
[source, java]
----
@Bean
public SeekToCurrentErrorHandler errorHandler(BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer) {
public SeekToCurrentErrorHandler errorHandler(ConsumerRecordRecoverer recoverer) {
SeekToCurrentErrorHandler handler = new SeekToCurrentErrorHandler(recoverer);
handler.addNotRetryableException(IllegalArgumentException.class);
return handler;
@@ -4618,7 +4621,10 @@ However, since this error handler has no mechanism to "recover" after retries are
Again, the maximum delay must be less than the `max.poll.interval.ms` consumer property.
Also see <<retrying-batch-eh>>.

IMPORTANT: If the recoverer fails (throws an exception), the record will be included in the seeks and recovery will be attempted again during the next delivery.
IMPORTANT: If the recoverer fails (throws an exception), the failed record will be included in the seeks.
Starting with version 2.5.5, if the recoverer fails, the `BackOff` will be reset by default and redeliveries will again go through the back offs before recovery is attempted again.
With earlier versions, the `BackOff` was not reset and recovery was re-attempted on the next failure.
To revert to the previous behavior, set the error handler's `resetStateOnRecoveryFailure` to `false`.
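
For example, a minimal sketch that reverts to the pre-2.5.5 behavior (the bean name and back off values are illustrative, not prescriptive):

[source, java]
----
@Bean
public SeekToCurrentErrorHandler errorHandler(ConsumerRecordRecoverer recoverer) {
    // Redeliver a failed record up to 3 times, 1 second apart, then invoke the recoverer
    SeekToCurrentErrorHandler handler = new SeekToCurrentErrorHandler(recoverer, new FixedBackOff(1000L, 3L));
    // If the recoverer throws, retain the BackOff state so that recovery is
    // re-attempted on the next delivery instead of repeating the back offs
    handler.setResetStateOnRecoveryFailure(false);
    return handler;
}
----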

Starting with version 2.3.2, after a record has been recovered, its offset will be committed (if one of the container `AckMode` s is configured).
To revert to the previous behavior, set the error handler's `ackAfterHandle` property to false.
@@ -4721,7 +4727,12 @@ public void listen(List<Thing> things) {
----
====

IMPORTANT: This error handler cannot be used with transactions.

IMPORTANT: If the recoverer fails (throws an exception), the failed record will be included in the seeks.
Starting with version 2.5.5, if the recoverer fails, the `BackOff` will be reset by default and redeliveries will again go through the back offs before recovery is attempted again.
With earlier versions, the `BackOff` was not reset and recovery was re-attempted on the next failure.
To revert to the previous behavior, set the error handler's `resetStateOnRecoveryFailure` to `false`.
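
For example, a minimal sketch, assuming `recoverer` and `container` are defined elsewhere (the back off values are illustrative):

[source, java]
----
RecoveringBatchErrorHandler errorHandler =
        new RecoveringBatchErrorHandler(recoverer, new FixedBackOff(1000L, 2L));
// Retain the BackOff state if the recoverer fails, so recovery is re-attempted
// immediately on the next delivery (the pre-2.5.5 behavior)
errorHandler.setResetStateOnRecoveryFailure(false);
container.setBatchErrorHandler(errorHandler);
----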

===== Container Stopping Error Handlers

@@ -4773,7 +4784,10 @@ Starting with version 2.2.5, the `DefaultAfterRollbackProcessor` can be invoked
Then, if you are using the `DeadLetterPublishingRecoverer` to publish a failed record, the processor will send the recovered record's offset in the original topic/partition to the transaction.
To enable this feature, set the `commitRecovered` and `kafkaTemplate` properties on the `DefaultAfterRollbackProcessor`.

IMPORTANT: If the recoverer fails (throws an exception), the record will be included in the seeks and recovery will be attempted again during the next delivery.
IMPORTANT: If the recoverer fails (throws an exception), the failed record will be included in the seeks.
Starting with version 2.5.5, if the recoverer fails, the `BackOff` will be reset by default and redeliveries will again go through the back offs before recovery is attempted again.
With earlier versions, the `BackOff` was not reset and recovery was re-attempted on the next failure.
To revert to the previous behavior, set the processor's `resetStateOnRecoveryFailure` property to `false`.
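
For example, a minimal sketch, assuming `recoverer` and `container` are defined elsewhere:

[source, java]
----
DefaultAfterRollbackProcessor<Object, Object> processor =
        new DefaultAfterRollbackProcessor<>(recoverer, new FixedBackOff(0L, 2L));
// Retain the BackOff state when the recoverer fails (the pre-2.5.5 behavior)
processor.setResetStateOnRecoveryFailure(false);
container.setAfterRollbackProcessor(processor);
----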

Starting with version 2.3.1, similar to the `SeekToCurrentErrorHandler`, the `DefaultAfterRollbackProcessor` considers certain exceptions to be fatal, and retries are skipped for such exceptions; the recoverer is invoked on the first failure.
The exceptions that are considered fatal, by default, are:
@@ -4905,7 +4919,10 @@ A `LinkedHashMap` is recommended so that the keys are examined in order.

When publishing `null` values, when there are multiple templates, the recoverer will look for a template for the `Void` class; if none is present, the first template from the `values().iterator()` will be used.
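
For example, a minimal sketch of a multiple-template configuration, assuming `stringTemplate` and `bytesTemplate` are `KafkaTemplate` beans defined elsewhere:

[source, java]
----
// A LinkedHashMap preserves insertion order, so the keys are examined in this order
Map<Class<?>, KafkaOperations<?, ?>> templates = new LinkedHashMap<>();
templates.put(String.class, stringTemplate);
templates.put(byte[].class, bytesTemplate);
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(templates);
----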

IMPORTANT: If the recoverer fails (throws an exception), the record will be included in the seeks and recovery will be attempted again during the next delivery.
IMPORTANT: If the recoverer fails (throws an exception), the failed record will be included in the seeks.
Starting with version 2.5.5, if the recoverer fails, the `BackOff` will be reset by default and redeliveries will again go through the back offs before recovery is attempted again.
With earlier versions, the `BackOff` was not reset and recovery was re-attempted on the next failure.
To revert to the previous behavior, set the error handler's `resetStateOnRecoveryFailure` property to `false`.

Starting with version 2.3, the recoverer can also be used with Kafka Streams - see <<streams-deser-recovery>> for more information.

4 changes: 4 additions & 0 deletions src/reference/asciidoc/whats-new.adoc
@@ -77,6 +77,10 @@ See <<container-props>> for more information.
You can now suppress logging of entire `ConsumerRecord` s in error and debug logs.
See `onlyLogRecordMetadata` in <<container-props>>.

Various error handlers (that extend `FailedRecordProcessor`) and the `DefaultAfterRollbackProcessor` now reset the `BackOff` by default if recovery fails.
See <<seek-to-current>>, <<recovering-batch-eh>>, <<dead-letters>> and <<after-rollback>> for more information.


[[x25-template]]
==== KafkaTemplate Changes
