Skip to content

Commit

Permalink
Log KafkaBackoffException at DEBUG Level
Browse files Browse the repository at this point in the history
Resolves #2009

The expected KafkaBackoffException's message is being logged at WARN level.

Make KafkaBackoffException be logged in DEBUG level

Remove coupling between KafkaMessageListenerContainer and KafkaBackoffException

Document KafkaBackOffException Log Level Change

Doc polishing.
  • Loading branch information
tomazfernandes authored and garyrussell committed Jan 13, 2022
1 parent 65a7fd2 commit 95fc922
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 8 deletions.
22 changes: 22 additions & 0 deletions spring-kafka-docs/src/main/asciidoc/retrytopic.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -702,3 +702,25 @@ public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo>
}
----
====

[[change-kboe-logging-level]]
==== Changing KafkaBackOffException Logging Level

When a message in the retry topic is not due for consumption, a `KafkaBackOffException` is thrown. Such exceptions are logged by default at `DEBUG` level, but you can change this behavior by setting an error handler customizer in the `ListenerContainerFactoryConfigurer` in a `@Configuration` class.

For example, to change the logging level to WARN you might add:

====
[source, java]
----
@Bean(name = RetryTopicInternalBeanNames.LISTENER_CONTAINER_FACTORY_CONFIGURER_NAME)
public ListenerContainerFactoryConfigurer listenerContainer(KafkaConsumerBackoffManager kafkaConsumerBackoffManager,
DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory,
@Qualifier(RetryTopicInternalBeanNames
.INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
ListenerContainerFactoryConfigurer configurer = new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager, deadLetterPublishingRecovererFactory, clock);
configurer.setErrorHandlerCustomizer(commonErrorHandler -> ((DefaultErrorHandler) commonErrorHandler).setLogLevel(KafkaException.Level.WARN));
return configurer;
}
----
====
7 changes: 7 additions & 0 deletions spring-kafka-docs/src/main/asciidoc/whats-new.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,10 @@ See <<delegating-serialization>> for more information.
The property `stripPreviousExceptionHeaders` is now `true` by default.

See <<dlpr-headers>> for more information.

[[x28-kafka-back-off-exception-log-level]]
==== KafkaBackOffException Log Level Changes

The KafkaBackOffException thrown when using the retryable topics feature is now logged at DEBUG level.

See <<change-kboe-logging-level>> if you need to change the logging level back to WARN or set it to any other level.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2021 the original author or authors.
* Copyright 2016-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -2495,12 +2495,7 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> recor
commitOffsetsIfNeeded(record);
}
catch (KafkaException ke) {
if (ke.contains(KafkaBackoffException.class)) {
this.logger.warn(ke.getMessage());
}
else {
ke.selfLog(ERROR_HANDLER_THREW_AN_EXCEPTION, this.logger);
}
ke.selfLog(ERROR_HANDLER_THREW_AN_EXCEPTION, this.logger);
return ke;
}
catch (RuntimeException ee) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2021 the original author or authors.
* Copyright 2018-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -28,6 +28,7 @@

import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.KafkaException;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.listener.AcknowledgingConsumerAwareMessageListener;
import org.springframework.kafka.listener.CommonErrorHandler;
Expand Down Expand Up @@ -144,6 +145,7 @@ private CommonErrorHandler createErrorHandler(DeadLetterPublishingRecoverer dead
DefaultErrorHandler errorHandler = new DefaultErrorHandler(deadLetterPublishingRecoverer,
new FixedBackOff(0, 0));
errorHandler.setCommitRecovered(true);
errorHandler.setLogLevel(KafkaException.Level.DEBUG);
this.errorHandlerCustomizer.accept(errorHandler);
return errorHandler;
}
Expand Down

0 comments on commit 95fc922

Please sign in to comment.