Skip to content

Commit

Permalink
GH-1260: Add exception classification to DARP
Browse files Browse the repository at this point in the history
Resolves #1260

Refactor common code with `SeekToCurrentErrorHandler` into
`FailedRecordProcessor` super class.

* Polishing to address PR comment
  • Loading branch information
garyrussell authored and artembilan committed Oct 4, 2019
1 parent 37a1b5b commit 457e63a
Show file tree
Hide file tree
Showing 5 changed files with 364 additions and 171 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,11 @@
import java.util.List;
import java.util.function.BiConsumer;

import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SeekUtils;
import org.springframework.lang.Nullable;
Expand All @@ -49,14 +47,7 @@
* @since 1.3.5
*
*/
public class DefaultAfterRollbackProcessor<K, V> implements AfterRollbackProcessor<K, V> {

private static final LogAccessor LOGGER =
new LogAccessor(LogFactory.getLog(DefaultAfterRollbackProcessor.class));

private final FailedRecordTracker failureTracker;

private boolean commitRecovered;
public class DefaultAfterRollbackProcessor<K, V> extends FailedRecordProcessor implements AfterRollbackProcessor<K, V> {

private KafkaTemplate<K, V> kafkaTemplate;

Expand Down Expand Up @@ -126,7 +117,8 @@ public DefaultAfterRollbackProcessor(BiConsumer<ConsumerRecord<?, ?>, Exception>
public DefaultAfterRollbackProcessor(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer,
int maxFailures) {

this.failureTracker = new FailedRecordTracker(recoverer, new FixedBackOff(0L, maxFailures - 1), LOGGER);
// Remove super CTOR when this is removed.
super(recoverer, maxFailures);
}

/**
Expand All @@ -139,16 +131,17 @@ public DefaultAfterRollbackProcessor(@Nullable BiConsumer<ConsumerRecord<?, ?>,
public DefaultAfterRollbackProcessor(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer,
BackOff backOff) {

this.failureTracker = new FailedRecordTracker(recoverer, backOff, LOGGER);
super(recoverer, backOff);
}

@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
public void process(List<ConsumerRecord<K, V>> records, Consumer<K, V> consumer, Exception exception,
boolean recoverable) {

if (SeekUtils.doSeeks(((List) records), consumer, exception, recoverable, this.failureTracker::skip, LOGGER)
&& this.kafkaTemplate != null && this.kafkaTemplate.isTransactional()) {
if (SeekUtils.doSeeks(((List) records), consumer, exception, recoverable,
getSkipPredicate((List) records, exception), LOGGER)
&& isCommitRecovered() && this.kafkaTemplate != null && this.kafkaTemplate.isTransactional()) {
ConsumerRecord<K, V> skipped = records.get(0);
this.kafkaTemplate.sendOffsetsToTransaction(
Collections.singletonMap(new TopicPartition(skipped.topic(), skipped.partition()),
Expand All @@ -158,10 +151,11 @@ public void process(List<ConsumerRecord<K, V>> records, Consumer<K, V> consumer,

@Override
public boolean isProcessInTransaction() {
return this.commitRecovered;
return isCommitRecovered();
}

/**
* {@inheritDoc}
	 * Set to true and the container will run the
* {@link #process(List, Consumer, Exception, boolean)} method in a transaction and,
* if a record is skipped and recovered, we will send its offset to the transaction.
Expand All @@ -172,8 +166,9 @@ public boolean isProcessInTransaction() {
* @see #process(List, Consumer, Exception, boolean)
* @see #setKafkaTemplate(KafkaTemplate)
*/
@Override
public void setCommitRecovered(boolean commitRecovered) {
this.commitRecovered = commitRecovered;
super.setCommitRecovered(commitRecovered); // NOSONAR enhanced javadoc
}

/**
Expand All @@ -187,9 +182,4 @@ public void setKafkaTemplate(KafkaTemplate<K, V> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}

@Override
public void clearThreadState() {
this.failureTracker.clearThreadState();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
/*
* Copyright 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.listener;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.BiPredicate;

import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.springframework.classify.BinaryExceptionClassifier;
import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.support.serializer.DeserializationException;
import org.springframework.lang.Nullable;
import org.springframework.messaging.converter.MessageConversionException;
import org.springframework.messaging.handler.invocation.MethodArgumentResolutionException;
import org.springframework.util.Assert;
import org.springframework.util.backoff.BackOff;
import org.springframework.util.backoff.FixedBackOff;

/**
* Common super class for classes that deal with failing to consume a consumer record.
*
* @author Gary Russell
* @since 2.3.1
*
*/
public abstract class FailedRecordProcessor {

	// Used when a classified (non-retryable) exception was recovered successfully:
	// every subsequent record in the batch may be skipped.
	private static final BiPredicate<ConsumerRecord<?, ?>, Exception> ALWAYS_SKIP_PREDICATE = (r, e) -> true;

	// Used when recovery of a classified exception failed: do not skip, so the
	// record is redelivered and recovery is retried.
	private static final BiPredicate<ConsumerRecord<?, ?>, Exception> NEVER_SKIP_PREDICATE = (r, e) -> false;

	protected static final LogAccessor LOGGER =
			new LogAccessor(LogFactory.getLog(SeekToCurrentErrorHandler.class)); // NOSONAR

	private final FailedRecordTracker failureTracker;

	private BinaryExceptionClassifier classifier;

	private boolean commitRecovered;

	protected FailedRecordProcessor(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer, BackOff backOff) {
		this.failureTracker = new FailedRecordTracker(recoverer, backOff, LOGGER);
		this.classifier = configureDefaultClassifier();
	}

	/**
	 * TODO: remove when the deprecated dependent CTORs are removed.
	 * @param recoverer the recoverer.
	 * @param maxFailures the max failures.
	 */
	FailedRecordProcessor(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer, int maxFailures) {
		this.failureTracker = new FailedRecordTracker(recoverer, new FixedBackOff(0L, maxFailures - 1), LOGGER);
		this.classifier = configureDefaultClassifier();
	}

	/**
	 * Return the exception classifier.
	 * @return the classifier.
	 */
	protected BinaryExceptionClassifier getClassifier() {
		return this.classifier;
	}

	/**
	 * Set exception classifications to determine whether the exception should cause a retry
	 * (until exhaustion) or not. If not, we go straight to the recoverer. By default,
	 * the following exceptions will not be retried:
	 * <ul>
	 * <li>{@link DeserializationException}</li>
	 * <li>{@link MessageConversionException}</li>
	 * <li>{@link MethodArgumentResolutionException}</li>
	 * <li>{@link NoSuchMethodException}</li>
	 * <li>{@link ClassCastException}</li>
	 * </ul>
	 * All others will be retried.
	 * When calling this method, the defaults will not be applied.
	 * @param classifications the classifications.
	 * @param defaultValue whether or not to retry non-matching exceptions.
	 * @see BinaryExceptionClassifier#BinaryExceptionClassifier(Map, boolean)
	 */
	public void setClassifications(Map<Class<? extends Throwable>, Boolean> classifications, boolean defaultValue) {
		Assert.notNull(classifications, "'classifications' cannot be null");
		this.classifier = new ExtendedBinaryExceptionClassifier(classifications, defaultValue);
	}

	/**
	 * Set the classifier to use; replaces the default (or user-supplied) classifier.
	 * @param classifier the classifier.
	 */
	protected void setClassifier(BinaryExceptionClassifier classifier) {
		this.classifier = classifier;
	}

	/**
	 * Whether the offset for a recovered record should be committed.
	 * @return true to commit recovered record offsets.
	 */
	protected boolean isCommitRecovered() {
		return this.commitRecovered;
	}

	/**
	 * Set to true to commit the offset for a recovered record.
	 * @param commitRecovered true to commit.
	 */
	public void setCommitRecovered(boolean commitRecovered) {
		this.commitRecovered = commitRecovered;
	}

	/**
	 * Add an exception type to the default list; if and only if an external classifier
	 * has not been provided. By default, the following exceptions will not be retried:
	 * <ul>
	 * <li>{@link DeserializationException}</li>
	 * <li>{@link MessageConversionException}</li>
	 * <li>{@link MethodArgumentResolutionException}</li>
	 * <li>{@link NoSuchMethodException}</li>
	 * <li>{@link ClassCastException}</li>
	 * </ul>
	 * All others will be retried.
	 * @param exceptionType the exception type.
	 * @see #removeNotRetryableException(Class)
	 * @see #setClassifications(Map, boolean)
	 */
	public void addNotRetryableException(Class<? extends Exception> exceptionType) {
		Assert.isTrue(this.classifier instanceof ExtendedBinaryExceptionClassifier,
				"Cannot add exception types to a supplied classifier");
		((ExtendedBinaryExceptionClassifier) this.classifier).getClassified().put(exceptionType, false);
	}

	/**
	 * Remove an exception type from the configured list; if and only if an external
	 * classifier has not been provided. By default, the following exceptions will not be
	 * retried:
	 * <ul>
	 * <li>{@link DeserializationException}</li>
	 * <li>{@link MessageConversionException}</li>
	 * <li>{@link MethodArgumentResolutionException}</li>
	 * <li>{@link NoSuchMethodException}</li>
	 * <li>{@link ClassCastException}</li>
	 * </ul>
	 * All others will be retried.
	 * @param exceptionType the exception type.
	 * @return true if the removal was successful.
	 * @see #addNotRetryableException(Class)
	 * @see #setClassifications(Map, boolean)
	 */
	public boolean removeNotRetryableException(Class<? extends Exception> exceptionType) {
		Assert.isTrue(this.classifier instanceof ExtendedBinaryExceptionClassifier,
				"Cannot remove exception types from a supplied classifier");
		return ((ExtendedBinaryExceptionClassifier) this.classifier).getClassified().remove(exceptionType);
	}

	/**
	 * Return a predicate deciding whether a failed record should be skipped. If the
	 * exception is classified as retryable, delegate to the failure tracker (retries
	 * until the back off is exhausted). Otherwise, invoke the recoverer immediately for
	 * the first record: if recovery succeeds, all records are skippable; if it throws,
	 * nothing is skipped so the record is redelivered.
	 * @param records the failed records (the first is the one that failed).
	 * @param thrownException the exception that was thrown.
	 * @return the skip predicate.
	 */
	protected BiPredicate<ConsumerRecord<?, ?>, Exception> getSkipPredicate(List<ConsumerRecord<?, ?>> records,
			Exception thrownException) {

		if (getClassifier().classify(thrownException)) {
			return this.failureTracker::skip;
		}
		else {
			try {
				this.failureTracker.getRecoverer().accept(records.get(0), thrownException);
			}
			catch (Exception ex) {
				LOGGER.error(ex, () -> "Recovery of record (" + records.get(0) + ") failed");
				return NEVER_SKIP_PREDICATE;
			}
			return ALWAYS_SKIP_PREDICATE;
		}
	}

	/**
	 * Clear the per-thread failure state in the tracker.
	 */
	public void clearThreadState() {
		this.failureTracker.clearThreadState();
	}

	// Build the default classifier: the listed exception types are considered fatal
	// (not retryable); any other exception is retryable (defaultValue = true).
	private static BinaryExceptionClassifier configureDefaultClassifier() {
		Map<Class<? extends Throwable>, Boolean> classified = new HashMap<>();
		classified.put(DeserializationException.class, false);
		classified.put(MessageConversionException.class, false);
		classified.put(MethodArgumentResolutionException.class, false);
		classified.put(NoSuchMethodException.class, false);
		classified.put(ClassCastException.class, false);
		return new ExtendedBinaryExceptionClassifier(classified, true);
	}

	/**
	 * Extended to provide visibility to the current classified exceptions.
	 *
	 * @author Gary Russell
	 *
	 */
	@SuppressWarnings("serial")
	private static final class ExtendedBinaryExceptionClassifier extends BinaryExceptionClassifier {

		ExtendedBinaryExceptionClassifier(Map<Class<? extends Throwable>, Boolean> typeMap, boolean defaultValue) {
			super(typeMap, defaultValue);
			setTraverseCauses(true);
		}

		@Override
		protected Map<Class<? extends Throwable>, Boolean> getClassified() { // NOSONAR worthless override
			return super.getClassified();
		}

	}

}

0 comments on commit 457e63a

Please sign in to comment.