Skip to content

Commit

Permalink
GH-1259: Failed recovery with no retry
Browse files Browse the repository at this point in the history
Resolves #1259

When a record fails with an exception that is not classified for retry,
handle a failed recovery and re-seek the failed record.

`FailedRecordTracker.skip()` is not used for unclassified exceptions,
it's recoverer is called directly.
  • Loading branch information
garyrussell authored and artembilan committed Oct 3, 2019
1 parent 9abc873 commit 0ae9dcc
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ public class SeekToCurrentErrorHandler implements ContainerAwareErrorHandler {

private static final BiPredicate<ConsumerRecord<?, ?>, Exception> ALWAYS_SKIP_PREDICATE = (r, e) -> true;

private static final BiPredicate<ConsumerRecord<?, ?>, Exception> NEVER_SKIP_PREDICATE = (r, e) -> false;

protected static final LogAccessor LOGGER =
new LogAccessor(LogFactory.getLog(SeekToCurrentErrorHandler.class)); // NOSONAR visibility

Expand Down Expand Up @@ -289,7 +291,13 @@ public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records
return this.failureTracker::skip;
}
else {
this.failureTracker.getRecoverer().accept(records.get(0), thrownException);
try {
this.failureTracker.getRecoverer().accept(records.get(0), thrownException);
}
catch (Exception ex) {
LOGGER.error(ex, () -> "Recovery of record (" + records.get(0) + ") failed");
return NEVER_SKIP_PREDICATE;
}
return ALWAYS_SKIP_PREDICATE;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ public static boolean doSeeks(List<ConsumerRecord<?, ?>> records, Consumer<?, ?>
skipped.set(test);
}
catch (Exception ex) {
logger.error(ex, "Failed to determine if this record should be recovererd, including in seeks");
logger.error(ex, "Failed to determine if this record (" + record
+ ") should be recovererd, including in seeks");
skipped.set(false);
}
if (skipped.get()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.kafka.clients.consumer.Consumer;
Expand All @@ -45,9 +46,15 @@ public class SeekToCurrentErrorHandlerTests {
@Test
public void testClassifier() {
AtomicReference<ConsumerRecord<?, ?>> recovered = new AtomicReference<>();
SeekToCurrentErrorHandler handler = new SeekToCurrentErrorHandler((r, t) -> recovered.set(r));
AtomicBoolean recovererShouldFail = new AtomicBoolean(false);
SeekToCurrentErrorHandler handler = new SeekToCurrentErrorHandler((r, t) -> {
if (recovererShouldFail.getAndSet(false)) {
throw new RuntimeException("test recoverer failure");
}
recovered.set(r);
});
ConsumerRecord<String, String> record1 = new ConsumerRecord<>("foo", 0, 0L, "foo", "bar");
ConsumerRecord<String, String> record2 = new ConsumerRecord<>("foo", 0, 1L, "foo", "bar");
ConsumerRecord<String, String> record2 = new ConsumerRecord<>("foo", 1, 1L, "foo", "bar");
List<ConsumerRecord<?, ?>> records = Arrays.asList(record1, record2);
IllegalStateException illegalState = new IllegalStateException();
Consumer<?, ?> consumer = mock(Consumer.class);
Expand All @@ -59,11 +66,16 @@ consumer, mock(MessageListenerContainer.class)))
assertThat(recovered.get()).isSameAs(record1);
handler.addNotRetryableException(IllegalStateException.class);
recovered.set(null);
recovererShouldFail.set(true);
assertThatExceptionOfType(RuntimeException.class).isThrownBy(() ->
handler.handle(illegalState, records, consumer, mock(MessageListenerContainer.class)));
handler.handle(illegalState, records, consumer, mock(MessageListenerContainer.class));
assertThat(recovered.get()).isSameAs(record1);
InOrder inOrder = inOrder(consumer);
inOrder.verify(consumer).seek(new TopicPartition("foo", 0), 0L); // not recovered so seek
inOrder.verify(consumer, times(2)).seek(new TopicPartition("foo", 0), 1L); // 2x recovered seek next
inOrder.verify(consumer, times(2)).seek(new TopicPartition("foo", 1), 1L);
inOrder.verify(consumer).seek(new TopicPartition("foo", 0), 0L); // recovery failed
inOrder.verify(consumer, times(2)).seek(new TopicPartition("foo", 1), 1L);
inOrder.verifyNoMoreInteractions();
}

Expand Down

0 comments on commit 0ae9dcc

Please sign in to comment.