Skip to content

Commit

Permalink
GH-1287: Detect mis-configured deserialization
Browse files Browse the repository at this point in the history
Resolves #1287

Throw an `IllegalStateException` in the `SeekToCurrentErrorHandler` if
the root exception is a `SerializationException`.

Handling of deserializatin problms requires the configuration of an
`ErrorHandlingDeserializer2`.

**cherry-pick to 2.2.x, 2.1.x**

# Conflicts:
#	spring-kafka/src/main/java/org/springframework/kafka/listener/SeekToCurrentErrorHandler.java
#	spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentErrorHandlerTests.java
  • Loading branch information
garyrussell authored and artembilan committed Oct 30, 2019
1 parent 169f118 commit 8d0bb39
Showing 1 changed file with 7 additions and 0 deletions.
Expand Up @@ -28,6 +28,7 @@
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.SerializationException;

import org.springframework.kafka.KafkaException;
import org.springframework.kafka.listener.ContainerProperties.AckMode;
Expand Down Expand Up @@ -120,6 +121,12 @@ public void setCommitRecovered(boolean commitRecovered) {
public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records,
Consumer<?, ?> consumer, MessageListenerContainer container) {

if (thrownException instanceof SerializationException) {
throw new IllegalStateException("This error handler cannot process 'SerializationException's directly, "
+ "please consider configuring an 'ErrorHandlingDeserializer2' in the value and/or key "
+ "deserializer", thrownException);
}

if (!SeekUtils.doSeeks(records, consumer, thrownException, true, this.failureTracker::skip, LOGGER)) {
throw new KafkaException("Seek to current after exception", thrownException);
}
Expand Down

0 comments on commit 8d0bb39

Please sign in to comment.