Skip to content

Commit

Permalink
GH-1287: Detect mis-configured deserialization
Browse files Browse the repository at this point in the history
Resolves #1287

Throw an `IllegalStateException` in the `SeekToCurrentErrorHandler` if
the root exception is a `SerializationException`.

Handling of deserializatin problms requires the configuration of an
`ErrorHandlingDeserializer2`.

**cherry-pick to 2.2.x, 2.1.x**
  • Loading branch information
garyrussell authored and artembilan committed Oct 30, 2019
1 parent 78a0f15 commit 30dca49
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 0 deletions.
Expand Up @@ -26,6 +26,7 @@
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.SerializationException;

import org.springframework.classify.BinaryExceptionClassifier;
import org.springframework.kafka.KafkaException;
Expand Down Expand Up @@ -180,6 +181,12 @@ public void setClassifier(BinaryExceptionClassifier classifier) {
public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records,
Consumer<?, ?> consumer, MessageListenerContainer container) {

if (thrownException instanceof SerializationException) {
throw new IllegalStateException("This error handler cannot process 'SerializationException's directly, "
+ "please consider configuring an 'ErrorHandlingDeserializer2' in the value and/or key "
+ "deserializer", thrownException);
}

if (!SeekUtils.doSeeks(records, consumer, thrownException, true, getSkipPredicate(records, thrownException),
LOGGER)) {
throw new KafkaException("Seek to current after exception", thrownException);
Expand Down
Expand Up @@ -18,6 +18,7 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
Expand All @@ -30,6 +31,7 @@
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.SerializationException;
import org.junit.jupiter.api.Test;
import org.mockito.InOrder;

Expand Down Expand Up @@ -93,4 +95,13 @@ public void testDeprecatedCtor() {
assertThat(KafkaTestUtils.getPropertyValue(handler, "failureTracker.backOff.maxAttempts"))
.isEqualTo(9L);
}

@Test
void testSerializationException() {
SeekToCurrentErrorHandler handler = new SeekToCurrentErrorHandler();
SerializationException thrownException = new SerializationException();
assertThatIllegalStateException().isThrownBy(() -> handler.handle(thrownException, null, null, null))
.withCause(thrownException);
}

}

0 comments on commit 30dca49

Please sign in to comment.