GH-2711: Non-Blocking Retries and Custom Publisher
Resolves #2711

Previously, it was not easy to provide custom DLPRs, for example to override
`createProducerRecord`.

* Fix Javadocs.
garyrussell committed Jun 21, 2023
1 parent 1eaaa27 commit cd7de80
Showing 4 changed files with 186 additions and 50 deletions.
23 changes: 23 additions & 0 deletions spring-kafka-docs/src/main/asciidoc/retrytopic.adoc
@@ -613,6 +613,29 @@ Starting with version 2.8.4, if you wish to add custom headers (in addition to t
By default, any headers added will be cumulative - Kafka headers can contain multiple values.
Starting with version 2.9.5, if the `Headers` returned by the function contains a header of type `DeadLetterPublishingRecoverer.SingleRecordHeader`, then any existing values for that header will be removed and only the new single value will remain.
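
For example, the following sketch (assuming a `DeadLetterPublishingRecovererFactory` named `factory` is in scope; the `x-failure-class` header name is illustrative) publishes a header that always carries exactly one value:

====
[source, java]
----
factory.setHeadersFunction((rec, ex) -> {
    Headers headers = new RecordHeaders();
    // a SingleRecordHeader replaces any existing values for the same header name
    headers.add(new DeadLetterPublishingRecoverer.SingleRecordHeader("x-failure-class",
            ex.getClass().getName().getBytes(StandardCharsets.UTF_8)));
    return headers;
});
----
====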

[[custom-dlpr]]
===== Custom DeadLetterPublishingRecoverer

As can be seen in <<retry-headers>>, it is possible to customize the default `DeadLetterPublishingRecoverer` instances created by the framework.
However, for some use cases, it is necessary to subclass the `DeadLetterPublishingRecoverer`, for example to override `createProducerRecord()` to modify the contents sent to the retry (or dead-letter) topics.
Starting with version 3.0.9, you can override the `RetryTopicConfigurationSupport.configureDeadLetterPublishingContainerFactory()` method to provide a `DeadLetterPublisherCreator` instance, for example:

====
[source, java]
----
@Override
protected Consumer<DeadLetterPublishingRecovererFactory>
        configureDeadLetterPublishingContainerFactory() {

    return (factory) -> factory.setDeadLetterPublisherCreator(
            (templateResolver, destinationResolver) ->
                    new CustomDLPR(templateResolver, destinationResolver));
}
----
====

It is recommended that you use the provided resolvers when constructing the custom instance.
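
A minimal sketch of such a subclass follows; the `createProducerRecord()` override shown here simply delegates, and its exact signature should be checked against the `DeadLetterPublishingRecoverer` source for your version:

====
[source, java]
----
class CustomDLPR extends DeadLetterPublishingRecoverer {

    CustomDLPR(Function<ProducerRecord<?, ?>, KafkaOperations<?, ?>> templateResolver,
            BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver) {

        super(templateResolver, destinationResolver);
    }

    @Override
    protected ProducerRecord<Object, Object> createProducerRecord(ConsumerRecord<?, ?> record,
            TopicPartition topicPartition, Headers headers, @Nullable byte[] key,
            @Nullable byte[] value) {

        // modify the outgoing record here before it is published
        return super.createProducerRecord(record, topicPartition, headers, key, value);
    }

}
----
====

The integration test in this commit registers exactly such a subclass and asserts that the listener container's recoverer is an instance of it.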

[[retry-topic-combine-blocking]]
==== Combining Blocking and Non-Blocking Retries

spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java
@@ -81,8 +81,6 @@ public class DeadLetterPublishingRecoverer extends ExceptionClassifier implement

private static final long THIRTY = 30L;

private final HeaderNames headerNames = getHeaderNames();

private final boolean transactional;

private final BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver;
@@ -91,6 +89,8 @@ public class DeadLetterPublishingRecoverer extends ExceptionClassifier implement

private final EnumSet<HeaderNames.HeadersToAdd> whichHeaders = EnumSet.allOf(HeaderNames.HeadersToAdd.class);

private HeaderNames headerNames = getHeaderNames();

private boolean retainExceptionHeader;

private BiFunction<ConsumerRecord<?, ?>, Exception, Headers> headersFunction = DEFAULT_HEADERS_FUNCTION;
@@ -115,6 +115,24 @@ public class DeadLetterPublishingRecoverer extends ExceptionClassifier implement

private ExceptionHeadersCreator exceptionHeadersCreator = this::addExceptionInfoHeaders;

private Supplier<HeaderNames> headerNamesSupplier = () -> HeaderNames.Builder
.original()
.offsetHeader(KafkaHeaders.DLT_ORIGINAL_OFFSET)
.timestampHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP)
.timestampTypeHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE)
.topicHeader(KafkaHeaders.DLT_ORIGINAL_TOPIC)
.partitionHeader(KafkaHeaders.DLT_ORIGINAL_PARTITION)
.consumerGroupHeader(KafkaHeaders.DLT_ORIGINAL_CONSUMER_GROUP)
.exception()
.keyExceptionFqcn(KafkaHeaders.DLT_KEY_EXCEPTION_FQCN)
.exceptionFqcn(KafkaHeaders.DLT_EXCEPTION_FQCN)
.exceptionCauseFqcn(KafkaHeaders.DLT_EXCEPTION_CAUSE_FQCN)
.keyExceptionMessage(KafkaHeaders.DLT_KEY_EXCEPTION_MESSAGE)
.exceptionMessage(KafkaHeaders.DLT_EXCEPTION_MESSAGE)
.keyExceptionStacktrace(KafkaHeaders.DLT_KEY_EXCEPTION_STACKTRACE)
.exceptionStacktrace(KafkaHeaders.DLT_EXCEPTION_STACKTRACE)
.build();

/**
* Create an instance with the provided template and a default destination resolving
* function that returns a TopicPartition based on the original topic (appended with ".DLT")
@@ -188,6 +206,23 @@ public DeadLetterPublishingRecoverer(Map<Class<?>, KafkaOperations<? extends Obj
this.destinationResolver = destinationResolver;
}

/**
* Create an instance with a template resolving function that receives the failed
* consumer record and the exception and returns a {@link KafkaOperations}; publishing
* from this instance will not be transactional. Also receives a destination resolving
* function that works similarly but returns a {@link TopicPartition} instead. If the
* partition in the {@link TopicPartition} is less than 0, no partition is set when
* publishing to the topic.
*
* @param templateResolver the function that resolves the {@link KafkaOperations} to use for publishing.
* @param destinationResolver the destination resolving function.
* @since 3.0.9
*/
public DeadLetterPublishingRecoverer(Function<ProducerRecord<?, ?>, KafkaOperations<?, ?>> templateResolver,
BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver) {
this(templateResolver, false, destinationResolver);
}

/**
* Create an instance with a template resolving function that receives the failed
* consumer record and the exception and returns a {@link KafkaOperations} and a
@@ -487,6 +522,9 @@ public void accept(ConsumerRecord<?, ?> record, @Nullable Consumer<?, ?> consume
private void addAndEnhanceHeaders(ConsumerRecord<?, ?> record, Exception exception,
@Nullable DeserializationException vDeserEx, @Nullable DeserializationException kDeserEx, Headers headers) {

if (this.headerNames == null) {
this.headerNames = this.headerNamesSupplier.get();
}
if (kDeserEx != null) {
if (!this.retainExceptionHeader) {
headers.remove(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER);
@@ -825,25 +863,24 @@ private String getStackTraceAsString(Throwable cause) {
* in the sent record.
* @return the header names.
* @since 2.7
* @deprecated since 3.0.9 - provide a supplier instead.
* @see #setHeaderNamesSupplier(Supplier)
*/
@Nullable
@Deprecated(since = "3.0.9", forRemoval = true)
protected HeaderNames getHeaderNames() {
return HeaderNames.Builder
.original()
.offsetHeader(KafkaHeaders.DLT_ORIGINAL_OFFSET)
.timestampHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP)
.timestampTypeHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE)
.topicHeader(KafkaHeaders.DLT_ORIGINAL_TOPIC)
.partitionHeader(KafkaHeaders.DLT_ORIGINAL_PARTITION)
.consumerGroupHeader(KafkaHeaders.DLT_ORIGINAL_CONSUMER_GROUP)
.exception()
.keyExceptionFqcn(KafkaHeaders.DLT_KEY_EXCEPTION_FQCN)
.exceptionFqcn(KafkaHeaders.DLT_EXCEPTION_FQCN)
.exceptionCauseFqcn(KafkaHeaders.DLT_EXCEPTION_CAUSE_FQCN)
.keyExceptionMessage(KafkaHeaders.DLT_KEY_EXCEPTION_MESSAGE)
.exceptionMessage(KafkaHeaders.DLT_EXCEPTION_MESSAGE)
.keyExceptionStacktrace(KafkaHeaders.DLT_KEY_EXCEPTION_STACKTRACE)
.exceptionStacktrace(KafkaHeaders.DLT_EXCEPTION_STACKTRACE)
.build();
return null;
}

/**
* Set a {@link Supplier} for {@link HeaderNames}.
* @param supplier the supplier.
* @since 3.0.9
*/
public void setHeaderNamesSupplier(Supplier<HeaderNames> supplier) {
Assert.notNull(supplier, "'supplier' cannot be null");
this.headerNamesSupplier = supplier;
}

/**
spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DeadLetterPublishingRecovererFactory.java
@@ -25,6 +25,7 @@
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -38,6 +39,7 @@
import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer.HeaderNames;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer.SingleRecordHeader;
import org.springframework.kafka.listener.SeekUtils;
import org.springframework.kafka.listener.TimestampedException;
@@ -76,6 +78,8 @@ public class DeadLetterPublishingRecovererFactory {

private boolean retainAllRetryHeaderValues = true;

private DeadLetterPublisherCreator dlpCreator = this::create;

public DeadLetterPublishingRecovererFactory(DestinationTopicResolver destinationTopicResolver) {
this.destinationTopicResolver = destinationTopicResolver;
}
@@ -113,6 +117,28 @@ public void setRetainAllRetryHeaderValues(boolean retainAllRetryHeaderValues) {
this.retainAllRetryHeaderValues = retainAllRetryHeaderValues;
}

/**
* Provide a {@link DeadLetterPublisherCreator} which is used to create a subclass
* of the {@link DeadLetterPublishingRecoverer}, instead of the default, for
* example, to modify the published records.
* @param creator the creator.
* @since 3.0.9
*/
public void setDeadLetterPublisherCreator(DeadLetterPublisherCreator creator) {
Assert.notNull(creator, "'creator' cannot be null");
this.dlpCreator = creator;
}

/**
* Set a customizer for the default {@link DeadLetterPublishingRecoverer}.
* @param customizer the customizer.
* @see #setDeadLetterPublisherCreator(DeadLetterPublisherCreator)
*/
public void setDeadLetterPublishingRecovererCustomizer(Consumer<DeadLetterPublishingRecoverer> customizer) {
Assert.notNull(customizer, "'customizer' cannot be null");
this.recovererCustomizer = customizer;
}

/**
* Add exception type to the default list. By default, the following exceptions will
* not be retried:
@@ -175,31 +201,26 @@ public void alwaysLogListenerException() {
@SuppressWarnings("unchecked")
public DeadLetterPublishingRecoverer create(String mainListenerId) {
Assert.notNull(mainListenerId, "'listenerId' cannot be null");
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(// NOSONAR anon. class size
templateResolver(mainListenerId), false, destinationResolver(mainListenerId)) {

@Override
protected DeadLetterPublishingRecoverer.HeaderNames getHeaderNames() {
return DeadLetterPublishingRecoverer.HeaderNames.Builder
.original()
.offsetHeader(KafkaHeaders.ORIGINAL_OFFSET)
.timestampHeader(KafkaHeaders.ORIGINAL_TIMESTAMP)
.timestampTypeHeader(KafkaHeaders.ORIGINAL_TIMESTAMP_TYPE)
.topicHeader(KafkaHeaders.ORIGINAL_TOPIC)
.partitionHeader(KafkaHeaders.ORIGINAL_PARTITION)
.consumerGroupHeader(KafkaHeaders.ORIGINAL_CONSUMER_GROUP)
.exception()
.keyExceptionFqcn(KafkaHeaders.KEY_EXCEPTION_FQCN)
.exceptionFqcn(KafkaHeaders.EXCEPTION_FQCN)
.exceptionCauseFqcn(KafkaHeaders.EXCEPTION_CAUSE_FQCN)
.keyExceptionMessage(KafkaHeaders.KEY_EXCEPTION_MESSAGE)
.exceptionMessage(KafkaHeaders.EXCEPTION_MESSAGE)
.keyExceptionStacktrace(KafkaHeaders.KEY_EXCEPTION_STACKTRACE)
.exceptionStacktrace(KafkaHeaders.EXCEPTION_STACKTRACE)
.build();
}
};

Supplier<HeaderNames> headerNamesSupplier = () -> HeaderNames.Builder
.original()
.offsetHeader(KafkaHeaders.ORIGINAL_OFFSET)
.timestampHeader(KafkaHeaders.ORIGINAL_TIMESTAMP)
.timestampTypeHeader(KafkaHeaders.ORIGINAL_TIMESTAMP_TYPE)
.topicHeader(KafkaHeaders.ORIGINAL_TOPIC)
.partitionHeader(KafkaHeaders.ORIGINAL_PARTITION)
.consumerGroupHeader(KafkaHeaders.ORIGINAL_CONSUMER_GROUP)
.exception()
.keyExceptionFqcn(KafkaHeaders.KEY_EXCEPTION_FQCN)
.exceptionFqcn(KafkaHeaders.EXCEPTION_FQCN)
.exceptionCauseFqcn(KafkaHeaders.EXCEPTION_CAUSE_FQCN)
.keyExceptionMessage(KafkaHeaders.KEY_EXCEPTION_MESSAGE)
.exceptionMessage(KafkaHeaders.EXCEPTION_MESSAGE)
.keyExceptionStacktrace(KafkaHeaders.KEY_EXCEPTION_STACKTRACE)
.exceptionStacktrace(KafkaHeaders.EXCEPTION_STACKTRACE)
.build();
DeadLetterPublishingRecoverer recoverer = this.dlpCreator.create(templateResolver(mainListenerId),
destinationResolver(mainListenerId));
recoverer.setHeaderNamesSupplier(headerNamesSupplier);
recoverer.setHeadersFunction(
(consumerRecord, e) -> addHeaders(mainListenerId, consumerRecord, e, getAttempts(consumerRecord)));
if (this.headersFunction != null) {
@@ -215,16 +236,19 @@ protected DeadLetterPublishingRecoverer.HeaderNames getHeaderNames() {
return recoverer;
}

private DeadLetterPublishingRecoverer create(
Function<ProducerRecord<?, ?>, KafkaOperations<?, ?>> templateResolver,
BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver) {

return new DeadLetterPublishingRecoverer(templateResolver, destinationResolver);
}

private Function<ProducerRecord<?, ?>, KafkaOperations<?, ?>> templateResolver(String mainListenerId) {
return outRecord -> this.destinationTopicResolver
.getDestinationTopicByName(mainListenerId, outRecord.topic())
.getKafkaOperations();
}

public void setDeadLetterPublishingRecovererCustomizer(Consumer<DeadLetterPublishingRecoverer> customizer) {
this.recovererCustomizer = customizer;
}

private BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver(String mainListenerId) {
return (cr, ex) -> {
if (SeekUtils.isBackoffException(ex)) {
@@ -412,4 +436,24 @@ private enum ListenerExceptionLoggingStrategy {
AFTER_RETRIES_EXHAUSTED

}

/**
* Implement this interface to create each {@link DeadLetterPublishingRecoverer}.
*
* @since 3.0.9
*/
@FunctionalInterface
public interface DeadLetterPublisherCreator {

/**
* Create a {@link DeadLetterPublishingRecoverer} using the supplied properties.
* @param templateResolver the template resolver.
* @param destinationResolver the destination resolver.
* @return the publisher.
*/
DeadLetterPublishingRecoverer create(Function<ProducerRecord<?, ?>, KafkaOperations<?, ?>> templateResolver,
BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver);

}

}
spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationIntegrationTests.java
@@ -1,5 +1,5 @@
/*
* Copyright 2021-2022 the original author or authors.
* Copyright 2021-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -24,10 +24,15 @@
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Function;

import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.Test;

import org.springframework.beans.factory.annotation.Autowired;
@@ -37,12 +42,15 @@
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;
@@ -66,7 +74,8 @@ class RetryTopicConfigurationIntegrationTests {
@Test
void includeTopic(@Autowired EmbeddedKafkaBroker broker, @Autowired ConsumerFactory<Integer, String> cf,
@Autowired KafkaTemplate<Integer, String> template, @Autowired Config config,
@Autowired RetryTopicComponentFactory componentFactory) throws InterruptedException {
@Autowired RetryTopicComponentFactory componentFactory, @Autowired KafkaListenerEndpointRegistry registry)
throws InterruptedException {

Consumer<Integer, String> consumer = cf.createConsumer("grp2", "");
Map<String, List<PartitionInfo>> topics = consumer.listTopics();
@@ -76,6 +85,11 @@ void includeTopic(@Autowired EmbeddedKafkaBroker broker, @Autowired ConsumerFact
template.send(TOPIC1, "foo");
assertThat(config.latch.await(10, TimeUnit.SECONDS)).isTrue();
verify(componentFactory).destinationTopicResolver();
assertThat(registry.getListenerContainer(TOPIC1))
.extracting("commonErrorHandler")
.extracting("failureTracker")
.extracting("recoverer")
.isInstanceOf(CustomDLPR.class);
}

@Configuration(proxyBeanMethods = false)
@@ -139,11 +153,29 @@ RetryTopicConfiguration retryTopicConfiguration1(KafkaTemplate<Integer, String>
.create(template);
}

@Override
protected java.util.function.Consumer<DeadLetterPublishingRecovererFactory>
configureDeadLetterPublishingContainerFactory() {

return (factory) -> factory.setDeadLetterPublisherCreator(
(templateResolver, destinationResolver) ->
new CustomDLPR(templateResolver, destinationResolver));
}

@Bean
TaskScheduler sched() {
return new ThreadPoolTaskScheduler();
}

}

static class CustomDLPR extends DeadLetterPublishingRecoverer {

CustomDLPR(Function<ProducerRecord<?, ?>, KafkaOperations<?, ?>> templateResolver,
BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver) {
super(templateResolver, destinationResolver);
}

}

}
