From 8d0bb39975c82971165dd75b8309b34ec8621f58 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Tue, 29 Oct 2019 16:31:45 -0400 Subject: [PATCH] GH-1287: Detect mis-configured deserialization Resolves https://github.com/spring-projects/spring-kafka/issues/1287 Throw an `IllegalStateException` in the `SeekToCurrentErrorHandler` if the root exception is a `SerializationException`. Handling of deserializatin problms requires the configuration of an `ErrorHandlingDeserializer2`. **cherry-pick to 2.2.x, 2.1.x** # Conflicts: # spring-kafka/src/main/java/org/springframework/kafka/listener/SeekToCurrentErrorHandler.java # spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentErrorHandlerTests.java --- .../kafka/listener/SeekToCurrentErrorHandler.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/SeekToCurrentErrorHandler.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/SeekToCurrentErrorHandler.java index f2ac1b64ec..b797c87df9 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/SeekToCurrentErrorHandler.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/SeekToCurrentErrorHandler.java @@ -28,6 +28,7 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetCommitCallback; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.SerializationException; import org.springframework.kafka.KafkaException; import org.springframework.kafka.listener.ContainerProperties.AckMode; @@ -120,6 +121,12 @@ public void setCommitRecovered(boolean commitRecovered) { public void handle(Exception thrownException, List> records, Consumer consumer, MessageListenerContainer container) { + if (thrownException instanceof SerializationException) { + throw new IllegalStateException("This error handler cannot process 'SerializationException's directly, " + + "please consider configuring an 'ErrorHandlingDeserializer2' in the value and/or key " + + "deserializer", thrownException); + } + if (!SeekUtils.doSeeks(records, consumer, thrownException, true, this.failureTracker::skip, LOGGER)) { throw new KafkaException("Seek to current after exception", thrownException); }