diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordProcessor.java
index 4f46260485..0d0fb25879 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordProcessor.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordProcessor.java
@@ -29,6 +29,7 @@
import org.springframework.classify.BinaryExceptionClassifier;
import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.support.TopicPartitionOffset;
+import org.springframework.kafka.support.converter.ConversionException;
import org.springframework.kafka.support.serializer.DeserializationException;
import org.springframework.lang.Nullable;
import org.springframework.messaging.converter.MessageConversionException;
@@ -141,6 +142,7 @@ public int deliveryAttempt(TopicPartitionOffset topicPartitionOffset) {
*
* - {@link DeserializationException}
* - {@link MessageConversionException}
+ * - {@link ConversionException}
* - {@link MethodArgumentResolutionException}
* - {@link NoSuchMethodException}
* - {@link ClassCastException}
@@ -163,6 +165,7 @@ public void addNotRetryableException(Class extends Exception> exceptionType) {
*
* - {@link DeserializationException}
* - {@link MessageConversionException}
+ * - {@link ConversionException}
* - {@link MethodArgumentResolutionException}
* - {@link NoSuchMethodException}
* - {@link ClassCastException}
@@ -191,6 +194,7 @@ public final void addNotRetryableExceptions(Class extends Exception>... except
*
* - {@link DeserializationException}
* - {@link MessageConversionException}
+ * - {@link ConversionException}
* - {@link MethodArgumentResolutionException}
* - {@link NoSuchMethodException}
* - {@link ClassCastException}
@@ -234,6 +238,7 @@ private static ExtendedBinaryExceptionClassifier configureDefaultClassifier() {
Map, Boolean> classified = new HashMap<>();
classified.put(DeserializationException.class, false);
classified.put(MessageConversionException.class, false);
+ classified.put(ConversionException.class, false);
classified.put(MethodArgumentResolutionException.class, false);
classified.put(NoSuchMethodException.class, false);
classified.put(ClassCastException.class, false);
diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentErrorHandlerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentErrorHandlerTests.java
index 04994f6968..60e127e67e 100644
--- a/spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentErrorHandlerTests.java
+++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentErrorHandlerTests.java
@@ -37,6 +37,7 @@
import org.mockito.InOrder;
import org.springframework.kafka.KafkaException;
+import org.springframework.kafka.support.converter.ConversionException;
import org.springframework.kafka.support.serializer.DeserializationException;
/**
@@ -68,6 +69,10 @@ consumer, mock(MessageListenerContainer.class)))
handler.handle(new DeserializationException("intended", null, false, illegalState), records,
consumer, mock(MessageListenerContainer.class));
assertThat(recovered.get()).isSameAs(record1);
+ recovered.set(null);
+ handler.handle(new ConversionException("intended", null), records,
+ consumer, mock(MessageListenerContainer.class));
+ assertThat(recovered.get()).isSameAs(record1);
handler.addNotRetryableExceptions(IllegalStateException.class);
recovered.set(null);
recovererShouldFail.set(true);
@@ -77,7 +82,7 @@ consumer, mock(MessageListenerContainer.class)))
assertThat(recovered.get()).isSameAs(record1);
InOrder inOrder = inOrder(consumer);
inOrder.verify(consumer).seek(new TopicPartition("foo", 0), 0L); // not recovered so seek
- inOrder.verify(consumer, times(2)).seek(new TopicPartition("foo", 1), 1L);
+ inOrder.verify(consumer, times(3)).seek(new TopicPartition("foo", 1), 1L);
inOrder.verify(consumer).seek(new TopicPartition("foo", 0), 0L); // recovery failed
inOrder.verify(consumer, times(2)).seek(new TopicPartition("foo", 1), 1L);
inOrder.verifyNoMoreInteractions();
diff --git a/src/reference/asciidoc/kafka.adoc b/src/reference/asciidoc/kafka.adoc
index 6472f7874a..04103b1ddc 100644
--- a/src/reference/asciidoc/kafka.adoc
+++ b/src/reference/asciidoc/kafka.adoc
@@ -4611,6 +4611,7 @@ The exceptions that are considered fatal, by default, are:
* `DeserializationException`
* `MessageConversionException`
+* `ConversionException`
* `MethodArgumentResolutionException`
* `NoSuchMethodException`
* `ClassCastException`
@@ -4863,6 +4864,7 @@ The exceptions that are considered fatal, by default, are:
* `DeserializationException`
* `MessageConversionException`
+* `ConversionException`
* `MethodArgumentResolutionException`
* `NoSuchMethodException`
* `ClassCastException`