Skip to content

Commit

Permalink
GH-1125: Add FailedDeserializationInfo
Browse files Browse the repository at this point in the history
Fixes #1125

Main ideas:

    * It does not enforces one single function to be set,
    it simply makes the new one to take precedence.
    * When setting the function/supplier via configuration,
    one single configuration key/value pair is enough.
    It tries to fit in any of the two possible options, otherwise, raises errors.
    * I'd personally rename `private BiFunction<byte[], Headers, T> failedDeserializationFunction;`
     to `failedDeserializationBiFunction`, respecting the name at setter level.
     However i read about avoiding sugar refactoring as rule of thumb.

* Simplify and unify setFailedDeserializationFunction
& recoverFromSupplier methods from ErrorHandlingDeserializer2
* deprecated since was in the wrong place
* Javadocs for ErrorHandlingDeserializer2 and adocs syntax's amends.
* Correct FailedDeserializationInfo & ErrorHandlingDeserializer2 copyright years
* Correct FailedDeserializationInfo this invocation
& ErrorHandlingDeserializer2 java docs headers
*Polishing code style

**Cherry-pick to 2.2.x**

# Conflicts:
#	spring-kafka/src/main/java/org/springframework/kafka/support/serializer/ErrorHandlingDeserializer2.java
#	src/reference/asciidoc/whats-new.adoc
  • Loading branch information
viperey authored and artembilan committed Jun 27, 2019
1 parent 020dfc9 commit 5fae3a9
Show file tree
Hide file tree
Showing 4 changed files with 151 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.ObjectOutputStream;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
Expand All @@ -38,6 +39,7 @@
*
* @author Gary Russell
* @author Artem Bilan
* @author Victor Perez Rey
*
* @since 2.2
*
Expand Down Expand Up @@ -83,7 +85,7 @@ public class ErrorHandlingDeserializer2<T> implements ExtendedDeserializer<T> {

private boolean isKey;

private BiFunction<byte[], Headers, T> failedDeserializationFunction;
private Function<FailedDeserializationInfo, T> failedDeserializationFunction;

public ErrorHandlingDeserializer2() {
super();
Expand All @@ -93,7 +95,23 @@ public ErrorHandlingDeserializer2(Deserializer<T> delegate) {
this.delegate = setupDelegate(delegate);
}

/**
* Provide an alternative supplying mechanism when deserialization fails.
* @param failedDeserializationFunction the {@link BiFunction} to use.
* @deprecated since 2.2.8 in favor of {@link #setFailedDeserializationFunction(Function)}.
*/
@Deprecated
public void setFailedDeserializationFunction(BiFunction<byte[], Headers, T> failedDeserializationFunction) {
setFailedDeserializationFunction((failed) ->
failedDeserializationFunction.apply(failed.getData(), failed.getHeaders()));
}

/**
* Provide an alternative supplying mechanism when deserialization fails.
* @param failedDeserializationFunction the {@link Function} to use.
* @since 2.2.8
*/
public void setFailedDeserializationFunction(Function<FailedDeserializationInfo, T> failedDeserializationFunction) {
this.failedDeserializationFunction = failedDeserializationFunction;
}

Expand Down Expand Up @@ -159,9 +177,9 @@ private void setupFunction(Map<String, ?> configs, String configKey) {
try {
Object value = configs.get(configKey);
Class<?> clazz = value instanceof Class ? (Class<?>) value : ClassUtils.forName((String) value, null);
Assert.isTrue(BiFunction.class.isAssignableFrom(clazz), "'function' must be a 'BiFunction ', not a "
Assert.isTrue(Function.class.isAssignableFrom(clazz), "'function' must be a 'Function ', not a "
+ clazz.getName());
this.failedDeserializationFunction = (BiFunction<byte[], Headers, T>) clazz.newInstance();
this.failedDeserializationFunction = (Function<FailedDeserializationInfo, T>) clazz.newInstance();
}
catch (ClassNotFoundException | LinkageError | InstantiationException | IllegalAccessException e) {
throw new IllegalStateException(e);
Expand All @@ -175,9 +193,7 @@ public T deserialize(String topic, byte[] data) {
return this.delegate.deserialize(topic, data);
}
catch (Exception e) {
return this.failedDeserializationFunction != null
? this.failedDeserializationFunction.apply(data, null)
: null;
return recoverFromSupplier(topic, null, data, e);
}
}

Expand All @@ -188,9 +204,18 @@ public T deserialize(String topic, Headers headers, byte[] data) {
}
catch (Exception e) {
deserializationException(headers, data, e);
return this.failedDeserializationFunction != null
? this.failedDeserializationFunction.apply(data, headers)
: null;
return recoverFromSupplier(topic, headers, data, e);
}
}

private T recoverFromSupplier(String topic, Headers headers, byte[] data, Exception exception) {
if (this.failedDeserializationFunction != null) {
FailedDeserializationInfo failedDeserializationInfo =
new FailedDeserializationInfo(topic, headers, data, this.isForKey, exception);
return this.failedDeserializationFunction.apply(failedDeserializationInfo);
}
else {
return null;
}
}

Expand All @@ -210,8 +235,8 @@ private void deserializationException(Headers headers, byte[] data, Exception e)
try (ObjectOutputStream oos = new ObjectOutputStream(stream)) {
exception = new DeserializationException("failed to deserialize",
data, this.isKey, new RuntimeException("Could not deserialize type "
+ e.getClass().getName() + " with message " + e.getMessage()
+ " failure: " + ex.getMessage()));
+ e.getClass().getName() + " with message " + e.getMessage()
+ " failure: " + ex.getMessage()));
oos.writeObject(exception);
}
catch (IOException ex2) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.support.serializer;

import java.util.Arrays;

import org.apache.kafka.common.header.Headers;

/**
* Class containing all the contextual information around a deserialization error.
*
* @author Victor Perez Rey
*
* @since 2.2.8
*/
public class FailedDeserializationInfo {

private final String topic;

private final Headers headers;

private final byte[] data;

private final boolean isForKey;

private final Exception exception;

/**
* Construct an instance with the contextual information.
* @param topic topic associated with the data.
* @param headers headers associated with the record; may be empty.
* @param data serialized bytes; may be null.
* @param isForKey true for a key deserializer, false otherwise.
* @param exception exception causing the deserialization error.
*/
public FailedDeserializationInfo(String topic, Headers headers, byte[] data, boolean isForKey,
Exception exception) {

this.topic = topic;
this.headers = headers;
this.data = data;
this.isForKey = isForKey;
this.exception = exception;
}

public String getTopic() {
return this.topic;
}

public Headers getHeaders() {
return this.headers;
}

public byte[] getData() {
return this.data;
}

public boolean isForKey() {
return this.isForKey;
}

public Exception getException() {
return this.exception;
}

@Override
public String toString() {
return "FailedDeserializationInfo{" +
"topic='" + this.topic + '\'' +
", headers=" + this.headers +
", data=" + Arrays.toString(this.data) +
", isForKey=" + this.isForKey +
", exception=" + this.exception +
'}';
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,10 @@
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Function;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.ClassRule;
import org.junit.Test;
Expand All @@ -43,21 +42,21 @@
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.converter.BytesJsonMessageConverter;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2;
import org.springframework.kafka.support.serializer.FailedDeserializationInfo;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.junit4.SpringRunner;

/**
* @author Gary Russell
* @author Victor Perez Rey
*
* @since 2.1.1
*
*/
@ContextConfiguration
@RunWith(SpringJUnit4ClassRunner.class)
@RunWith(SpringRunner.class)
@DirtiesContext
public class BatchListenerConversion2Tests {

Expand Down Expand Up @@ -117,8 +116,7 @@ public Map<String, Object> consumerConfigs() {

@Bean
public KafkaTemplate<Integer, String> template() {
KafkaTemplate<Integer, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());
return kafkaTemplate;
return new KafkaTemplate<>(producerFactory());
}

@Bean
Expand Down Expand Up @@ -197,23 +195,23 @@ public String toString() {

public static class BadFoo extends Foo {

private final byte[] failedDecode;
private final FailedDeserializationInfo failedDeserializationInfo;

public BadFoo(byte[] failedDecode) {
this.failedDecode = failedDecode;
public BadFoo(FailedDeserializationInfo failedDeserializationInfo) {
this.failedDeserializationInfo = failedDeserializationInfo;
}

public byte[] getFailedDecode() {
return this.failedDecode;
public FailedDeserializationInfo getFailedDeserializationInfo() {
return failedDeserializationInfo;
}

}

public static class FailedFooProvider implements BiFunction<byte[], Headers, Foo> {
public static class FailedFooProvider implements Function<FailedDeserializationInfo, Foo> {

@Override
public Foo apply(byte[] t, Headers u) {
return new BadFoo(t);
public Foo apply(FailedDeserializationInfo failedDeserializationInfo) {
return new BadFoo(failedDeserializationInfo);
}

}
Expand Down
20 changes: 10 additions & 10 deletions src/reference/asciidoc/kafka.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -2286,9 +2286,9 @@ If the delegate fails to deserialize the record content, the `ErrorHandlingDeser
When you use a record-level `MessageListener`, if the `ConsumerRecord` contains a `DeserializationException` header for either the key or value, the container's `ErrorHandler` is called with the failed `ConsumerRecord`.
The record is not passed to the listener.

Alternatively, you can configure the `ErrorHandlingDeserializer2` to create a custom value by providing a `failedDeserializationFunction`, which is a `BiConsumer<byte[], Headers, T>`.
Alternatively, you can configure the `ErrorHandlingDeserializer2` to create a custom value by providing a `failedDeserializationFunction`, which is a `Function<FailedDeserializationInfo, T>`.
This function is invoked to create an instance of `T`, which is passed to the listener in the usual fashion.
The raw record value and headers are provided to the function.
An object of type `FailedDeserializationInfo`, which contains all the contextual information is provided to the function.
You can find the `DeserializationException` (as a serialized Java object) in headers.
See the https://docs.spring.io/spring-kafka/api/org/springframework/kafka/support/serializer/ErrorHandlingDeserializer2.html[Javadoc] for the `ErrorHandlingDeserializer2` for more information.

Expand Down Expand Up @@ -2323,23 +2323,23 @@ The following example uses a `failedDeserializationFunction`.
----
public class BadFoo extends Foo {
private final byte[] failedDecode;
private final FailedDeserializationInfo failedDeserializationInfo;
public BadFoo(byte[] failedDecode) {
this.failedDecode = failedDecode;
public BadFoo(FailedDeserializationInfo failedDeserializationInfo) {
this.failedDeserializationInfo = failedDeserializationInfo;
}
public byte[] getFailedDecode() {
return this.failedDecode;
public FailedDeserializationInfo getFailedDeserializationInfo() {
return this.failedDeserializationInfo;
}
}
public class FailedFooProvider implements BiFunction<byte[], Headers, Foo> {
public class FailedFooProvider implements Function<FailedDeserializationInfo, Foo> {
@Override
public Foo apply(byte[] t, Headers u) {
return new BadFoo(t);
public Foo apply(FailedDeserializationInfo info) {
return new BadFoo(info);
}
}
Expand Down

0 comments on commit 5fae3a9

Please sign in to comment.