diff --git a/spring-kafka/src/main/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplate.java b/spring-kafka/src/main/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplate.java index 33f16c748d..6e80b531c5 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplate.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplate.java @@ -37,14 +37,19 @@ import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.InitializingBean; import org.springframework.context.SmartLifecycle; +import org.springframework.core.log.LogAccessor; import org.springframework.kafka.KafkaException; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.listener.BatchMessageListener; import org.springframework.kafka.listener.ContainerProperties; import org.springframework.kafka.listener.GenericMessageListenerContainer; +import org.springframework.kafka.listener.ListenerUtils; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.kafka.support.TopicPartitionOffset; +import org.springframework.kafka.support.serializer.DeserializationException; +import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer; +import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2; import org.springframework.lang.Nullable; import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; @@ -92,8 +97,7 @@ public class ReplyingKafkaTemplate extends KafkaTemplate implemen private boolean sharedReplyTopic; - private Function, CorrelationKey> correlationStrategy = - ReplyingKafkaTemplate::defaultCorrelationIdStrategy; + private Function, CorrelationKey> correlationStrategy = ReplyingKafkaTemplate::defaultCorrelationIdStrategy; private String correlationHeaderName = KafkaHeaders.CORRELATION_ID; @@ -230,8 +234,8 @@ public Collection getAssignedReplyTopicPartitions() { } /** - * Set to true when multiple templates are using the same topic for replies. - * This simply changes logs for unexpected replies to debug instead of error. + * Set to true when multiple templates are using the same topic for replies. This + * simply changes logs for unexpected replies to debug instead of error. * @param sharedReplyTopic true if using a shared topic. * @since 2.2 */ @@ -439,13 +443,54 @@ public void onMessage(List> data) { logLateArrival(record, correlationId); } else { - this.logger.debug(() -> "Received: " + record + WITH_CORRELATION_ID + correlationKey); - future.set(record); + boolean ok = true; + if (record.value() == null) { + DeserializationException de = checkDeserialization(record, this.logger); + if (de != null) { + ok = false; + future.setException(de); + } + } + if (ok) { + this.logger.debug(() -> "Received: " + record + WITH_CORRELATION_ID + correlationKey); + future.set(record); + } } } }); } + /** + * Return a {@link DeserializationException} if either the key or value failed + * deserialization; null otherwise. If you need to determine whether it was the key or + * value, call + * {@link ListenerUtils#getExceptionFromHeader(ConsumerRecord, String, LogAccessor)} + * with {@link ErrorHandlingDeserializer#KEY_DESERIALIZER_EXCEPTION_HEADER} and + * {@link ErrorHandlingDeserializer#VALUE_DESERIALIZER_EXCEPTION_HEADER} instead. + * @param record the record. + * @param logger a {@link LogAccessor}. + * @return the {@link DeserializationException} or {@code null}. + * @since 2.2.15 + */ + @Nullable + public static DeserializationException checkDeserialization(ConsumerRecord record, LogAccessor logger) { + DeserializationException exception = ListenerUtils.getExceptionFromHeader(record, + ErrorHandlingDeserializer2.VALUE_DESERIALIZER_EXCEPTION_HEADER, logger); + if (exception != null) { + logger.error(exception, () -> "Reply value deserialization failed for " + record.topic() + "-" + + record.partition() + "@" + record.offset()); + return exception; + } + exception = ListenerUtils.getExceptionFromHeader(record, + ErrorHandlingDeserializer2.KEY_DESERIALIZER_EXCEPTION_HEADER, logger); + if (exception != null) { + logger.error(exception, () -> "Reply key deserialization failed for " + record.topic() + "-" + + record.partition() + "@" + record.offset()); + return exception; + } + return null; + } + protected void logLateArrival(ConsumerRecord record, CorrelationKey correlationId) { if (this.sharedReplyTopic) { this.logger.debug(() -> missingCorrelationLogMessage(record, correlationId)); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java b/spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java index 2bdde1fdef..a259ce7912 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java @@ -51,6 +51,7 @@ import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.serialization.Deserializer; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; @@ -78,6 +79,8 @@ import org.springframework.kafka.support.SimpleKafkaHeaderMapper; import org.springframework.kafka.support.TopicPartitionOffset; import org.springframework.kafka.support.converter.MessagingMessageConverter; +import org.springframework.kafka.support.serializer.DeserializationException; +import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2; import org.springframework.kafka.test.EmbeddedKafkaBroker; import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.kafka.test.utils.KafkaTestUtils; @@ -102,7 +105,7 @@ ReplyingKafkaTemplateTests.D_REPLY, ReplyingKafkaTemplateTests.D_REQUEST, ReplyingKafkaTemplateTests.E_REPLY, ReplyingKafkaTemplateTests.E_REQUEST, ReplyingKafkaTemplateTests.F_REPLY, ReplyingKafkaTemplateTests.F_REQUEST, - ReplyingKafkaTemplateTests.G_REPLY, ReplyingKafkaTemplateTests.G_REQUEST }) + ReplyingKafkaTemplateTests.J_REPLY, ReplyingKafkaTemplateTests.J_REQUEST }) public class ReplyingKafkaTemplateTests { public static final String A_REPLY = "aReply"; @@ -133,6 +136,10 @@ public class ReplyingKafkaTemplateTests { public static final String G_REQUEST = "gRequest"; + public static final String J_REPLY = "jReply"; + + public static final String J_REQUEST = "jRequest"; + @Autowired private EmbeddedKafkaBroker embeddedKafka; @@ -179,6 +186,25 @@ public void testGood() throws Exception { } } + @Test + public void testBadDeserialize() throws Exception { + ReplyingKafkaTemplate template = createTemplate(J_REPLY, true); + try { + template.setDefaultReplyTimeout(Duration.ofSeconds(30)); + Headers headers = new RecordHeaders(); + headers.add("baz", "buz".getBytes()); + ProducerRecord record = new ProducerRecord<>(J_REQUEST, null, null, null, "foo", headers); + RequestReplyFuture future = template.sendAndReceive(record); + future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok + assertThatExceptionOfType(ExecutionException.class).isThrownBy(() -> future.get(10, TimeUnit.SECONDS)) + .withCauseExactlyInstanceOf(DeserializationException.class); + } + finally { + template.stop(); + template.destroy(); + } + } + @Test public void testMultiListenerMessageReturn() throws Exception { ReplyingKafkaTemplate template = createTemplate(C_REPLY); @@ -427,6 +453,12 @@ public void testAggregateOrphansNotStored() throws Exception { } public ReplyingKafkaTemplate createTemplate(String topic) throws Exception { + return createTemplate(topic, false); + } + + public ReplyingKafkaTemplate createTemplate(String topic, boolean badDeser) + throws Exception { + ContainerProperties containerProperties = new ContainerProperties(topic); final CountDownLatch latch = new CountDownLatch(1); containerProperties.setConsumerRebalanceListener(new ConsumerRebalanceListener() { @@ -442,9 +474,12 @@ public void onPartitionsAssigned(Collection partitions) { } }); - Map consumerProps = KafkaTestUtils.consumerProps(this.testName, "false", - embeddedKafka); + Map consumerProps = KafkaTestUtils.consumerProps(this.testName, "false", embeddedKafka); consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + if (badDeser) { + consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class); + consumerProps.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, BadDeser.class); + } DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(consumerProps); KafkaMessageListenerContainer container = new KafkaMessageListenerContainer<>(cf, containerProperties); @@ -460,7 +495,6 @@ public void onPartitionsAssigned(Collection partitions) { } public ReplyingKafkaTemplate createTemplate(TopicPartitionOffset topic) { - ContainerProperties containerProperties = new ContainerProperties(topic); Map consumerProps = KafkaTestUtils.consumerProps(this.testName, "false", embeddedKafka); @@ -629,6 +663,12 @@ public void gListener(Message in) { in.getHeaders().get("custom.correlation.id", byte[].class))); template().send(record); } + @KafkaListener(id = J_REQUEST, topics = J_REQUEST) + @SendTo // default REPLY_TOPIC header + public String handleJ(String in) throws InterruptedException { + return in.toUpperCase(); + } + } @KafkaListener(topics = C_REQUEST, groupId = C_REQUEST) @@ -647,4 +687,18 @@ public Message listen1(String in, @Header(KafkaHeaders.REPLY_TOPIC) byte[] re } + public static class BadDeser implements Deserializer { + + @Override + public Object deserialize(String topic, byte[] data) { + return null; + } + + @Override + public Object deserialize(String topic, Headers headers, byte[] data) { + throw new IllegalStateException("test reply deserialization failure"); + } + + } + } diff --git a/src/reference/asciidoc/kafka.adoc b/src/reference/asciidoc/kafka.adoc index 4f65a8de2e..4863e2dc5e 100644 --- a/src/reference/asciidoc/kafka.adoc +++ b/src/reference/asciidoc/kafka.adoc @@ -411,6 +411,9 @@ public class KRequestingApplication { Note that we can use Boot's auto-configured container factory to create the reply container. +If a non-trivial deserializer is being used for replies, consider using an <> that delegates to your configured deserializer. +When so configured, the `RequestReplyFuture` will be completed exceptionally and you can catch the `ExecutionException`, with the `DeserializationException` in its `cause` property. + The template sets a header (named `KafkaHeaders.CORRELATION_ID` by default), which must be echoed back by the server side. In this case, the following `@KafkaListener` application responds: @@ -554,6 +557,12 @@ IMPORTANT: The listener container for the replies MUST be configured with `AckMo To avoid any possibility of losing messages, the template only commits offsets when there are zero requests outstanding, i.e. when the last outstanding request is released by the release strategy. After a rebalance, it is possible for duplicate reply deliveries; these will be ignored for any in-flight requests; you may see error log messages when duplicate replies are received for already released replies. +NOTE: If you use an <> with this aggregating template, the framework will not automatically detect `DeserializationException` s. +Instead, the record (with a `null` value) will be returned intact, with the deserialization exception(s) in headers. +It is recommended that applications call the utility method `ReplyingKafkaTemplate.checkDeserialization()` method to determine if a deserialization exception occurred. +See its javadocs for more information. + +[[receiving-messages]] ==== Receiving Messages You can receive messages by configuring a `MessageListenerContainer` and providing a message listener or by using the `@KafkaListener` annotation. @@ -3105,7 +3114,7 @@ CAUTION: When you use a `BatchMessageListener`, you must provide a `failedDeseri Otherwise, the batch of records are not type safe. You can use the `DefaultKafkaConsumerFactory` constructor that takes key and value `Deserializer` objects and wire in appropriate `ErrorHandlingDeserializer2` instances that you have configured with the proper delegates. -Alternatively, you can use consumer configuration properties (which are used by the `ErrorHandlingDeserializer`) to instantiate the delegates. +Alternatively, you can use consumer configuration properties (which are used by the `ErrorHandlingDeserializer2`) to instantiate the delegates. The property names are `ErrorHandlingDeserializer2.KEY_DESERIALIZER_CLASS` and `ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS`. The property value can be a class or class name. The following example shows how to set these properties: