Skip to content

Commit

Permalink
GH-1700: Check Partition in DLPublishingRecoverer
Browse files Browse the repository at this point in the history
Resolves #1700
  • Loading branch information
garyrussell authored and artembilan committed Feb 10, 2021
1 parent 8995e4d commit bdd9ee8
Show file tree
Hide file tree
Showing 14 changed files with 184 additions and 35 deletions.
4 changes: 4 additions & 0 deletions spring-kafka-docs/src/main/asciidoc/kafka.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -5283,6 +5283,10 @@ public ErrorHandler eh(KafkaOperations<String, String> template) {
----
====

Starting with version 2.7, the recoverer checks that the partition selected by the destination resolver actually exists.
If the partition is not present, the partition in the `ProducerRecord` is set to `null`, allowing the `KafkaProducer` to select the partition.
You can disable this check by setting the `verifyPartition` property to `false`.

[[kerberos]]
==== JAAS and Kerberos

Expand Down
3 changes: 3 additions & 0 deletions spring-kafka-docs/src/main/asciidoc/whats-new.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ See <<kafka-validation>> for more information.
Now, if both the key and value fail deserialization, the original values are published to the DLT.
Previously, the value was populated but the key `DeserializationException` remained in the headers.
There is a breaking API change, if you subclassed the recoverer and overrode the `createProducerRecord` method.

In addition, the recoverer verifies that the partition selected by the destination resolver actually exists before publishing to it.

See <<dead-letters>> for more information.

[[x27-CKTM]]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2020 the original author or authors.
* Copyright 2015-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -29,6 +29,7 @@
import org.apache.kafka.common.TopicPartition;

import org.springframework.kafka.support.SendResult;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.util.concurrent.ListenableFuture;

Expand Down Expand Up @@ -167,6 +168,7 @@ public interface KafkaOperations<K, V> {
* @return the result.
* @since 1.1
*/
@Nullable
<T> T execute(ProducerCallback<K, V, T> callback);

/**
Expand All @@ -178,6 +180,7 @@ public interface KafkaOperations<K, V> {
* @return the result.
* @since 1.1
*/
@Nullable
<T> T executeInTransaction(OperationsCallback<K, V, T> callback);

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright 2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.listener;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.springframework.lang.Nullable;

/**
* A {@link ConsumerRecordRecoverer} that supports getting a reference to the
* {@link Consumer}.
*
* @author Gary Russell
* @since 2.7
*
*/
@FunctionalInterface
public interface ConsumerAwareRecordRecoverer extends ConsumerRecordRecoverer {


@Override
default void accept(ConsumerRecord<?, ?> record, Exception exception) {
accept(record, null, exception);
}

/**
* Recover the record.
* @param record the record.
* @param consumer the consumer.
* @param exception the exception.
* @since 2.7
*/
void accept(ConsumerRecord<?, ?> record, @Nullable Consumer<?, ?> consumer, Exception exception);

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,18 @@
import java.io.StringWriter;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiFunction;

import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
Expand All @@ -50,13 +54,15 @@
* @since 2.2
*
*/
public class DeadLetterPublishingRecoverer implements ConsumerRecordRecoverer {
public class DeadLetterPublishingRecoverer implements ConsumerAwareRecordRecoverer {

protected final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass())); // NOSONAR

private static final BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition>
DEFAULT_DESTINATION_RESOLVER = (cr, e) -> new TopicPartition(cr.topic() + ".DLT", cr.partition());

private static final long FIVE = 5L;

private final KafkaOperations<Object, Object> template;

private final Map<Class<?>, KafkaOperations<?, ?>> templates;
Expand All @@ -69,6 +75,10 @@ public class DeadLetterPublishingRecoverer implements ConsumerRecordRecoverer {

private BiFunction<ConsumerRecord<?, ?>, Exception, Headers> headersFunction = (rec, ex) -> null;

private boolean verifyPartition = true;

private Duration partitionInfoTimeout = Duration.ofSeconds(FIVE);

/**
* Create an instance with the provided template and a default destination resolving
* function that returns a TopicPartition based on the original topic (appended with ".DLT")
Expand Down Expand Up @@ -165,9 +175,36 @@ public void setHeadersFunction(BiFunction<ConsumerRecord<?, ?>, Exception, Heade
this.headersFunction = headersFunction;
}

/**
* Set to false to disable partition verification. When true, verify that the
* partition returned by the resolver actually exists. If not, set the
* {@link ProducerRecord#partition()} to null, allowing the producer to determine the
* destination partition.
* @param verifyPartition false to disable.
* @since 2.7
* @see #setPartitionInfoTimeout(Duration)
*/
public void setVerifyPartition(boolean verifyPartition) {
this.verifyPartition = verifyPartition;
}

/**
* Time to wait for partition information when verifying. Default is 5 seconds.
* @param partitionInfoTimeout the timeout.
* @since 2.7
* @see #setVerifyPartition(boolean)
*/
public void setPartitionInfoTimeout(Duration partitionInfoTimeout) {
Assert.notNull(partitionInfoTimeout, "'partitionInfoTimeout' cannot be null");
this.partitionInfoTimeout = partitionInfoTimeout;
}

@Override
public void accept(ConsumerRecord<?, ?> record, Exception exception) {
public void accept(ConsumerRecord<?, ?> record, @Nullable Consumer<?, ?> consumer, Exception exception) {
TopicPartition tp = this.destinationResolver.apply(record, exception);
if (consumer != null && this.verifyPartition) {
tp = checkPartition(tp, consumer);
}
DeserializationException vDeserEx = ListenerUtils.getExceptionFromHeader(record,
ErrorHandlingDeserializer.VALUE_DESERIALIZER_EXCEPTION_HEADER, this.logger);
DeserializationException kDeserEx = ListenerUtils.getExceptionFromHeader(record,
Expand Down Expand Up @@ -195,6 +232,30 @@ public void accept(ConsumerRecord<?, ?> record, Exception exception) {
}
}

private TopicPartition checkPartition(TopicPartition tp, Consumer<?, ?> consumer) {
if (tp.partition() < 0) {
return tp;
}
try {
List<PartitionInfo> partitions = consumer.partitionsFor(tp.topic(), this.partitionInfoTimeout);
if (partitions == null) {
this.logger.debug(() -> "Could not obtain partition info for " + tp.topic());
return tp;
}
boolean anyMatch = partitions.stream().anyMatch(pi -> pi.partition() == tp.partition());
if (!anyMatch) {
this.logger.warn(() -> "Destination resolver returned non-existent partition " + tp
+ ", KafkaProducer will determine partition to use for this topic");
return new TopicPartition(tp.topic(), -1);
}
return tp;
}
catch (Exception ex) {
this.logger.debug(ex, () -> "Could not obtain partition info for " + tp.topic());
return tp;
}
}

@SuppressWarnings("unchecked")
private KafkaOperations<Object, Object> findTemplateForValue(@Nullable Object value) {
if (this.template != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,9 +263,9 @@ protected RecoveryStrategy getRecoveryStrategy(List<ConsumerRecord<?, ?>> record
this.failureTracker.getRetryListeners().forEach(rl ->
rl.recoveryFailed(records.get(0), thrownException, ex));
}
return (rec, excep, cont) -> NEVER_SKIP_PREDICATE.test(rec, excep);
return (rec, excep, cont, consumer) -> NEVER_SKIP_PREDICATE.test(rec, excep);
}
return (rec, excep, cont) -> ALWAYS_SKIP_PREDICATE.test(rec, excep);
return (rec, excep, cont, consumer) -> ALWAYS_SKIP_PREDICATE.test(rec, excep);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.function.BiConsumer;
import java.util.function.BiFunction;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;

Expand All @@ -46,7 +47,7 @@ class FailedRecordTracker implements RecoveryStrategy {

private final ThreadLocal<Map<TopicPartition, FailedRecord>> failures = new ThreadLocal<>(); // intentionally not static

private final BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer;
private final ConsumerAwareRecordRecoverer recoverer;

private final boolean noRetries;

Expand All @@ -65,7 +66,7 @@ class FailedRecordTracker implements RecoveryStrategy {

Assert.notNull(backOff, "'backOff' cannot be null");
if (recoverer == null) {
this.recoverer = (rec, thr) -> {
this.recoverer = (rec, consumer, thr) -> {
Map<TopicPartition, FailedRecord> map = this.failures.get();
FailedRecord failedRecord = null;
if (map != null) {
Expand All @@ -79,7 +80,12 @@ class FailedRecordTracker implements RecoveryStrategy {
};
}
else {
this.recoverer = recoverer;
if (recoverer instanceof ConsumerAwareRecordRecoverer) {
this.recoverer = (ConsumerAwareRecordRecoverer) recoverer;
}
else {
this.recoverer = (rec, consumer, ex) -> recoverer.accept(rec, ex);
}
}
this.noRetries = backOff.start().nextBackOff() == BackOffExecution.STOP;
this.backOff = backOff;
Expand Down Expand Up @@ -136,7 +142,7 @@ List<RetryListener> getRetryListeners() {

boolean skip(ConsumerRecord<?, ?> record, Exception exception) {
try {
return recovered(record, exception, null);
return recovered(record, exception, null, null);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
Expand All @@ -146,10 +152,11 @@ boolean skip(ConsumerRecord<?, ?> record, Exception exception) {

@Override
public boolean recovered(ConsumerRecord<?, ?> record, Exception exception,
@Nullable MessageListenerContainer container) throws InterruptedException {
@Nullable MessageListenerContainer container,
@Nullable Consumer<?, ?> consumer) throws InterruptedException {

if (this.noRetries) {
attemptRecovery(record, exception, null);
attemptRecovery(record, exception, null, consumer);
return true;
}
Map<TopicPartition, FailedRecord> map = this.failures.get();
Expand All @@ -172,7 +179,7 @@ public boolean recovered(ConsumerRecord<?, ?> record, Exception exception,
return false;
}
else {
attemptRecovery(record, exception, topicPartition);
attemptRecovery(record, exception, topicPartition, consumer);
map.remove(topicPartition);
if (map.isEmpty()) {
this.failures.remove();
Expand Down Expand Up @@ -213,9 +220,11 @@ private BackOff determineBackOff(ConsumerRecord<?, ?> record, Exception exceptio
return backOffToUse != null ? backOffToUse : this.backOff;
}

private void attemptRecovery(ConsumerRecord<?, ?> record, Exception exception, @Nullable TopicPartition tp) {
private void attemptRecovery(ConsumerRecord<?, ?> record, Exception exception, @Nullable TopicPartition tp,
Consumer<?, ?> consumer) {

try {
this.recoverer.accept(record, exception);
this.recoverer.accept(record, consumer, exception);
this.retryListeners.forEach(rl -> rl.recovered(record, exception));
}
catch (RuntimeException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package org.springframework.kafka.listener;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.springframework.lang.Nullable;
Expand All @@ -35,10 +36,11 @@ public interface RecoveryStrategy {
* @param record the record.
* @param ex the exception.
* @param container the container (or parent if a child container).
* @param consumer the consumer.
* @return true to skip.
* @throws InterruptedException if the thread is interrupted.
*/
boolean recovered(ConsumerRecord<?, ?> record, Exception ex, @Nullable MessageListenerContainer container)
throws InterruptedException;
boolean recovered(ConsumerRecord<?, ?> record, Exception ex, @Nullable MessageListenerContainer container,
@Nullable Consumer<?, ?> consumer) throws InterruptedException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ private SeekUtils() {
public static boolean doSeeks(List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, Exception exception,
boolean recoverable, BiPredicate<ConsumerRecord<?, ?>, Exception> skipper, LogAccessor logger) {

return doSeeks(records, consumer, exception, recoverable, (rec, ex, cont) -> skipper.test(rec, ex), null,
return doSeeks(records, consumer, exception, recoverable, (rec, ex, cont, cons) -> skipper.test(rec, ex), null,
logger);
}

Expand All @@ -101,7 +101,7 @@ public static boolean doSeeks(List<ConsumerRecord<?, ?>> records, Consumer<?, ?>
records.forEach(record -> {
if (recoverable && first.get()) {
try {
boolean test = recovery.recovered(record, exception, container);
boolean test = recovery.recovered(record, exception, container, consumer);
skipped.set(test);
}
catch (Exception ex) {
Expand Down Expand Up @@ -161,7 +161,7 @@ public static void seekOrRecover(Exception thrownException, List<ConsumerRecord<
BiPredicate<ConsumerRecord<?, ?>, Exception> skipPredicate, LogAccessor logger, Level level) {

seekOrRecover(thrownException, records, consumer, container, commitRecovered,
(rec, ex, cont) -> skipPredicate.test(rec, ex), logger, level);
(rec, ex, cont, cons) -> skipPredicate.test(rec, ex), logger, level);

}

Expand Down

0 comments on commit bdd9ee8

Please sign in to comment.