Skip to content

Commit

Permalink
GH-885: Restore infinite retries STCEH
Browse files Browse the repository at this point in the history
Resolves #885

`SeekToCurrentErrorHandler` and `DefaultAfterRollbackProcessor`,
with negative `maxFailures`, now never recovers and continues to retry
indefinitely.
  • Loading branch information
garyrussell authored and artembilan committed Nov 21, 2018
1 parent 04e9e1a commit 0a953b0
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 4 deletions.
Expand Up @@ -59,6 +59,16 @@ public DefaultAfterRollbackProcessor() {
this(null, SeekUtils.DEFAULT_MAX_FAILURES);
}

/**
* Construct an instance with the default recoverer which simply logs the record after
* 'maxFailures' have occurred for a topic/partition/offset.
* @param maxFailures the maxFailures; a negative value is treated as infinity.
* @since 2.2.1
*/
public DefaultAfterRollbackProcessor(int maxFailures) {
this(null, maxFailures);
}

/**
* Construct an instance with the provided recoverer which will be called after
* {@value SeekUtils#DEFAULT_MAX_FAILURES} (maxFailures) have occurred for a
Expand All @@ -74,7 +84,7 @@ public DefaultAfterRollbackProcessor(BiConsumer<ConsumerRecord<?, ?>, Exception>
* Construct an instance with the provided recoverer which will be called after
* maxFailures have occurred for a topic/partition/offset.
* @param recoverer the recoverer; if null, the default (logging) recoverer is used.
* @param maxFailures the maxFailures.
* @param maxFailures the maxFailures; a negative value is treated as infinity.
* @since 2.2
*/
public DefaultAfterRollbackProcessor(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer,
Expand Down
Expand Up @@ -56,7 +56,7 @@ boolean skip(ConsumerRecord<?, ?> record, Exception exception) {
return false;
}
else {
if (failedRecord.incrementAndGet() >= this.maxFailures) {
if (this.maxFailures >= 0 && failedRecord.incrementAndGet() >= this.maxFailures) {
this.recoverer.accept(record, exception);
return true;
}
Expand Down
Expand Up @@ -53,6 +53,16 @@ public SeekToCurrentErrorHandler() {
this(null, SeekUtils.DEFAULT_MAX_FAILURES);
}

/**
* Construct an instance with the default recoverer which simply logs the record after
* 'maxFailures' have occurred for a topic/partition/offset.
* @param maxFailures the maxFailures; a negative value is treated as infinity.
* @since 2.2.1
*/
public SeekToCurrentErrorHandler(int maxFailures) {
this(null, maxFailures);
}

/**
* Construct an instance with the provided recoverer which will be called after
* {@value SeekUtils#DEFAULT_MAX_FAILURES} (maxFailures) have occurred for a
Expand All @@ -68,7 +78,7 @@ public SeekToCurrentErrorHandler(BiConsumer<ConsumerRecord<?, ?>, Exception> rec
* Construct an instance with the provided recoverer which will be called after
* maxFailures have occurred for a topic/partition/offset.
* @param recoverer the recoverer; if null, the default (logging) recoverer is used.
* @param maxFailures the maxFailures.
* @param maxFailures the maxFailures; a negative value is treated as infinity.
* @since 2.2
*/
public SeekToCurrentErrorHandler(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer, int maxFailures) {
Expand Down
Expand Up @@ -18,8 +18,12 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;

Expand All @@ -29,6 +33,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
Expand Down Expand Up @@ -129,7 +134,9 @@ public void accept(ConsumerRecord<?, ?> record, Exception exception) {

@Test
public void seekToCurrentErrorHandlerRecovers() {
SeekToCurrentErrorHandler eh = new SeekToCurrentErrorHandler((r, e) -> { }, 2);
@SuppressWarnings("unchecked")
BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer = mock(BiConsumer.class);
SeekToCurrentErrorHandler eh = new SeekToCurrentErrorHandler(recoverer, 2);
List<ConsumerRecord<?, ?>> records = new ArrayList<>();
records.add(new ConsumerRecord<>("foo", 0, 0, null, "foo"));
records.add(new ConsumerRecord<>("foo", 0, 1, null, "bar"));
Expand All @@ -146,6 +153,30 @@ public void seekToCurrentErrorHandlerRecovers() {
eh.handle(new RuntimeException(), records, consumer, null);
verify(consumer).seek(new TopicPartition("foo", 0), 1L);
verifyNoMoreInteractions(consumer);
verify(recoverer).accept(eq(records.get(0)), any());
}

@Test
public void testNeverRecover() {
@SuppressWarnings("unchecked")
BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer = mock(BiConsumer.class);
SeekToCurrentErrorHandler eh = new SeekToCurrentErrorHandler(recoverer, -1);
List<ConsumerRecord<?, ?>> records = new ArrayList<>();
records.add(new ConsumerRecord<>("foo", 0, 0, null, "foo"));
records.add(new ConsumerRecord<>("foo", 0, 1, null, "bar"));
Consumer<?, ?> consumer = mock(Consumer.class);
for (int i = 0; i < 20; i++) {
try {
eh.handle(new RuntimeException(), records, consumer, null);
fail("Expected exception");
}
catch (KafkaException e) {
// NOSONAR
}
}
verify(consumer, times(20)).seek(new TopicPartition("foo", 0), 0L);
verifyNoMoreInteractions(consumer);
verify(recoverer, never()).accept(any(), any());
}

}
2 changes: 2 additions & 0 deletions src/reference/asciidoc/kafka.adoc
Expand Up @@ -2436,6 +2436,7 @@ If the `AckMode` was `BATCH`, the container commits the offsets for the first 2
Starting with version 2.2, the `SeekToCurrentErrorHandler` can now recover (skip) a record that keeps failing.
By default, after 10 failures, the failed record will be logged (ERROR).
You can configure the handler with a custom recoverer (`BiConsumer`) and/or max failures.
Setting the `maxFailures` property to a negative number will cause infinite retries.

====
[source, java]
Expand Down Expand Up @@ -2482,6 +2483,7 @@ For example, with a record-based listener, you might want to keep track of the f
Starting with version 2.2, the `DefaultAfterRollbackProcessor` can now recover (skip) a record that keeps failing.
By default, after 10 failures, the failed record will be logged (ERROR).
You can configure the processor with a custom recoverer (`BiConsumer`) and/or max failures.
Setting the `maxFailures` property to a negative number will cause infinite retries.

====
[source, java]
Expand Down

0 comments on commit 0a953b0

Please sign in to comment.