GH-2269: Improve DLPR Extensibility
Resolves #2269

- add getters for fields used in protected methods
- change more methods to protected (see the usage sketch below)

**Cherry-pick to 2.9.x, 2.8.x, 2.7.x**

If it doesn't cherry-pick cleanly, I will back-port.

# Conflicts:
#	spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java
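
For context, a minimal sketch (not part of this commit) of the kind of subclass these changes enable. The class name, constructor wiring, and override bodies are illustrative only; the protected method signatures and the getters are taken from the diff below:

```java
import java.time.Duration;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;

// Hypothetical subclass; only the protected hooks and getters come from this commit.
public class AuditingDlpr extends DeadLetterPublishingRecoverer {

	public AuditingDlpr(KafkaOperations<Object, Object> template) {
		super(template);
	}

	@Override
	protected void verifySendResult(KafkaOperations<Object, Object> kafkaTemplate,
			ProducerRecord<Object, Object> outRecord,
			ListenableFuture<SendResult<Object, Object>> sendResult) {

		// Example: audit transactional publications before the default verification runs.
		if (isTransactional()) {
			this.logger.info(() -> "Publishing to " + outRecord.topic() + " within a transaction");
		}
		super.verifySendResult(kafkaTemplate, outRecord, sendResult);
	}

	@Override
	protected Duration determineSendTimeout(KafkaOperations<?, ?> template) {
		// Default is delivery.timeout.ms + getTimeoutBuffer(); cap at 30s here purely as an example.
		Duration timeout = super.determineSendTimeout(template);
		return timeout.compareTo(Duration.ofSeconds(30)) > 0 ? Duration.ofSeconds(30) : timeout;
	}

}
```

The new getters (`isFailIfSendResultIsError()`, `getTimeoutBuffer()`, `isTransactional()`) expose the fields the now-protected methods rely on, so overrides like the above can delegate to `super` without re-deriving state.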
garyrussell authored and artembilan committed May 11, 2022
1 parent a08267d commit a1c2092
Showing 1 changed file with 114 additions and 53 deletions.
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2021 the original author or authors.
* Copyright 2018-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -69,7 +69,7 @@ public class DeadLetterPublishingRecoverer implements ConsumerAwareRecordRecover
protected final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass())); // NOSONAR

private static final BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition>
DEFAULT_DESTINATION_RESOLVER = (cr, e) -> new TopicPartition(cr.topic() + ".DLT", cr.partition());
DEFAULT_DESTINATION_RESOLVER = (cr, e) -> new TopicPartition(cr.topic() + ".DLT", cr.partition());

private static final long FIVE = 5L;

@@ -157,7 +157,6 @@ public DeadLetterPublishingRecoverer(Map<Class<?>, KafkaOperations<? extends Obj
* @param templates the {@link KafkaOperations}s to use for publishing.
* @param destinationResolver the resolving function.
*/
@SuppressWarnings("unchecked")
public DeadLetterPublishingRecoverer(Map<Class<?>, KafkaOperations<? extends Object, ? extends Object>> templates,
BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver) {

@@ -170,28 +169,28 @@ public DeadLetterPublishingRecoverer(Map<Class<?>, KafkaOperations<? extends Obj
this.transactional = firstTemplate.isTransactional();
Boolean tx = this.transactional;
Assert.isTrue(templates.values()
.stream()
.map(t -> t.isTransactional())
.allMatch(t -> t.equals(tx)), "All templates must have the same setting for transactional");
.stream()
.map(KafkaOperations::isTransactional)
.allMatch(t -> t.equals(tx)), "All templates must have the same setting for transactional");
this.destinationResolver = destinationResolver;
}

/**
* Create an instance with a template resolving function that receives the failed
* consumer record and the exception and returns a {@link KafkaOperations} and a
* flag on whether or not the publishing from this instance will be transactional
* or not. Also receives a destination resolving function that works similarly but
* returns a {@link TopicPartition} instead. If the partition in the {@link TopicPartition}
* is less than 0, no partition is set when publishing to the topic.
*
* @param templateResolver the function that resolves the {@link KafkaOperations} to use for publishing.
* @param transactional whether or not publishing by this instance should be transactional
* @param destinationResolver the resolving function.
* @since 2.7
*/
* Create an instance with a template resolving function that receives the failed
* consumer record and the exception and returns a {@link KafkaOperations} and a
* flag on whether the publishing from this instance will be transactional
* or not. Also receives a destination resolving function that works similarly but
* returns a {@link TopicPartition} instead. If the partition in the {@link TopicPartition}
* is less than 0, no partition is set when publishing to the topic.
*
* @param templateResolver the function that resolves the {@link KafkaOperations} to use for publishing.
* @param transactional whether publishing by this instance should be transactional
* @param destinationResolver the resolving function.
* @since 2.7
*/
public DeadLetterPublishingRecoverer(Function<ProducerRecord<?, ?>, KafkaOperations<?, ?>> templateResolver,
boolean transactional,
BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver) {
boolean transactional,
BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver) {

Assert.notNull(templateResolver, "The templateResolver cannot be null");
Assert.notNull(destinationResolver, "The destinationResolver cannot be null");
@@ -293,7 +292,18 @@ public void setFailIfSendResultIsError(boolean failIfSendResultIsError) {
}

/**
* Set the minumum time to wait for message sending. Default is the producer
* If true, wait for the send result and throw an exception if it fails.
* It will wait for the milliseconds specified in waitForSendResultTimeout for the result.
* @return true to wait.
* @since 2.7.14
* @see #setWaitForSendResultTimeout(Duration)
*/
protected boolean isFailIfSendResultIsError() {
return this.failIfSendResultIsError;
}

/**
* Set the minimum time to wait for message sending. Default is the producer
* configuration {@code delivery.timeout.ms} plus the {@link #setTimeoutBuffer(long)}.
* @param waitForSendResultTimeout the timeout.
* @since 2.7
@@ -305,8 +315,9 @@ public void setWaitForSendResultTimeout(Duration waitForSendResultTimeout) {
}

/**
* Set the number of milliseconds to add to the producer configuration {@code delivery.timeout.ms}
* property to avoid timing out before the Kafka producer. Default 5000.
* Set the number of milliseconds to add to the producer configuration
* {@code delivery.timeout.ms} property to avoid timing out before the Kafka producer.
* Default 5000.
* @param buffer the buffer.
* @since 2.7
* @see #setWaitForSendResultTimeout(Duration)
@@ -316,17 +327,36 @@ public void setTimeoutBuffer(long buffer) {
}

/**
* Set to true to remove previous exception headers and only retain headers for the
* current exception. Default is false, which means all exception header values are
* retained; this can cause a growth in record size when a record is republished many
* times.
* @param stripPreviousExceptionHeaders true to strip.
* The number of milliseconds to add to the producer configuration
* {@code delivery.timeout.ms} property to avoid timing out before the Kafka producer.
* @return the buffer.
* @since 2.7.14
*/
protected long getTimeoutBuffer() {
return this.timeoutBuffer;
}

/**
* Set to false to retain previous exception headers as well as headers for the
* current exception. Default is true, which means only the current headers are
* retained; setting it to false can cause a growth in record size when a record
* is republished many times.
* @param stripPreviousExceptionHeaders false to retain all.
* @since 2.7.9
*/
public void setStripPreviousExceptionHeaders(boolean stripPreviousExceptionHeaders) {
this.stripPreviousExceptionHeaders = stripPreviousExceptionHeaders;
}

/**
* True if publishing should run in a transaction.
* @return true for transactional.
* @since 2.7.14
*/
protected boolean isTransactional() {
return this.transactional;
}

@SuppressWarnings("unchecked")
@Override
public void accept(ConsumerRecord<?, ?> record, @Nullable Consumer<?, ?> consumer, Exception exception) {
Expand All @@ -341,7 +371,7 @@ public void accept(ConsumerRecord<?, ?> record, @Nullable Consumer<?, ?> consume
DeserializationException vDeserEx = ListenerUtils.getExceptionFromHeader(record,
ErrorHandlingDeserializer.VALUE_DESERIALIZER_EXCEPTION_HEADER, this.logger);
DeserializationException kDeserEx = ListenerUtils.getExceptionFromHeader(record,
ErrorHandlingDeserializer.KEY_DESERIALIZER_EXCEPTION_HEADER, this.logger);
ErrorHandlingDeserializer.KEY_DESERIALIZER_EXCEPTION_HEADER, this.logger);
Headers headers = new RecordHeaders(record.headers().toArray());
addAndEnhanceHeaders(record, exception, vDeserEx, kDeserEx, headers);
ProducerRecord<Object, Object> outRecord = createProducerRecord(record, tp, headers,
@@ -385,7 +415,7 @@ private void sendOrThrow(ProducerRecord<Object, Object> outRecord,

private void maybeThrow(ConsumerRecord<?, ?> record, Exception exception) {
String message = String.format("No destination returned for record %s and exception %s. " +
"failIfNoDestinationReturned: %s", ListenerUtils.recordToString(record), exception,
"failIfNoDestinationReturned: %s", ListenerUtils.recordToString(record), exception,
this.throwIfNoDestinationReturned);
this.logger.warn(message);
if (this.throwIfNoDestinationReturned) {
@@ -437,7 +467,7 @@ private TopicPartition checkPartition(TopicPartition tp, Consumer<?, ?> consumer

@SuppressWarnings("unchecked")
private KafkaOperations<Object, Object> findTemplateForValue(@Nullable Object value,
Map<Class<?>, KafkaOperations<?, ?>> templates) {
Map<Class<?>, KafkaOperations<?, ?>> templates) {
if (value == null) {
KafkaOperations<?, ?> operations = templates.get(Void.class);
if (operations == null) {
@@ -448,16 +478,16 @@ private KafkaOperations<Object, Object> findTemplateForValue(@Nullable Object va
}
}
Optional<Class<?>> key = templates.keySet()
.stream()
.filter((k) -> k.isAssignableFrom(value.getClass()))
.findFirst();
.stream()
.filter((k) -> k.isAssignableFrom(value.getClass()))
.findFirst();
if (key.isPresent()) {
return (KafkaOperations<Object, Object>) templates.get(key.get());
}
this.logger.warn(() -> "Failed to find a template for " + value.getClass() + " attempting to use the last entry");
return (KafkaOperations<Object, Object>) templates.values()
.stream()
.reduce((first, second) -> second)
.reduce((first, second) -> second)
.get();
}

@@ -509,7 +539,13 @@ protected void publish(ProducerRecord<Object, Object> outRecord, KafkaOperations
}
}

private void verifySendResult(KafkaOperations<Object, Object> kafkaTemplate,
/**
* Wait for the send future to complete.
* @param kafkaTemplate the template used to send the record.
* @param outRecord the record.
* @param sendResult the future.
*/
protected void verifySendResult(KafkaOperations<Object, Object> kafkaTemplate,
ProducerRecord<Object, Object> outRecord,
@Nullable ListenableFuture<SendResult<Object, Object>> sendResult) {

@@ -529,7 +565,14 @@ private void verifySendResult(KafkaOperations<Object, Object> kafkaTemplate,
}
}

private Duration determineSendTimeout(KafkaOperations<?, ?> template) {
/**
* Determine the send timeout based on the template's producer factory and
* {@link #setWaitForSendResultTimeout(Duration)}.
* @param template the template.
* @return the timeout.
* @since 2.7.14
*/
protected Duration determineSendTimeout(KafkaOperations<?, ?> template) {
ProducerFactory<? extends Object, ? extends Object> producerFactory = template.getProducerFactory();
if (producerFactory != null) { // NOSONAR - will only occur in mock tests
Map<String, Object> props = producerFactory.getConfigurationProperties();
@@ -608,18 +651,18 @@ private String getStackTraceAsString(Throwable cause) {
protected HeaderNames getHeaderNames() {
return HeaderNames.Builder
.original()
.offsetHeader(KafkaHeaders.DLT_ORIGINAL_OFFSET)
.timestampHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP)
.timestampTypeHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE)
.topicHeader(KafkaHeaders.DLT_ORIGINAL_TOPIC)
.partitionHeader(KafkaHeaders.DLT_ORIGINAL_PARTITION)
.offsetHeader(KafkaHeaders.DLT_ORIGINAL_OFFSET)
.timestampHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP)
.timestampTypeHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE)
.topicHeader(KafkaHeaders.DLT_ORIGINAL_TOPIC)
.partitionHeader(KafkaHeaders.DLT_ORIGINAL_PARTITION)
.exception()
.keyExceptionFqcn(KafkaHeaders.DLT_KEY_EXCEPTION_FQCN)
.exceptionFqcn(KafkaHeaders.DLT_EXCEPTION_FQCN)
.keyExceptionMessage(KafkaHeaders.DLT_KEY_EXCEPTION_MESSAGE)
.exceptionMessage(KafkaHeaders.DLT_EXCEPTION_MESSAGE)
.keyExceptionStacktrace(KafkaHeaders.DLT_KEY_EXCEPTION_STACKTRACE)
.exceptionStacktrace(KafkaHeaders.DLT_EXCEPTION_STACKTRACE)
.keyExceptionFqcn(KafkaHeaders.DLT_KEY_EXCEPTION_FQCN)
.exceptionFqcn(KafkaHeaders.DLT_EXCEPTION_FQCN)
.keyExceptionMessage(KafkaHeaders.DLT_KEY_EXCEPTION_MESSAGE)
.exceptionMessage(KafkaHeaders.DLT_EXCEPTION_MESSAGE)
.keyExceptionStacktrace(KafkaHeaders.DLT_KEY_EXCEPTION_STACKTRACE)
.exceptionStacktrace(KafkaHeaders.DLT_EXCEPTION_STACKTRACE)
.build();
}

@@ -631,6 +674,7 @@ protected HeaderNames getHeaderNames() {
public static class HeaderNames {

private final HeaderNames.Original original;

private final ExceptionInfo exceptionInfo;

HeaderNames(HeaderNames.Original original, ExceptionInfo exceptionInfo) {
@@ -639,10 +683,15 @@ public static class HeaderNames {
}

static class Original {

private final String offsetHeader;

private final String timestampHeader;

private final String timestampTypeHeader;

private final String topicHeader;

private final String partitionHeader;

Original(String offsetHeader,
@@ -656,30 +705,37 @@ static class Original {
this.topicHeader = topicHeader;
this.partitionHeader = partitionHeader;
}

}

static class ExceptionInfo {

private final String keyExceptionFqcn;

private final String exceptionFqcn;

private final String keyExceptionMessage;

private final String exceptionMessage;

private final String keyExceptionStacktrace;

private final String exceptionStacktrace;

ExceptionInfo(String keyExceptionFqcn,
String exceptionFqcn,
String keyExceptionMessage,
String exceptionMessage,
String keyExceptionStacktrace,
String exceptionStacktrace) {
String exceptionFqcn,
String keyExceptionMessage,
String exceptionMessage,
String keyExceptionStacktrace,
String exceptionStacktrace) {
this.keyExceptionFqcn = keyExceptionFqcn;
this.exceptionFqcn = exceptionFqcn;
this.keyExceptionMessage = keyExceptionMessage;
this.exceptionMessage = exceptionMessage;
this.keyExceptionStacktrace = keyExceptionStacktrace;
this.exceptionStacktrace = exceptionStacktrace;
}

}


@@ -805,6 +861,7 @@ private DeadLetterPublishingRecoverer.HeaderNames.Original build() {
this.topicHeader,
this.partitionHeader);
}

}

/**
@@ -919,7 +976,11 @@ public DeadLetterPublishingRecoverer.HeaderNames build() {
this.keyExceptionStacktrace,
this.exceptionStacktrace));
}

}

}

}

}
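
As the Javadoc in this diff notes, the default wait for a send result is the producer's delivery.timeout.ms plus the configured timeout buffer (5000 ms unless changed via setTimeoutBuffer). A minimal sketch of that arithmetic; the helper class and the 120000 ms Kafka default are illustrative, not library code:

```java
import java.time.Duration;
import java.util.Map;

// Illustrative helper mirroring the documented default: delivery.timeout.ms + timeout buffer.
final class SendTimeoutSketch {

	static Duration effectiveSendTimeout(Map<String, Object> producerProps, long timeoutBufferMs) {
		Object raw = producerProps.getOrDefault("delivery.timeout.ms", 120_000); // Kafka's default
		long deliveryTimeoutMs = (raw instanceof Number)
				? ((Number) raw).longValue()
				: Long.parseLong(raw.toString());
		return Duration.ofMillis(deliveryTimeoutMs + timeoutBufferMs); // e.g. 120000 + 5000
	}

}
```

With unchanged defaults that works out to a 125-second wait before the send result is treated as a failure.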
