GH-748: Handle Kotlin/Lombok Message Listeners
See #748 (comment) (issue not resolved by this)

Kotlin and Lombok (`@SneakyThrows`) listeners can throw checked exceptions even though
their method signatures do not declare them. Such exceptions were not passed to the
error handlers until the failed record was no longer in scope; the error handlers were
then invoked as if the consumer itself had thrown the exception.

Always wrap listener exceptions in `ListenerExecutionFailedException`s.
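For illustration, a minimal Kotlin sketch of the kind of listener affected (the listener id, topic, and exception are made up; the test added below exercises the same scenario through plain `MessageListener`s):

```kotlin
import java.io.IOException
import org.springframework.kafka.annotation.KafkaListener

class CheckedThrowingListener {

	// Kotlin has no checked exceptions, so this compiles even though IOException
	// is a checked exception from the Java caller's point of view.
	@KafkaListener(id = "checkedDemo", topics = ["demoTopic"])
	fun listen(value: String) {
		if (value == "fail") {
			// Previously this escaped without being wrapped in a
			// ListenerExecutionFailedException and was handled as if the consumer
			// itself had failed; with this change it reaches the configured error
			// handler wrapped.
			throw IOException("checked exception from a Kotlin listener")
		}
	}
}
```

The Kotlin test changes in this commit cover the record and batch variants of this case.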

Also remove Java 9 constructs from `@Deprecated` annotations.

**cherry-pick to 2.6.x, 2.5.x**

# Conflicts:
#	spring-kafka/src/main/java/org/springframework/kafka/core/reactive/ReactiveKafkaProducerTemplate.java
#	spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
#	spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java
garyrussell authored and artembilan committed Apr 27, 2021
1 parent da2eab6 commit 51ea8e3
Showing 4 changed files with 169 additions and 64 deletions.
spring-kafka/src/main/java/org/springframework/kafka/core/reactive/ReactiveKafkaProducerTemplate.java
@@ -121,7 +121,17 @@ public <T> Flux<SenderResult<T>> send(Publisher<? extends SenderRecord<K, V, T>>
return this.sender.send(records);
}

public Mono<Void> flush() {
/**
* Flush the producer.
* @return {@link Mono#empty()}.
* @deprecated - flush does not make sense in the context of a reactive flow since
* the send completion signal is a send result, which implies that a flush is
* redundant. If you use this method with reactor-kafka 1.3 or later, it must be
* scheduled to avoid a deadlock; see
* https://issues.apache.org/jira/browse/KAFKA-10790 (since 2.7).
*/
@Deprecated
public Mono<?> flush() {
return doOnProducer(producer -> {
producer.flush();
return null;
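Not part of this commit, but a hedged usage sketch of the scheduling requirement mentioned in the deprecation note above (the helper name and scheduler choice are assumptions, not the library's prescribed approach):

```kotlin
import org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate
import reactor.core.scheduler.Schedulers

// Sketch only: callers that still need the deprecated flush() with reactor-kafka 1.3+
// should not run it on the sending thread (see KAFKA-10790); hopping to a
// boundedElastic scheduler before subscribing is one way to do that.
@Suppress("DEPRECATION")
fun flushOffSenderThread(template: ReactiveKafkaProducerTemplate<String, String>) {
	template.flush()
		.subscribeOn(Schedulers.boundedElastic())
		.subscribe()
}
```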
spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
@@ -1609,10 +1609,9 @@ protected void doInTransactionWithoutResult(TransactionStatus status) {
}

private void batchAfterRollback(final ConsumerRecords<K, V> records,
final List<ConsumerRecord<K, V>> recordList, RuntimeException e,
@Nullable final List<ConsumerRecord<K, V>> recordList, RuntimeException rollbackException,
AfterRollbackProcessor<K, V> afterRollbackProcessorToUse) {

RuntimeException rollbackException = decorateException(e);
try {
if (recordList == null) {
afterRollbackProcessorToUse.process(createRecordList(records), this.consumer,
@@ -1776,32 +1775,37 @@ private void invokeBatchOnMessageWithRecordsOrList(final ConsumerRecords<K, V> r
private void doInvokeBatchOnMessage(final ConsumerRecords<K, V> records,
List<ConsumerRecord<K, V>> recordList) {

switch (this.listenerType) {
case ACKNOWLEDGING_CONSUMER_AWARE:
this.batchListener.onMessage(recordList,
this.isAnyManualAck
? new ConsumerBatchAcknowledgment(records)
: null, this.consumer);
break;
case ACKNOWLEDGING:
this.batchListener.onMessage(recordList,
this.isAnyManualAck
? new ConsumerBatchAcknowledgment(records)
: null);
break;
case CONSUMER_AWARE:
this.batchListener.onMessage(recordList, this.consumer);
break;
case SIMPLE:
this.batchListener.onMessage(recordList);
break;
try {
switch (this.listenerType) {
case ACKNOWLEDGING_CONSUMER_AWARE:
this.batchListener.onMessage(recordList,
this.isAnyManualAck
? new ConsumerBatchAcknowledgment(records)
: null, this.consumer);
break;
case ACKNOWLEDGING:
this.batchListener.onMessage(recordList,
this.isAnyManualAck
? new ConsumerBatchAcknowledgment(records)
: null);
break;
case CONSUMER_AWARE:
this.batchListener.onMessage(recordList, this.consumer);
break;
case SIMPLE:
this.batchListener.onMessage(recordList);
break;
}
}
catch (Exception ex) { // NOSONAR
throw decorateException(ex);
}
}

private void invokeBatchErrorHandler(final ConsumerRecords<K, V> records,
@Nullable List<ConsumerRecord<K, V>> list, RuntimeException e) {
@Nullable List<ConsumerRecord<K, V>> list, RuntimeException rte) {

this.batchErrorHandler.handle(decorateException(e), records, this.consumer,
this.batchErrorHandler.handle(rte, records, this.consumer,
KafkaMessageListenerContainer.this.thisOrParentContainer,
() -> invokeBatchOnMessageWithRecordsOrList(records, list));
}
@@ -1860,9 +1864,9 @@ public void doInTransactionWithoutResult(TransactionStatus s) {
}
break;
}
catch (RuntimeException e) {
this.logger.error(e, "Transaction rolled back");
recordAfterRollback(iterator, record, decorateException(e));
catch (RuntimeException ex) {
this.logger.error(ex, "Transaction rolled back");
recordAfterRollback(iterator, record, ex);
}
finally {
if (this.producerPerConsumerPartition) {
@@ -2064,31 +2068,36 @@ record = this.recordInterceptor.intercept(record);
+ ListenerUtils.recordToString(recordArg));
}
else {
switch (this.listenerType) {
case ACKNOWLEDGING_CONSUMER_AWARE:
this.listener.onMessage(record,
this.isAnyManualAck
? new ConsumerAcknowledgment(record)
: null, this.consumer);
break;
case CONSUMER_AWARE:
this.listener.onMessage(record, this.consumer);
break;
case ACKNOWLEDGING:
this.listener.onMessage(record,
this.isAnyManualAck
? new ConsumerAcknowledgment(record)
: null);
break;
case SIMPLE:
this.listener.onMessage(record);
break;
try {
switch (this.listenerType) {
case ACKNOWLEDGING_CONSUMER_AWARE:
this.listener.onMessage(record,
this.isAnyManualAck
? new ConsumerAcknowledgment(record)
: null, this.consumer);
break;
case CONSUMER_AWARE:
this.listener.onMessage(record, this.consumer);
break;
case ACKNOWLEDGING:
this.listener.onMessage(record,
this.isAnyManualAck
? new ConsumerAcknowledgment(record)
: null);
break;
case SIMPLE:
this.listener.onMessage(record);
break;
}
}
catch (Exception ex) { // NOSONAR
throw decorateException(ex);
}
}
}

private void invokeErrorHandler(final ConsumerRecord<K, V> record,
Iterator<ConsumerRecord<K, V>> iterator, RuntimeException e) {
Iterator<ConsumerRecord<K, V>> iterator, RuntimeException rte) {

if (this.errorHandler instanceof RemainingRecordsErrorHandler) {
if (this.producer == null) {
@@ -2099,30 +2108,33 @@ private void invokeErrorHandler(final ConsumerRecord<K, V> record,
while (iterator.hasNext()) {
records.add(iterator.next());
}
this.errorHandler.handle(decorateException(e), records, this.consumer,
this.errorHandler.handle(rte, records, this.consumer,
KafkaMessageListenerContainer.this.thisOrParentContainer);
}
else {
this.errorHandler.handle(decorateException(e), record, this.consumer);
this.errorHandler.handle(rte, record, this.consumer);
}
}

private RuntimeException decorateException(RuntimeException e) {
RuntimeException toHandle = e;
private RuntimeException decorateException(Exception ex) {
Exception toHandle = ex;
if (toHandle instanceof ListenerExecutionFailedException) {
toHandle = new ListenerExecutionFailedException(toHandle.getMessage(), this.consumerGroupId,
toHandle.getCause());
}
else {
toHandle = new ListenerExecutionFailedException("Listener failed", this.consumerGroupId, toHandle);
}
return toHandle;
return (RuntimeException) toHandle;
}

public void checkDeser(final ConsumerRecord<K, V> record, String headerName) {
DeserializationException exception = ListenerUtils.getExceptionFromHeader(record, headerName, this.logger);
if (exception != null) {
throw exception;
/*
* Wrapping in a LEFE is not strictly correct, but required for backwards compatibility.
*/
throw decorateException(exception);
}
}

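Given the wrapping above, a record error handler can rely on always receiving a `ListenerExecutionFailedException`. A hedged Kotlin sketch (the handler is illustrative and assumes the exception exposes the group id as `groupId`):

```kotlin
import org.springframework.kafka.listener.ErrorHandler
import org.springframework.kafka.listener.ListenerExecutionFailedException

// Sketch only: with listener exceptions always decorated, the original failure
// (including a checked exception from a Kotlin or Lombok listener) is available
// as the cause of the ListenerExecutionFailedException.
val loggingErrorHandler = ErrorHandler { thrownException, record ->
	val wrapped = thrownException as? ListenerExecutionFailedException
	println("Listener in group ${wrapped?.groupId} failed for $record: ${wrapped?.cause ?: thrownException}")
}
```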
spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java
@@ -153,7 +153,10 @@ public static String recordToString(ConsumerRecord<?, ?> record, boolean meta) {
* @param lastIntervals a thread local containing the previous {@link BackOff}
* interval for this thread.
* @since 2.3.12
* @deprecated since 2.7 in favor of
* {@link #unrecoverableBackOff(BackOff, ThreadLocal, ThreadLocal, MessageListenerContainer)}.
*/
@Deprecated
public static void unrecoverableBackOff(BackOff backOff, ThreadLocal<BackOffExecution> executions,
ThreadLocal<Long> lastIntervals) {

EnableKafkaKotlinTests.kt
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2019 the original author or authors.
* Copyright 2016-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -33,10 +33,16 @@ import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.core.ProducerFactory
import org.springframework.kafka.listener.BatchErrorHandler
import org.springframework.kafka.listener.BatchMessageListener
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer
import org.springframework.kafka.listener.ErrorHandler
import org.springframework.kafka.listener.MessageListener
import org.springframework.kafka.test.EmbeddedKafkaBroker
import org.springframework.kafka.test.context.EmbeddedKafka
import org.springframework.test.annotation.DirtiesContext
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig
import java.lang.Exception
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

@@ -48,7 +54,7 @@ import java.util.concurrent.TimeUnit

@SpringJUnitConfig
@DirtiesContext
@EmbeddedKafka(topics = ["kotlinTestTopic"])
@EmbeddedKafka(topics = ["kotlinTestTopic1", "kotlinBatchTestTopic1", "kotlinTestTopic2", "kotlinBatchTestTopic2"])
class EnableKafkaKotlinTests {

@Autowired
@@ -59,26 +65,55 @@ class EnableKafkaKotlinTests {

@Test
fun `test listener`() {
this.template.send("kotlinTestTopic", "foo")
assertThat(this.config.latch.await(10, TimeUnit.SECONDS)).isTrue()
this.template.send("kotlinTestTopic1", "foo")
assertThat(this.config.latch1.await(10, TimeUnit.SECONDS)).isTrue()
assertThat(this.config.received).isEqualTo("foo")
}

@Test
fun `test checkedEx`() {
this.template.send("kotlinTestTopic2", "fail")
assertThat(this.config.latch2.await(10, TimeUnit.SECONDS)).isTrue()
assertThat(this.config.error).isTrue()
}

@Test
fun `test batch listener`() {
this.template.send("kotlinTestTopic", "foo")
assertThat(this.config.latch.await(10, TimeUnit.SECONDS)).isTrue()
this.template.send("kotlinBatchTestTopic1", "foo")
assertThat(this.config.batchLatch1.await(10, TimeUnit.SECONDS)).isTrue()
assertThat(this.config.batchReceived).isEqualTo("foo")
}

@Test
fun `test batch checkedEx`() {
this.template.send("kotlinBatchTestTopic2", "fail")
assertThat(this.config.batchLatch2.await(10, TimeUnit.SECONDS)).isTrue()
assertThat(this.config.batchError).isTrue()
}

@Configuration
@EnableKafka
class Config {

@Volatile
lateinit var received: String

@Volatile
lateinit var batchReceived: String

val latch = CountDownLatch(2)
@Volatile
var error: Boolean = false

@Volatile
var batchError: Boolean = false

val latch1 = CountDownLatch(1)

val latch2 = CountDownLatch(1)

val batchLatch1 = CountDownLatch(1)

val batchLatch2 = CountDownLatch(1)

@Value("\${" + EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS + "}")
private lateinit var brokerAddresses: String
@@ -108,33 +143,78 @@ class EnableKafkaKotlinTests {
return KafkaTemplate(kpf())
}

val eh = ErrorHandler { _, recs : ConsumerRecord<*, *>? ->
if (recs != null) {
this.error = true;
this.latch2.countDown()
}
}

@Bean
fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
val factory: ConcurrentKafkaListenerContainerFactory<String, String>
= ConcurrentKafkaListenerContainerFactory()
factory.consumerFactory = kcf()
factory.setErrorHandler(eh)
return factory
}

val beh = BatchErrorHandler { _, recs ->
if (!recs.isEmpty) {
this.batchError = true;
this.batchLatch2.countDown()
}
}

@Bean
fun kafkaBatchListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
val factory: ConcurrentKafkaListenerContainerFactory<String, String>
= ConcurrentKafkaListenerContainerFactory()
factory.isBatchListener = true
factory.consumerFactory = kcf()
factory.setBatchErrorHandler(beh)
return factory
}

@KafkaListener(id = "kotlin", topics = ["kotlinTestTopic"], containerFactory = "kafkaListenerContainerFactory")
@KafkaListener(id = "kotlin", topics = ["kotlinTestTopic1"], containerFactory = "kafkaListenerContainerFactory")
fun listen(value: String) {
this.received = value
this.latch.countDown()
this.latch1.countDown()
}

@KafkaListener(id = "kotlin-batch", topics = ["kotlinTestTopic"], containerFactory = "kafkaBatchListenerContainerFactory")
@KafkaListener(id = "kotlin-batch", topics = ["kotlinBatchTestTopic1"], containerFactory = "kafkaBatchListenerContainerFactory")
fun batchListen(values: List<ConsumerRecord<String, String>>) {
this.batchReceived = values.first().value()
this.latch.countDown()
this.batchLatch1.countDown()
}

@Bean
fun checkedEx(kafkaListenerContainerFactory : ConcurrentKafkaListenerContainerFactory<String, String>) :
ConcurrentMessageListenerContainer<String, String> {

val container = kafkaListenerContainerFactory.createContainer("kotlinTestTopic2")
container.containerProperties.groupId = "checkedEx"
container.containerProperties.messageListener = MessageListener<String, String> {
if (it.value() == "fail") {
throw Exception("checked")
}
}
return container;
}

@Bean
fun batchCheckedEx(kafkaBatchListenerContainerFactory :
ConcurrentKafkaListenerContainerFactory<String, String>) :
ConcurrentMessageListenerContainer<String, String> {

val container = kafkaBatchListenerContainerFactory.createContainer("kotlinBatchTestTopic2")
container.containerProperties.groupId = "batchCheckedEx"
container.containerProperties.messageListener = BatchMessageListener<String, String> {
if (it.first().value() == "fail") {
throw Exception("checked")
}
}
return container;
}

}
