Skip to content

Commit

Permalink
Try/Catch around error recovery [Backport]
Browse files Browse the repository at this point in the history
- if a recoverer `BiConsumer` threw an exception, the seeks would not be performed
- also add try/catch around after rollback processor calls

**cherry pick to 2.1.x, 1.7.x**
  • Loading branch information
garyrussell authored and artembilan committed Aug 21, 2019
1 parent fd2166e commit 3c816dc
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 8 deletions.
Expand Up @@ -41,6 +41,8 @@ class FailedRecordTracker {

private final boolean noRetries;

private final Log logger;

FailedRecordTracker(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer, int maxFailures, Log logger) {
if (recoverer == null) {
this.recoverer = (r, t) -> logger.error("Max failures (" + maxFailures + ") reached for: " + r, t);
Expand All @@ -50,11 +52,12 @@ class FailedRecordTracker {
}
this.maxFailures = maxFailures;
this.noRetries = ValueRange.of(0, 1).isValidIntValue(maxFailures);
this.logger = logger;
}

boolean skip(ConsumerRecord<?, ?> record, Exception exception) {
if (this.noRetries) {
this.recoverer.accept(record, exception);
recover(record, exception);
return true;
}
FailedRecord failedRecord = this.failures.get();
Expand All @@ -63,14 +66,23 @@ boolean skip(ConsumerRecord<?, ?> record, Exception exception) {
return false;
}
else if (this.maxFailures > 0 && failedRecord.incrementAndGet() >= this.maxFailures) {
this.recoverer.accept(record, exception);
recover(record, exception);
return true;
}
else {
return false;
}
}

private void recover(ConsumerRecord<?, ?> record, Exception exception) {
try {
this.recoverer.accept(record, exception);
}
catch (Exception ex) {
this.logger.error("Recoverer threw exception", ex);
}
}

private boolean newFailure(ConsumerRecord<?, ?> record, FailedRecord failedRecord) {
return !failedRecord.getTopic().equals(record.topic())
|| failedRecord.getPartition() != record.partition()
Expand Down
Expand Up @@ -1000,11 +1000,16 @@ private void batchAfterRollback(final ConsumerRecords<K, V> records,
final List<ConsumerRecord<K, V>> recordList, RuntimeException e,
AfterRollbackProcessor<K, V> afterRollbackProcessorToUse) {

if (recordList == null) {
afterRollbackProcessorToUse.process(createRecordList(records), this.consumer, e, false);
try {
if (recordList == null) {
afterRollbackProcessorToUse.process(createRecordList(records), this.consumer, e, false);
}
else {
afterRollbackProcessorToUse.process(recordList, this.consumer, e, false);
}
}
else {
afterRollbackProcessorToUse.process(recordList, this.consumer, e, false);
catch (Exception ex) {
this.logger.error("AfterRollbackProcessor threw exception", ex);
}
}

Expand Down Expand Up @@ -1189,7 +1194,12 @@ protected void doInTransactionWithoutResult(TransactionStatus status) {
});
}
else {
afterRollbackProcessorToUse.process(unprocessed, this.consumer, e, true);
try {
afterRollbackProcessorToUse.process(unprocessed, this.consumer, e, true);
}
catch (Exception ex) {
this.logger.error("AfterRollbackProcessor threw exception", ex);
}
}
}

Expand Down
Expand Up @@ -45,6 +45,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Expand Down Expand Up @@ -110,9 +111,11 @@ public class TransactionalContainerTests {

private static String topic3DLT = "txTopic3.DLT";

private static String topic4 = "txTopic4";

@ClassRule
public static EmbeddedKafkaRule embeddedKafkaRule = new EmbeddedKafkaRule(1, true, topic1, topic2, topic3,
topic3DLT)
topic3DLT, topic4)
.brokerProperty(KafkaConfig.TransactionsTopicReplicationFactorProp(), "1")
.brokerProperty(KafkaConfig.TransactionsTopicMinISRProp(), "1");

Expand Down Expand Up @@ -593,6 +596,69 @@ public void accept(ConsumerRecord<?, ?> record, Exception exception) {
logger.info("Stop testMaxAttempts");
}

@SuppressWarnings("unchecked")
@Test
public void testRollbackProcessorCrash() throws Exception {
logger.info("Start testRollbackNoRetries");
Map<String, Object> props = KafkaTestUtils.consumerProps("testRollbackNoRetries", "false", embeddedKafka);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group");
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
ContainerProperties containerProps = new ContainerProperties(topic4);
containerProps.setPollTimeout(10_000);

Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
senderProps.put(ProducerConfig.RETRIES_CONFIG, 1);
DefaultKafkaProducerFactory<Object, Object> pf = new DefaultKafkaProducerFactory<>(senderProps);
pf.setTransactionIdPrefix("noRetries.");
final KafkaTemplate<Object, Object> template = new KafkaTemplate<>(pf);
final CountDownLatch latch = new CountDownLatch(1);
AtomicReference<String> data = new AtomicReference<>();
containerProps.setMessageListener((MessageListener<Integer, String>) message -> {
data.set(message.value());
if (message.offset() == 0) {
throw new RuntimeException("fail for no retry");
}
latch.countDown();
});

@SuppressWarnings({ "rawtypes" })
KafkaTransactionManager tm = new KafkaTransactionManager(pf);
containerProps.setTransactionManager(tm);
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
container.setBeanName("testRollbackNoRetries");
BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer = (rec, ex) -> {
throw new RuntimeException("arbp fail");
};
DefaultAfterRollbackProcessor<Object, Object> afterRollbackProcessor =
spy(new DefaultAfterRollbackProcessor<>(recoverer, 0));
container.setAfterRollbackProcessor(afterRollbackProcessor);
final CountDownLatch stopLatch = new CountDownLatch(1);
container.setApplicationEventPublisher(e -> {
if (e instanceof ConsumerStoppedEvent) {
stopLatch.countDown();
}
});
container.start();

template.setDefaultTopic(topic4);
template.executeInTransaction(t -> {
RecordHeaders headers = new RecordHeaders(new RecordHeader[] { new RecordHeader("baz", "qux".getBytes()) });
ProducerRecord<Object, Object> record = new ProducerRecord<>(topic4, 0, 0, "foo", headers);
template.send(record);
template.sendDefault(0, 0, "bar");
return null;
});
assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
assertThat(data.get()).isEqualTo("bar");
container.stop();
pf.destroy();
assertThat(stopLatch.await(10, TimeUnit.SECONDS)).isTrue();
logger.info("Stop testRollbackNoRetries");
}

@SuppressWarnings("serial")
public static class SomeOtherTransactionManager extends AbstractPlatformTransactionManager {

Expand Down

0 comments on commit 3c816dc

Please sign in to comment.