From 51ea8e380ab7e8b08775afcb5e9850324c3dba6a Mon Sep 17 00:00:00 2001
From: Gary Russell
Date: Tue, 27 Apr 2021 14:23:01 -0400
Subject: [PATCH] GH-748: Handle Kotlin/Lombok Message Listeners

See https://github.com/spring-projects/spring-kafka/issues/748#issuecomment-827333267
(issue not resolved by this)

Kotlin and Lombok (`@SneakyThrows`) can throw checked exceptions even though
the method signatures do not allow it.

Such exceptions were not passed to the error handlers until the failed record
was no longer in scope; the error handlers were called as if the consumer
itself had thrown the exception.

Always wrap listener exceptions in `ListenerExecutionFailedException`s.

Also remove Java 9 constructs in `@Deprecated` annotations.
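For illustration, a minimal Kotlin listener of this kind (bean, id, and topic
names here are illustrative, mirroring the new test) compiles without declaring
the checked exception but can throw one at runtime; with this change the
container wraps it in a `ListenerExecutionFailedException` and passes it to the
configured error handler instead of treating it as a consumer error:

```kotlin
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.stereotype.Component

@Component
class CheckedDemoListener {

    // Kotlin does not force checked exceptions to be declared, so this listener
    // can propagate a plain (checked) Exception to the listener container.
    @KafkaListener(id = "checkedDemo", topics = ["demoTopic"])
    fun listen(value: String) {
        if (value == "fail") {
            throw Exception("checked")
        }
    }
}
```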
**cherry-pick to 2.6.x, 2.5.x**

# Conflicts:
#	spring-kafka/src/main/java/org/springframework/kafka/core/reactive/ReactiveKafkaProducerTemplate.java
#	spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
#	spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java
---
 .../ReactiveKafkaProducerTemplate.java        |  12 +-
 .../KafkaMessageListenerContainer.java        | 116 ++++++++++--------
 .../kafka/listener/ListenerUtils.java         |   3 +
 .../kafka/listener/EnableKafkaKotlinTests.kt  | 102 +++++++++++++--
 4 files changed, 169 insertions(+), 64 deletions(-)

diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/reactive/ReactiveKafkaProducerTemplate.java b/spring-kafka/src/main/java/org/springframework/kafka/core/reactive/ReactiveKafkaProducerTemplate.java
index 6c70bd06e2..4cd7b95565 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/core/reactive/ReactiveKafkaProducerTemplate.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/core/reactive/ReactiveKafkaProducerTemplate.java
@@ -121,7 +121,17 @@ public Flux<SenderResult<T>> send(Publisher<? extends SenderRecord<K, V, T>> records)
         return this.sender.send(records);
     }
 
-    public Mono<Void> flush() {
+    /**
+     * Flush the producer.
+     * @return {@link Mono#empty()}.
+     * @deprecated - flush does not make sense in the context of a reactive flow since
+     * the send completion signal is a send result, which implies that a flush is
+     * redundant. If you use this method with reactor-kafka 1.3 or later, it must be
+     * scheduled to avoid a deadlock; see
+     * https://issues.apache.org/jira/browse/KAFKA-10790 (since 2.7).
+     */
+    @Deprecated
+    public Mono<Void> flush() {
         return doOnProducer(producer -> {
             producer.flush();
             return null;
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
index 499e63c6a3..229be0099b 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
@@ -1609,10 +1609,9 @@ protected void doInTransactionWithoutResult(TransactionStatus status) {
         }
 
         private void batchAfterRollback(final ConsumerRecords<K, V> records,
-                final List<ConsumerRecord<K, V>> recordList, RuntimeException e,
+                @Nullable final List<ConsumerRecord<K, V>> recordList, RuntimeException rollbackException,
                 AfterRollbackProcessor<K, V> afterRollbackProcessorToUse) {
 
-            RuntimeException rollbackException = decorateException(e);
             try {
                 if (recordList == null) {
                     afterRollbackProcessorToUse.process(createRecordList(records), this.consumer,
@@ -1776,32 +1775,37 @@ private void invokeBatchOnMessageWithRecordsOrList(final ConsumerRecords<K, V> r
         private void doInvokeBatchOnMessage(final ConsumerRecords<K, V> records,
                 List<ConsumerRecord<K, V>> recordList) {
 
-            switch (this.listenerType) {
-                case ACKNOWLEDGING_CONSUMER_AWARE:
-                    this.batchListener.onMessage(recordList,
-                            this.isAnyManualAck
-                                    ? new ConsumerBatchAcknowledgment(records)
-                                    : null, this.consumer);
-                    break;
-                case ACKNOWLEDGING:
-                    this.batchListener.onMessage(recordList,
-                            this.isAnyManualAck
-                                    ? new ConsumerBatchAcknowledgment(records)
-                                    : null);
-                    break;
-                case CONSUMER_AWARE:
-                    this.batchListener.onMessage(recordList, this.consumer);
-                    break;
-                case SIMPLE:
-                    this.batchListener.onMessage(recordList);
-                    break;
+            try {
+                switch (this.listenerType) {
+                    case ACKNOWLEDGING_CONSUMER_AWARE:
+                        this.batchListener.onMessage(recordList,
+                                this.isAnyManualAck
+                                        ? new ConsumerBatchAcknowledgment(records)
+                                        : null, this.consumer);
+                        break;
+                    case ACKNOWLEDGING:
+                        this.batchListener.onMessage(recordList,
+                                this.isAnyManualAck
+                                        ? new ConsumerBatchAcknowledgment(records)
+                                        : null);
+                        break;
+                    case CONSUMER_AWARE:
+                        this.batchListener.onMessage(recordList, this.consumer);
+                        break;
+                    case SIMPLE:
+                        this.batchListener.onMessage(recordList);
+                        break;
+                }
+            }
+            catch (Exception ex) { // NOSONAR
+                throw decorateException(ex);
             }
         }
 
         private void invokeBatchErrorHandler(final ConsumerRecords<K, V> records,
-                @Nullable List<ConsumerRecord<K, V>> list, RuntimeException e) {
+                @Nullable List<ConsumerRecord<K, V>> list, RuntimeException rte) {
 
-            this.batchErrorHandler.handle(decorateException(e), records, this.consumer,
+            this.batchErrorHandler.handle(rte, records, this.consumer,
                     KafkaMessageListenerContainer.this.thisOrParentContainer,
                     () -> invokeBatchOnMessageWithRecordsOrList(records, list));
         }
@@ -1860,9 +1864,9 @@ public void doInTransactionWithoutResult(TransactionStatus s) {
                     }
                     break;
                 }
-                catch (RuntimeException e) {
-                    this.logger.error(e, "Transaction rolled back");
-                    recordAfterRollback(iterator, record, decorateException(e));
+                catch (RuntimeException ex) {
+                    this.logger.error(ex, "Transaction rolled back");
+                    recordAfterRollback(iterator, record, ex);
                 }
                 finally {
                     if (this.producerPerConsumerPartition) {
@@ -2064,31 +2068,36 @@ record = this.recordInterceptor.intercept(record);
                             + ListenerUtils.recordToString(recordArg));
                 }
                 else {
-                    switch (this.listenerType) {
-                        case ACKNOWLEDGING_CONSUMER_AWARE:
-                            this.listener.onMessage(record,
-                                    this.isAnyManualAck
-                                            ? new ConsumerAcknowledgment(record)
-                                            : null, this.consumer);
-                            break;
-                        case CONSUMER_AWARE:
-                            this.listener.onMessage(record, this.consumer);
-                            break;
-                        case ACKNOWLEDGING:
-                            this.listener.onMessage(record,
-                                    this.isAnyManualAck
-                                            ? new ConsumerAcknowledgment(record)
-                                            : null);
-                            break;
-                        case SIMPLE:
-                            this.listener.onMessage(record);
-                            break;
+                    try {
+                        switch (this.listenerType) {
+                            case ACKNOWLEDGING_CONSUMER_AWARE:
+                                this.listener.onMessage(record,
+                                        this.isAnyManualAck
+                                                ? new ConsumerAcknowledgment(record)
+                                                : null, this.consumer);
+                                break;
+                            case CONSUMER_AWARE:
+                                this.listener.onMessage(record, this.consumer);
+                                break;
+                            case ACKNOWLEDGING:
+                                this.listener.onMessage(record,
+                                        this.isAnyManualAck
+                                                ? new ConsumerAcknowledgment(record)
+                                                : null);
+                                break;
+                            case SIMPLE:
+                                this.listener.onMessage(record);
+                                break;
+                        }
+                    }
+                    catch (Exception ex) { // NOSONAR
+                        throw decorateException(ex);
                     }
                 }
             }
 
         private void invokeErrorHandler(final ConsumerRecord<K, V> record,
-                Iterator<ConsumerRecord<K, V>> iterator, RuntimeException e) {
+                Iterator<ConsumerRecord<K, V>> iterator, RuntimeException rte) {
 
             if (this.errorHandler instanceof RemainingRecordsErrorHandler) {
                 if (this.producer == null) {
@@ -2099,16 +2108,16 @@ private void invokeErrorHandler(final ConsumerRecord<K, V> record,
                     while (iterator.hasNext()) {
                         records.add(iterator.next());
                     }
-                    this.errorHandler.handle(decorateException(e), records, this.consumer,
+                    this.errorHandler.handle(rte, records, this.consumer,
                             KafkaMessageListenerContainer.this.thisOrParentContainer);
                 }
                 else {
-                    this.errorHandler.handle(decorateException(e), record, this.consumer);
+                    this.errorHandler.handle(rte, record, this.consumer);
                 }
             }
 
-        private RuntimeException decorateException(RuntimeException e) {
-            RuntimeException toHandle = e;
+        private RuntimeException decorateException(Exception ex) {
+            Exception toHandle = ex;
             if (toHandle instanceof ListenerExecutionFailedException) {
                 toHandle = new ListenerExecutionFailedException(toHandle.getMessage(), this.consumerGroupId,
                         toHandle.getCause());
@@ -2116,13 +2125,16 @@ private RuntimeException decorateException(RuntimeException e) {
             else {
                 toHandle = new ListenerExecutionFailedException("Listener failed", this.consumerGroupId, toHandle);
             }
-            return toHandle;
+            return (RuntimeException) toHandle;
         }
 
         public void checkDeser(final ConsumerRecord<K, V> record, String headerName) {
             DeserializationException exception = ListenerUtils.getExceptionFromHeader(record, headerName, this.logger);
             if (exception != null) {
-                throw exception;
+                /*
+                 * Wrapping in a LEFE is not strictly correct, but required for backwards compatibility.
+                 */
+                throw decorateException(exception);
             }
         }
 
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java
index a2e1f191d0..cc31f6a5ee 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java
@@ -153,7 +153,10 @@ public static String recordToString(ConsumerRecord<?, ?> record, boolean meta) {
      * @param lastIntervals a thread local containing the previous {@link BackOff}
      * interval for this thread.
      * @since 2.3.12
+     * @deprecated since 2.7 in favor of
+     * {@link #unrecoverableBackOff(BackOff, ThreadLocal, ThreadLocal, MessageListenerContainer)}.
      */
+    @Deprecated
     public static void unrecoverableBackOff(BackOff backOff, ThreadLocal<BackOffExecution> executions,
             ThreadLocal<Long> lastIntervals) {
 
diff --git a/spring-kafka/src/test/kotlin/org/springframework/kafka/listener/EnableKafkaKotlinTests.kt b/spring-kafka/src/test/kotlin/org/springframework/kafka/listener/EnableKafkaKotlinTests.kt
index b09104cc89..b41c0d8027 100644
--- a/spring-kafka/src/test/kotlin/org/springframework/kafka/listener/EnableKafkaKotlinTests.kt
+++ b/spring-kafka/src/test/kotlin/org/springframework/kafka/listener/EnableKafkaKotlinTests.kt
@@ -1,5 +1,5 @@
 /*
- * Copyright 2016-2019 the original author or authors.
+ * Copyright 2016-2021 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -33,10 +33,16 @@ import org.springframework.kafka.core.DefaultKafkaConsumerFactory
 import org.springframework.kafka.core.DefaultKafkaProducerFactory
 import org.springframework.kafka.core.KafkaTemplate
 import org.springframework.kafka.core.ProducerFactory
+import org.springframework.kafka.listener.BatchErrorHandler
+import org.springframework.kafka.listener.BatchMessageListener
+import org.springframework.kafka.listener.ConcurrentMessageListenerContainer
+import org.springframework.kafka.listener.ErrorHandler
+import org.springframework.kafka.listener.MessageListener
 import org.springframework.kafka.test.EmbeddedKafkaBroker
 import org.springframework.kafka.test.context.EmbeddedKafka
 import org.springframework.test.annotation.DirtiesContext
 import org.springframework.test.context.junit.jupiter.SpringJUnitConfig
+import java.lang.Exception
 import java.util.concurrent.CountDownLatch
 import java.util.concurrent.TimeUnit
@@ -48,7 +54,7 @@ import java.util.concurrent.TimeUnit
 
 @SpringJUnitConfig
 @DirtiesContext
-@EmbeddedKafka(topics = ["kotlinTestTopic"])
+@EmbeddedKafka(topics = ["kotlinTestTopic1", "kotlinBatchTestTopic1", "kotlinTestTopic2", "kotlinBatchTestTopic2"])
 class EnableKafkaKotlinTests {
 
 	@Autowired
@@ -59,26 +65,55 @@ class EnableKafkaKotlinTests {
 
 	@Test
 	fun `test listener`() {
-		this.template.send("kotlinTestTopic", "foo")
-		assertThat(this.config.latch.await(10, TimeUnit.SECONDS)).isTrue()
+		this.template.send("kotlinTestTopic1", "foo")
+		assertThat(this.config.latch1.await(10, TimeUnit.SECONDS)).isTrue()
 		assertThat(this.config.received).isEqualTo("foo")
 	}
 
+	@Test
+	fun `test checkedEx`() {
+		this.template.send("kotlinTestTopic2", "fail")
+		assertThat(this.config.latch2.await(10, TimeUnit.SECONDS)).isTrue()
+		assertThat(this.config.error).isTrue()
+	}
+
 	@Test
 	fun `test batch listener`() {
-		this.template.send("kotlinTestTopic", "foo")
-		assertThat(this.config.latch.await(10, TimeUnit.SECONDS)).isTrue()
+		this.template.send("kotlinBatchTestTopic1", "foo")
+		assertThat(this.config.batchLatch1.await(10, TimeUnit.SECONDS)).isTrue()
 		assertThat(this.config.batchReceived).isEqualTo("foo")
 	}
 
+	@Test
+	fun `test batch checkedEx`() {
+		this.template.send("kotlinBatchTestTopic2", "fail")
+		assertThat(this.config.batchLatch2.await(10, TimeUnit.SECONDS)).isTrue()
+		assertThat(this.config.batchError).isTrue()
+	}
+
 	@Configuration
 	@EnableKafka
 	class Config {
 
+		@Volatile
 		lateinit var received: String
+
+		@Volatile
 		lateinit var batchReceived: String
 
-		val latch = CountDownLatch(2)
+		@Volatile
+		var error: Boolean = false
+
+		@Volatile
+		var batchError: Boolean = false
+
+		val latch1 = CountDownLatch(1)
+
+		val latch2 = CountDownLatch(1)
+
+		val batchLatch1 = CountDownLatch(1)
+
+		val batchLatch2 = CountDownLatch(1)
 
 		@Value("\${" + EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS + "}")
 		private lateinit var brokerAddresses: String
 
@@ -108,33 +143,78 @@ class EnableKafkaKotlinTests {
 			return KafkaTemplate(kpf())
 		}
 
+		val eh = ErrorHandler { _, recs : ConsumerRecord<*, *>? ->
+			if (recs != null) {
+				this.error = true;
+				this.latch2.countDown()
+			}
+		}
+
 		@Bean
 		fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
 			val factory: ConcurrentKafkaListenerContainerFactory<String, String>
 				= ConcurrentKafkaListenerContainerFactory()
 			factory.consumerFactory = kcf()
+			factory.setErrorHandler(eh)
 			return factory
 		}
 
+		val beh = BatchErrorHandler { _, recs ->
+			if (!recs.isEmpty) {
+				this.batchError = true;
+				this.batchLatch2.countDown()
+			}
+		}
+
 		@Bean
 		fun kafkaBatchListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
 			val factory: ConcurrentKafkaListenerContainerFactory<String, String>
 				= ConcurrentKafkaListenerContainerFactory()
 			factory.isBatchListener = true
 			factory.consumerFactory = kcf()
+			factory.setBatchErrorHandler(beh)
 			return factory
 		}
 
-		@KafkaListener(id = "kotlin", topics = ["kotlinTestTopic"], containerFactory = "kafkaListenerContainerFactory")
+		@KafkaListener(id = "kotlin", topics = ["kotlinTestTopic1"], containerFactory = "kafkaListenerContainerFactory")
 		fun listen(value: String) {
 			this.received = value
-			this.latch.countDown()
+			this.latch1.countDown()
 		}
 
-		@KafkaListener(id = "kotlin-batch", topics = ["kotlinTestTopic"], containerFactory = "kafkaBatchListenerContainerFactory")
+		@KafkaListener(id = "kotlin-batch", topics = ["kotlinBatchTestTopic1"], containerFactory = "kafkaBatchListenerContainerFactory")
 		fun batchListen(values: List<ConsumerRecord<String, String>>) {
 			this.batchReceived = values.first().value()
-			this.latch.countDown()
+			this.batchLatch1.countDown()
+		}
+
+		@Bean
+		fun checkedEx(kafkaListenerContainerFactory : ConcurrentKafkaListenerContainerFactory<String, String>) :
+				ConcurrentMessageListenerContainer<String, String> {
+
+			val container = kafkaListenerContainerFactory.createContainer("kotlinTestTopic2")
+			container.containerProperties.groupId = "checkedEx"
+			container.containerProperties.messageListener = MessageListener<String, String> {
+				if (it.value() == "fail") {
+					throw Exception("checked")
+				}
+			}
+			return container;
+		}
+
+		@Bean
+		fun batchCheckedEx(kafkaBatchListenerContainerFactory :
+				ConcurrentKafkaListenerContainerFactory<String, String>) :
+				ConcurrentMessageListenerContainer<String, String> {
+
+			val container = kafkaBatchListenerContainerFactory.createContainer("kotlinBatchTestTopic2")
+			container.containerProperties.groupId = "batchCheckedEx"
+			container.containerProperties.messageListener = BatchMessageListener<String, String> {
+				if (it.first().value() == "fail") {
+					throw Exception("checked")
+				}
+			}
+			return container;
 		}
 	}
 }
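Usage sketch (not part of the patch; the handler name and log message are illustrative): with this change an error handler always receives the listener's failure wrapped in a `ListenerExecutionFailedException`, so the original checked exception thrown by a Kotlin or `@SneakyThrows` listener is available as the cause:

```kotlin
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.springframework.kafka.listener.ErrorHandler
import org.springframework.kafka.listener.ListenerExecutionFailedException

// The exception delivered to the handler is now a ListenerExecutionFailedException;
// the listener's original (possibly checked) exception is its cause.
val handler = ErrorHandler { thrownException, record: ConsumerRecord<*, *>? ->
	val wrapped = thrownException as ListenerExecutionFailedException
	println("Listener failed for ${record?.topic()}-${record?.partition()}@${record?.offset()}: ${wrapped.cause}")
}
```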