Skip to content

Commit

Permalink
ReplyingKTemplate Handle DeserializationException
Browse files Browse the repository at this point in the history
Handle `DeserializationException` via `ErrorHandlingDeserializer` in
`ReplyingKafkaTemplate`.

**I will do the backports after review/merge; I expect conflicts**
  • Loading branch information
garyrussell committed Jul 27, 2020
1 parent cd9bad4 commit aca09ba
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,18 @@
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.SmartLifecycle;
import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.KafkaException;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.BatchMessageListener;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.GenericMessageListenerContainer;
import org.springframework.kafka.listener.ListenerUtils;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.TopicPartitionOffset;
import org.springframework.kafka.support.serializer.DeserializationException;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.lang.Nullable;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
Expand Down Expand Up @@ -92,8 +96,7 @@ public class ReplyingKafkaTemplate<K, V, R> extends KafkaTemplate<K, V> implemen

private boolean sharedReplyTopic;

private Function<ProducerRecord<K, V>, CorrelationKey> correlationStrategy =
ReplyingKafkaTemplate::defaultCorrelationIdStrategy;
private Function<ProducerRecord<K, V>, CorrelationKey> correlationStrategy = ReplyingKafkaTemplate::defaultCorrelationIdStrategy;

private String correlationHeaderName = KafkaHeaders.CORRELATION_ID;

Expand Down Expand Up @@ -207,8 +210,8 @@ public Collection<TopicPartition> getAssignedReplyTopicPartitions() {
}

/**
* Set to true when multiple templates are using the same topic for replies.
* This simply changes logs for unexpected replies to debug instead of error.
* Set to true when multiple templates are using the same topic for replies. This
* simply changes logs for unexpected replies to debug instead of error.
* @param sharedReplyTopic true if using a shared topic.
* @since 2.2
*/
Expand Down Expand Up @@ -404,13 +407,54 @@ public void onMessage(List<ConsumerRecord<K, R>> data) {
logLateArrival(record, correlationId);
}
else {
this.logger.debug(() -> "Received: " + record + WITH_CORRELATION_ID + correlationKey);
future.set(record);
boolean ok = true;
if (record.value() == null) {
DeserializationException de = checkDeserialization(record, this.logger);
if (de != null) {
ok = false;
future.setException(de);
}
}
if (ok) {
this.logger.debug(() -> "Received: " + record + WITH_CORRELATION_ID + correlationKey);
future.set(record);
}
}
}
});
}

/**
* Return a {@link DeserializationException} if either the key or value failed
* deserialization; null otherwise. If you need to determine whether it was the key or
* value, call
* {@link ListenerUtils#getExceptionFromHeader(ConsumerRecord, String, LogAccessor)}
* with {@link ErrorHandlingDeserializer#KEY_DESERIALIZER_EXCEPTION_HEADER} and
* {@link ErrorHandlingDeserializer#VALUE_DESERIALIZER_EXCEPTION_HEADER} instead.
* @param record the record.
* @param logger a {@link LogAccessor}.
* @return the {@link DeserializationException} or {@code null}.
* @since 2.2.15
*/
@Nullable
public static DeserializationException checkDeserialization(ConsumerRecord<?, ?> record, LogAccessor logger) {
DeserializationException exception = ListenerUtils.getExceptionFromHeader(record,
ErrorHandlingDeserializer.VALUE_DESERIALIZER_EXCEPTION_HEADER, logger);
if (exception != null) {
logger.error(exception, () -> "Reply value deserialization failed for " + record.topic() + "-"
+ record.partition() + "@" + record.offset());
return exception;
}
exception = ListenerUtils.getExceptionFromHeader(record,
ErrorHandlingDeserializer.KEY_DESERIALIZER_EXCEPTION_HEADER, logger);
if (exception != null) {
logger.error(exception, () -> "Reply key deserialization failed for " + record.topic() + "-"
+ record.partition() + "@" + record.offset());
return exception;
}
return null;
}

protected void logLateArrival(ConsumerRecord<K, R> record, CorrelationKey correlationId) {
if (this.sharedReplyTopic) {
this.logger.debug(() -> missingCorrelationLogMessage(record, correlationId));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.concurrent.atomic.AtomicReference;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
Expand All @@ -51,6 +52,7 @@
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Deserializer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -78,6 +80,8 @@
import org.springframework.kafka.support.SimpleKafkaHeaderMapper;
import org.springframework.kafka.support.TopicPartitionOffset;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.kafka.support.serializer.DeserializationException;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;
Expand All @@ -104,7 +108,8 @@
ReplyingKafkaTemplateTests.F_REPLY, ReplyingKafkaTemplateTests.F_REQUEST,
ReplyingKafkaTemplateTests.G_REPLY, ReplyingKafkaTemplateTests.G_REQUEST,
ReplyingKafkaTemplateTests.H_REPLY, ReplyingKafkaTemplateTests.H_REQUEST,
ReplyingKafkaTemplateTests.I_REPLY, ReplyingKafkaTemplateTests.I_REQUEST })
ReplyingKafkaTemplateTests.I_REPLY, ReplyingKafkaTemplateTests.I_REQUEST,
ReplyingKafkaTemplateTests.J_REPLY, ReplyingKafkaTemplateTests.J_REQUEST })
public class ReplyingKafkaTemplateTests {

public static final String A_REPLY = "aReply";
Expand Down Expand Up @@ -143,6 +148,10 @@ public class ReplyingKafkaTemplateTests {

public static final String I_REQUEST = "iRequest";

public static final String J_REPLY = "jReply";

public static final String J_REQUEST = "jRequest";

@Autowired
private EmbeddedKafkaBroker embeddedKafka;

Expand Down Expand Up @@ -189,6 +198,25 @@ public void testGood() throws Exception {
}
}

@Test
public void testBadDeserialize() throws Exception {
ReplyingKafkaTemplate<Integer, String, String> template = createTemplate(J_REPLY, true);
try {
template.setDefaultReplyTimeout(Duration.ofSeconds(30));
Headers headers = new RecordHeaders();
headers.add("baz", "buz".getBytes());
ProducerRecord<Integer, String> record = new ProducerRecord<>(J_REQUEST, null, null, null, "foo", headers);
RequestReplyFuture<Integer, String, String> future = template.sendAndReceive(record);
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
assertThatExceptionOfType(ExecutionException.class).isThrownBy(() -> future.get(10, TimeUnit.SECONDS))
.withCauseExactlyInstanceOf(DeserializationException.class);
}
finally {
template.stop();
template.destroy();
}
}

@Test
public void testMultiListenerMessageReturn() throws Exception {
ReplyingKafkaTemplate<Integer, String, String> template = createTemplate(C_REPLY);
Expand Down Expand Up @@ -477,6 +505,12 @@ public void testAggregateOrphansNotStored() throws Exception {
}

public ReplyingKafkaTemplate<Integer, String, String> createTemplate(String topic) throws Exception {
return createTemplate(topic, false);
}

public ReplyingKafkaTemplate<Integer, String, String> createTemplate(String topic, boolean badDeser)
throws Exception {

ContainerProperties containerProperties = new ContainerProperties(topic);
final CountDownLatch latch = new CountDownLatch(1);
containerProperties.setConsumerRebalanceListener(new ConsumerRebalanceListener() {
Expand All @@ -493,6 +527,10 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {

});
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(this.testName, "false", embeddedKafka);
if (badDeser) {
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
consumerProps.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, BadDeser.class);
}
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(cf,
containerProperties);
Expand All @@ -508,7 +546,6 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
}

public ReplyingKafkaTemplate<Integer, String, String> createTemplate(TopicPartitionOffset topic) {

ContainerProperties containerProperties = new ContainerProperties(topic);
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(this.testName, "false", embeddedKafka);
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
Expand Down Expand Up @@ -687,6 +724,12 @@ public Message<?> messageReturn(String in) {
.build();
}

@KafkaListener(id = J_REQUEST, topics = J_REQUEST)
@SendTo // default REPLY_TOPIC header
public String handleJ(String in) throws InterruptedException {
return in.toUpperCase();
}

}

@KafkaListener(topics = C_REQUEST, groupId = C_REQUEST)
Expand Down Expand Up @@ -717,4 +760,18 @@ public String listen1(String in) {

}

public static class BadDeser implements Deserializer<Object> {

@Override
public Object deserialize(String topic, byte[] data) {
return null;
}

@Override
public Object deserialize(String topic, Headers headers, byte[] data) {
throw new IllegalStateException("test reply deserialization failure");
}

}

}
8 changes: 8 additions & 0 deletions src/reference/asciidoc/kafka.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,9 @@ public class KRequestingApplication {

Note that we can use Boot's auto-configured container factory to create the reply container.

If a non-trivial deserializer is being used for replies, consider using an <<error-handling-deserializer,`ErrorHandlingDeserializer`>> that delegates to your configured deserializer.
When so configured, the `RequestReplyFuture` will be completed exceptionally and you can catch the `ExecutionException`, with the `DeserializationException` in its `cause` property.

The template sets a header (named `KafkaHeaders.CORRELATION_ID` by default), which must be echoed back by the server side.

In this case, the following `@KafkaListener` application responds:
Expand Down Expand Up @@ -771,6 +774,11 @@ IMPORTANT: The listener container for the replies MUST be configured with `AckMo
To avoid any possibility of losing messages, the template only commits offsets when there are zero requests outstanding, i.e. when the last outstanding request is released by the release strategy.
After a rebalance, it is possible for duplicate reply deliveries; these will be ignored for any in-flight requests; you may see error log messages when duplicate replies are received for already released replies.

NOTE: If you use an <<error-handling-deserializer,`ErrorHandlingDeserializer`>> with this aggregating template, the framework will not automatically detect `DeserializationException` s.
Instead, the record (with a `null` value) will be returned intact, with the deserialization exception(s) in headers.
It is recommended that applications call the utility method `ReplyingKafkaTemplate.checkDeserialization()` method to determine if a deserialization exception occurred.
See its javadocs for more information.

[[receiving-messages]]
==== Receiving Messages

Expand Down

0 comments on commit aca09ba

Please sign in to comment.