ReplyingKafkaTemplate: Handle DeserializationException
Handle `DeserializationException` via `ErrorHandlingDeserializer` in
`ReplyingKafkaTemplate`.

**I will do the backports after review/merge; I expect conflicts**
garyrussell committed Jul 27, 2020
1 parent ad7cad4 commit 3d1cd0a
Showing 3 changed files with 119 additions and 11 deletions.
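At a glance, the change makes a reply that cannot be deserialized complete the request's `RequestReplyFuture` exceptionally instead of letting it time out. A minimal sketch of the resulting user-facing behavior, assuming the reply container's consumer is configured with an `ErrorHandlingDeserializer2` and a `StringDeserializer` delegate (the delegate, types, and timeouts here are illustrative, not part of this commit):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;

import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
import org.springframework.kafka.requestreply.RequestReplyFuture;
import org.springframework.kafka.support.serializer.DeserializationException;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2;

public class ReplyDeserializationSketch {

	// Reply-side consumer properties: ErrorHandlingDeserializer2 captures
	// deserialization failures in headers instead of throwing in the poll loop,
	// which is what lets the template detect them.
	static Map<String, Object> replyConsumerProps() {
		Map<String, Object> props = new HashMap<>();
		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
		props.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, StringDeserializer.class); // assumed delegate
		return props;
	}

	// With this commit, a reply that fails deserialization completes the future
	// exceptionally rather than timing out.
	static void sendAndHandle(ReplyingKafkaTemplate<Integer, String, String> template,
			ProducerRecord<Integer, String> record) throws Exception {

		RequestReplyFuture<Integer, String, String> future = template.sendAndReceive(record);
		future.getSendFuture().get(10, TimeUnit.SECONDS); // the request itself was sent ok
		try {
			ConsumerRecord<Integer, String> reply = future.get(10, TimeUnit.SECONDS);
			// normal reply handling
		}
		catch (ExecutionException ee) {
			if (ee.getCause() instanceof DeserializationException) {
				// the reply's key or value could not be deserialized
			}
		}
	}

}
```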
@@ -37,14 +37,19 @@
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.SmartLifecycle;
import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.KafkaException;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.BatchMessageListener;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.GenericMessageListenerContainer;
import org.springframework.kafka.listener.ListenerUtils;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.TopicPartitionOffset;
import org.springframework.kafka.support.serializer.DeserializationException;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2;
import org.springframework.lang.Nullable;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
@@ -92,8 +97,7 @@ public class ReplyingKafkaTemplate<K, V, R> extends KafkaTemplate<K, V> implemen

private boolean sharedReplyTopic;

-private Function<ProducerRecord<K, V>, CorrelationKey> correlationStrategy =
-		ReplyingKafkaTemplate::defaultCorrelationIdStrategy;
+private Function<ProducerRecord<K, V>, CorrelationKey> correlationStrategy = ReplyingKafkaTemplate::defaultCorrelationIdStrategy;

private String correlationHeaderName = KafkaHeaders.CORRELATION_ID;

@@ -230,8 +234,8 @@ public Collection<TopicPartition> getAssignedReplyTopicPartitions() {
}

/**
- * Set to true when multiple templates are using the same topic for replies.
- * This simply changes logs for unexpected replies to debug instead of error.
+ * Set to true when multiple templates are using the same topic for replies. This
+ * simply changes logs for unexpected replies to debug instead of error.
* @param sharedReplyTopic true if using a shared topic.
* @since 2.2
*/
@@ -439,13 +443,54 @@ public void onMessage(List<ConsumerRecord<K, R>> data) {
logLateArrival(record, correlationId);
}
else {
-this.logger.debug(() -> "Received: " + record + WITH_CORRELATION_ID + correlationKey);
-future.set(record);
+boolean ok = true;
+if (record.value() == null) {
+DeserializationException de = checkDeserialization(record, this.logger);
+if (de != null) {
+ok = false;
+future.setException(de);
+}
+}
+if (ok) {
+this.logger.debug(() -> "Received: " + record + WITH_CORRELATION_ID + correlationKey);
+future.set(record);
+}
}
}
});
}

/**
* Return a {@link DeserializationException} if either the key or value failed
* deserialization; null otherwise. If you need to determine whether it was the key or
* value, call
* {@link ListenerUtils#getExceptionFromHeader(ConsumerRecord, String, LogAccessor)}
* with {@link ErrorHandlingDeserializer#KEY_DESERIALIZER_EXCEPTION_HEADER} and
* {@link ErrorHandlingDeserializer#VALUE_DESERIALIZER_EXCEPTION_HEADER} instead.
* @param record the record.
* @param logger a {@link LogAccessor}.
* @return the {@link DeserializationException} or {@code null}.
* @since 2.2.15
*/
@Nullable
public static DeserializationException checkDeserialization(ConsumerRecord<?, ?> record, LogAccessor logger) {
DeserializationException exception = ListenerUtils.getExceptionFromHeader(record,
ErrorHandlingDeserializer2.VALUE_DESERIALIZER_EXCEPTION_HEADER, logger);
if (exception != null) {
logger.error(exception, () -> "Reply value deserialization failed for " + record.topic() + "-"
+ record.partition() + "@" + record.offset());
return exception;
}
exception = ListenerUtils.getExceptionFromHeader(record,
ErrorHandlingDeserializer2.KEY_DESERIALIZER_EXCEPTION_HEADER, logger);
if (exception != null) {
logger.error(exception, () -> "Reply key deserialization failed for " + record.topic() + "-"
+ record.partition() + "@" + record.offset());
return exception;
}
return null;
}

protected void logLateArrival(ConsumerRecord<K, R> record, CorrelationKey correlationId) {
if (this.sharedReplyTopic) {
this.logger.debug(() -> missingCorrelationLogMessage(record, correlationId));
@@ -51,6 +51,7 @@
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Deserializer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
@@ -78,6 +79,8 @@
import org.springframework.kafka.support.SimpleKafkaHeaderMapper;
import org.springframework.kafka.support.TopicPartitionOffset;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.kafka.support.serializer.DeserializationException;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;
@@ -102,7 +105,8 @@
ReplyingKafkaTemplateTests.D_REPLY, ReplyingKafkaTemplateTests.D_REQUEST,
ReplyingKafkaTemplateTests.E_REPLY, ReplyingKafkaTemplateTests.E_REQUEST,
ReplyingKafkaTemplateTests.F_REPLY, ReplyingKafkaTemplateTests.F_REQUEST,
-ReplyingKafkaTemplateTests.G_REPLY, ReplyingKafkaTemplateTests.G_REQUEST })
+ReplyingKafkaTemplateTests.G_REPLY, ReplyingKafkaTemplateTests.G_REQUEST,
+ReplyingKafkaTemplateTests.J_REPLY, ReplyingKafkaTemplateTests.J_REQUEST })
public class ReplyingKafkaTemplateTests {

public static final String A_REPLY = "aReply";
@@ -133,6 +136,10 @@ public class ReplyingKafkaTemplateTests {

public static final String G_REQUEST = "gRequest";

public static final String J_REPLY = "jReply";

public static final String J_REQUEST = "jRequest";

@Autowired
private EmbeddedKafkaBroker embeddedKafka;

@@ -179,6 +186,25 @@ public void testGood() throws Exception {
}
}

@Test
public void testBadDeserialize() throws Exception {
ReplyingKafkaTemplate<Integer, String, String> template = createTemplate(J_REPLY, true);
try {
template.setDefaultReplyTimeout(Duration.ofSeconds(30));
Headers headers = new RecordHeaders();
headers.add("baz", "buz".getBytes());
ProducerRecord<Integer, String> record = new ProducerRecord<>(J_REQUEST, null, null, null, "foo", headers);
RequestReplyFuture<Integer, String, String> future = template.sendAndReceive(record);
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
assertThatExceptionOfType(ExecutionException.class).isThrownBy(() -> future.get(10, TimeUnit.SECONDS))
.withCauseExactlyInstanceOf(DeserializationException.class);
}
finally {
template.stop();
template.destroy();
}
}

@Test
public void testMultiListenerMessageReturn() throws Exception {
ReplyingKafkaTemplate<Integer, String, String> template = createTemplate(C_REPLY);
@@ -427,6 +453,12 @@ public void testAggregateOrphansNotStored() throws Exception {
}

public ReplyingKafkaTemplate<Integer, String, String> createTemplate(String topic) throws Exception {
return createTemplate(topic, false);
}

public ReplyingKafkaTemplate<Integer, String, String> createTemplate(String topic, boolean badDeser)
throws Exception {

ContainerProperties containerProperties = new ContainerProperties(topic);
final CountDownLatch latch = new CountDownLatch(1);
containerProperties.setConsumerRebalanceListener(new ConsumerRebalanceListener() {
@@ -442,9 +474,12 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
}

});
-Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(this.testName, "false",
-embeddedKafka);
+Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(this.testName, "false", embeddedKafka);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
if (badDeser) {
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
consumerProps.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, BadDeser.class);
}
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(cf,
containerProperties);
@@ -460,7 +495,6 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
}

public ReplyingKafkaTemplate<Integer, String, String> createTemplate(TopicPartitionOffset topic) {

ContainerProperties containerProperties = new ContainerProperties(topic);
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(this.testName, "false",
embeddedKafka);
@@ -629,6 +663,12 @@ public void gListener(Message<String> in) {
in.getHeaders().get("custom.correlation.id", byte[].class)));
template().send(record);
}
@KafkaListener(id = J_REQUEST, topics = J_REQUEST)
@SendTo // default REPLY_TOPIC header
public String handleJ(String in) throws InterruptedException {
return in.toUpperCase();
}

}

@KafkaListener(topics = C_REQUEST, groupId = C_REQUEST)
@@ -647,4 +687,18 @@ public Message<?> listen1(String in, @Header(KafkaHeaders.REPLY_TOPIC) byte[] re

}

public static class BadDeser implements Deserializer<Object> {
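// the consumer invokes the Headers variant of deserialize, so replies always fail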

@Override
public Object deserialize(String topic, byte[] data) {
return null;
}

@Override
public Object deserialize(String topic, Headers headers, byte[] data) {
throw new IllegalStateException("test reply deserialization failure");
}

}

}
11 changes: 10 additions & 1 deletion src/reference/asciidoc/kafka.adoc
@@ -411,6 +411,9 @@ public class KRequestingApplication {

Note that we can use Boot's auto-configured container factory to create the reply container.

If a non-trivial deserializer is being used for replies, consider using an <<error-handling-deserializer,`ErrorHandlingDeserializer`>> that delegates to your configured deserializer.
When so configured, the `RequestReplyFuture` will be completed exceptionally and you can catch the `ExecutionException`, with the `DeserializationException` in its `cause` property.
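The following is a minimal sketch of handling such a failure (the generic types and timeout are illustrative):

[source, java]
----
RequestReplyFuture<K, V, R> future = template.sendAndReceive(record);
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
try {
    ConsumerRecord<K, R> reply = future.get(10, TimeUnit.SECONDS);
    ...
}
catch (ExecutionException ee) {
    if (ee.getCause() instanceof DeserializationException) {
        // the reply failed deserialization; the raw data is available in the exception
    }
}
----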

The template sets a header (named `KafkaHeaders.CORRELATION_ID` by default), which must be echoed back by the server side.

In this case, the following `@KafkaListener` application responds:
@@ -554,6 +557,12 @@ IMPORTANT: The listener container for the replies MUST be configured with `AckMo
To avoid any possibility of losing messages, the template only commits offsets when there are zero requests outstanding, i.e. when the last outstanding request is released by the release strategy.
After a rebalance, duplicate reply deliveries are possible; these are ignored for any in-flight requests, but you may see error log messages when duplicate replies are received for requests that have already been released.

NOTE: If you use an <<error-handling-deserializer,`ErrorHandlingDeserializer`>> with this aggregating template, the framework will not automatically detect `DeserializationException` s.
Instead, the record (with a `null` value) will be returned intact, with the deserialization exception(s) in headers.
It is recommended that applications call the utility method `ReplyingKafkaTemplate.checkDeserialization()` to determine whether a deserialization exception occurred.
See its javadocs for more information.
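The following is a sketch of such a check, assuming `aggregated` is the collection of received reply records and `logger` is a `LogAccessor`:

[source, java]
----
for (ConsumerRecord<?, ?> reply : aggregated) {
    DeserializationException de = ReplyingKafkaTemplate.checkDeserialization(reply, logger);
    if (de != null) {
        // this reply's key or value failed deserialization
    }
}
----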

[[receiving-messages]]
==== Receiving Messages

You can receive messages by configuring a `MessageListenerContainer` and providing a message listener or by using the `@KafkaListener` annotation.
@@ -3105,7 +3114,7 @@ CAUTION: When you use a `BatchMessageListener`, you must provide a `failedDeseri
Otherwise, the batch of records is not type safe.

You can use the `DefaultKafkaConsumerFactory` constructor that takes key and value `Deserializer` objects and wire in appropriate `ErrorHandlingDeserializer2` instances that you have configured with the proper delegates.
-Alternatively, you can use consumer configuration properties (which are used by the `ErrorHandlingDeserializer`) to instantiate the delegates.
+Alternatively, you can use consumer configuration properties (which are used by the `ErrorHandlingDeserializer2`) to instantiate the delegates.
The property names are `ErrorHandlingDeserializer2.KEY_DESERIALIZER_CLASS` and `ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS`.
The property value can be a class or class name.
The following example shows how to set these properties:
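A minimal sketch (the `JsonDeserializer` delegates are an assumption for illustration):

[source, java]
----
// ... other consumer properties ...
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
// the delegate can be specified as a class or as a class name
consumerProps.put(ErrorHandlingDeserializer2.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);
consumerProps.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
----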
