From 3c816dca107fed35addafa45e785328e8a1dc577 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Wed, 21 Aug 2019 10:32:51 -0400 Subject: [PATCH] Try/Catch around error recovery [Backport] - if a recoverer `BiConsumer` threw an exception, the seeks would not be performed - also add try/catch around after rollback processor calls **cherry pick to 2.1.x, 1.7.x** --- .../kafka/listener/FailedRecordTracker.java | 16 ++++- .../KafkaMessageListenerContainer.java | 20 ++++-- .../listener/TransactionalContainerTests.java | 68 ++++++++++++++++++- 3 files changed, 96 insertions(+), 8 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java index 9f9cb0c57b..5e00e31a77 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java @@ -41,6 +41,8 @@ class FailedRecordTracker { private final boolean noRetries; + private final Log logger; + FailedRecordTracker(@Nullable BiConsumer, Exception> recoverer, int maxFailures, Log logger) { if (recoverer == null) { this.recoverer = (r, t) -> logger.error("Max failures (" + maxFailures + ") reached for: " + r, t); @@ -50,11 +52,12 @@ class FailedRecordTracker { } this.maxFailures = maxFailures; this.noRetries = ValueRange.of(0, 1).isValidIntValue(maxFailures); + this.logger = logger; } boolean skip(ConsumerRecord record, Exception exception) { if (this.noRetries) { - this.recoverer.accept(record, exception); + recover(record, exception); return true; } FailedRecord failedRecord = this.failures.get(); @@ -63,7 +66,7 @@ boolean skip(ConsumerRecord record, Exception exception) { return false; } else if (this.maxFailures > 0 && failedRecord.incrementAndGet() >= this.maxFailures) { - this.recoverer.accept(record, exception); + recover(record, exception); return true; } else { @@ -71,6 +74,15 @@ else if (this.maxFailures > 0 && failedRecord.incrementAndGet() >= this.maxFailu } } + private void recover(ConsumerRecord record, Exception exception) { + try { + this.recoverer.accept(record, exception); + } + catch (Exception ex) { + this.logger.error("Recoverer threw exception", ex); + } + } + private boolean newFailure(ConsumerRecord record, FailedRecord failedRecord) { return !failedRecord.getTopic().equals(record.topic()) || failedRecord.getPartition() != record.partition() diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index 576ef45164..9d796353d0 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -1000,11 +1000,16 @@ private void batchAfterRollback(final ConsumerRecords records, final List> recordList, RuntimeException e, AfterRollbackProcessor afterRollbackProcessorToUse) { - if (recordList == null) { - afterRollbackProcessorToUse.process(createRecordList(records), this.consumer, e, false); + try { + if (recordList == null) { + afterRollbackProcessorToUse.process(createRecordList(records), this.consumer, e, false); + } + else { + afterRollbackProcessorToUse.process(recordList, this.consumer, e, false); + } } - else { - afterRollbackProcessorToUse.process(recordList, this.consumer, e, false); + catch (Exception ex) { + this.logger.error("AfterRollbackProcessor threw exception", ex); } } @@ -1189,7 +1194,12 @@ protected void doInTransactionWithoutResult(TransactionStatus status) { }); } else { - afterRollbackProcessorToUse.process(unprocessed, this.consumer, e, true); + try { + afterRollbackProcessorToUse.process(unprocessed, this.consumer, e, true); + } + catch (Exception ex) { + this.logger.error("AfterRollbackProcessor threw exception", ex); + } } } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java index 1b47a23a6f..8cbfbae31e 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java @@ -45,6 +45,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -110,9 +111,11 @@ public class TransactionalContainerTests { private static String topic3DLT = "txTopic3.DLT"; + private static String topic4 = "txTopic4"; + @ClassRule public static EmbeddedKafkaRule embeddedKafkaRule = new EmbeddedKafkaRule(1, true, topic1, topic2, topic3, - topic3DLT) + topic3DLT, topic4) .brokerProperty(KafkaConfig.TransactionsTopicReplicationFactorProp(), "1") .brokerProperty(KafkaConfig.TransactionsTopicMinISRProp(), "1"); @@ -593,6 +596,69 @@ public void accept(ConsumerRecord record, Exception exception) { logger.info("Stop testMaxAttempts"); } + @SuppressWarnings("unchecked") + @Test + public void testRollbackProcessorCrash() throws Exception { + logger.info("Start testRollbackNoRetries"); + Map props = KafkaTestUtils.consumerProps("testRollbackNoRetries", "false", embeddedKafka); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "group"); + props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); + DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); + ContainerProperties containerProps = new ContainerProperties(topic4); + containerProps.setPollTimeout(10_000); + + Map senderProps = KafkaTestUtils.producerProps(embeddedKafka); + senderProps.put(ProducerConfig.RETRIES_CONFIG, 1); + DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory<>(senderProps); + pf.setTransactionIdPrefix("noRetries."); + final KafkaTemplate template = new KafkaTemplate<>(pf); + final CountDownLatch latch = new CountDownLatch(1); + AtomicReference data = new AtomicReference<>(); + containerProps.setMessageListener((MessageListener) message -> { + data.set(message.value()); + if (message.offset() == 0) { + throw new RuntimeException("fail for no retry"); + } + latch.countDown(); + }); + + @SuppressWarnings({ "rawtypes" }) + KafkaTransactionManager tm = new KafkaTransactionManager(pf); + containerProps.setTransactionManager(tm); + KafkaMessageListenerContainer container = + new KafkaMessageListenerContainer<>(cf, containerProps); + container.setBeanName("testRollbackNoRetries"); + BiConsumer, Exception> recoverer = (rec, ex) -> { + throw new RuntimeException("arbp fail"); + }; + DefaultAfterRollbackProcessor afterRollbackProcessor = + spy(new DefaultAfterRollbackProcessor<>(recoverer, 0)); + container.setAfterRollbackProcessor(afterRollbackProcessor); + final CountDownLatch stopLatch = new CountDownLatch(1); + container.setApplicationEventPublisher(e -> { + if (e instanceof ConsumerStoppedEvent) { + stopLatch.countDown(); + } + }); + container.start(); + + template.setDefaultTopic(topic4); + template.executeInTransaction(t -> { + RecordHeaders headers = new RecordHeaders(new RecordHeader[] { new RecordHeader("baz", "qux".getBytes()) }); + ProducerRecord record = new ProducerRecord<>(topic4, 0, 0, "foo", headers); + template.send(record); + template.sendDefault(0, 0, "bar"); + return null; + }); + assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue(); + assertThat(data.get()).isEqualTo("bar"); + container.stop(); + pf.destroy(); + assertThat(stopLatch.await(10, TimeUnit.SECONDS)).isTrue(); + logger.info("Stop testRollbackNoRetries"); + } + @SuppressWarnings("serial") public static class SomeOtherTransactionManager extends AbstractPlatformTransactionManager {