Skip to content

Commit

Permalink
GH-948: Decorate LEFE for batch listeners
Browse files Browse the repository at this point in the history
See #948

Also add the groupId for batch listeners, for consistency.
  • Loading branch information
garyrussell authored and artembilan committed Jan 29, 2019
1 parent 2907f9c commit e39be7e
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 18 deletions.
Expand Up @@ -1077,10 +1077,11 @@ private void invokeBatchErrorHandler(final ConsumerRecords<K, V> records,
@SuppressWarnings(RAW_TYPES) @Nullable Producer producer, RuntimeException e) {

if (this.batchErrorHandler instanceof ContainerAwareBatchErrorHandler) {
this.batchErrorHandler.handle(e, records, this.consumer, KafkaMessageListenerContainer.this.container);
this.batchErrorHandler.handle(decorateException(e), records, this.consumer,
KafkaMessageListenerContainer.this.container);
}
else {
this.batchErrorHandler.handle(e, records, this.consumer);
this.batchErrorHandler.handle(decorateException(e), records, this.consumer);
}
// if the handler handled the error (no exception), go ahead and commit
if (producer != null) {
Expand Down Expand Up @@ -1243,18 +1244,6 @@ private void invokeErrorHandler(final ConsumerRecord<K, V> record,
@SuppressWarnings(RAWTYPES) @Nullable Producer producer,
Iterator<ConsumerRecord<K, V>> iterator, RuntimeException e) {

Exception toHandle = e;
if (toHandle instanceof ListenerExecutionFailedException) {
toHandle = new ListenerExecutionFailedException(toHandle.getMessage(), this.consumerGroupId,
toHandle.getCause());
}
else {
/*
* TODO: in 2.3, wrap all exceptions (e.g. thrown by user implementations
* of MessageListener) in LEFE with groupId. @KafkaListeners always throw
* LEFE.
*/
}
if (this.errorHandler instanceof RemainingRecordsErrorHandler) {
if (producer == null) {
processCommits();
Expand All @@ -1264,17 +1253,33 @@ private void invokeErrorHandler(final ConsumerRecord<K, V> record,
while (iterator.hasNext()) {
records.add(iterator.next());
}
((RemainingRecordsErrorHandler) this.errorHandler).handle(toHandle, records, this.consumer,
((RemainingRecordsErrorHandler) this.errorHandler).handle(decorateException(e), records, this.consumer,
KafkaMessageListenerContainer.this.container);
}
else {
this.errorHandler.handle(toHandle, record, this.consumer);
this.errorHandler.handle(decorateException(e), record, this.consumer);
}
if (producer != null) {
ackCurrent(record, producer);
}
}

private Exception decorateException(RuntimeException e) {
Exception toHandle = e;
if (toHandle instanceof ListenerExecutionFailedException) {
toHandle = new ListenerExecutionFailedException(toHandle.getMessage(), this.consumerGroupId,
toHandle.getCause());
}
else {
/*
* TODO: in 2.3, wrap all exceptions (e.g. thrown by user implementations
* of MessageListener) in LEFE with groupId. @KafkaListeners always throw
* LEFE.
*/
}
return toHandle;
}

public void checkDeser(final ConsumerRecord<K, V> record, String headerName) {
Header header = record.headers().lastHeader(headerName);
if (header != null) {
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2018 the original author or authors.
* Copyright 2017-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -112,6 +112,8 @@ public void discardRemainingRecordsFromPollAndSeek() throws Exception {
offsets.put(new TopicPartition("foo", 2), new OffsetAndMetadata(2L));
inOrder.verify(this.producer).sendOffsetsToTransaction(offsets, CONTAINER_ID);
inOrder.verify(this.producer).commitTransaction();
assertThat(this.config.ehException).isInstanceOf(ListenerExecutionFailedException.class);
assertThat(((ListenerExecutionFailedException) this.config.ehException).getGroupId()).isEqualTo(CONTAINER_ID);
}

@Configuration
Expand All @@ -126,6 +128,8 @@ public static class Config {

private final AtomicBoolean fail = new AtomicBoolean(true);

private volatile Exception ehException;

@KafkaListener(id = CONTAINER_ID, topics = "foo")
public void foo(List<String> in) {
this.deliveryLatch.countDown();
Expand Down Expand Up @@ -195,7 +199,17 @@ public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckOnError(false);
factory.setBatchErrorHandler(new SeekToCurrentBatchErrorHandler());
factory.setBatchErrorHandler(new SeekToCurrentBatchErrorHandler() {

@Override
public void handle(Exception thrownException, ConsumerRecords<?, ?> data, Consumer<?, ?> consumer,
MessageListenerContainer container) {

Config.this.ehException = thrownException;
super.handle(thrownException, data, consumer, container);
}

});
factory.setBatchListener(true);
factory.getContainerProperties().setTransactionManager(tm());
return factory;
Expand Down

0 comments on commit e39be7e

Please sign in to comment.