Skip to content

Commit

Permalink
Delegating(De)Serializer Improvements
Browse files Browse the repository at this point in the history
Automatically handle "standard" types supported by the `Serdes`.
  • Loading branch information
garyrussell authored and artembilan committed May 21, 2020
1 parent 0f43e34 commit ae03a53
Show file tree
Hide file tree
Showing 5 changed files with 141 additions and 14 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019 the original author or authors.
* Copyright 2019-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,33 +18,45 @@

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;

import org.springframework.core.log.LogAccessor;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.StringUtils;

/**
* A {@link Deserializer} that delegates to other deserializers based on a serialization
* selector header.
* selector header. It is not necessary to configure standard deserializers supported by
* {@link Serdes}.
*
* @author Gary Russell
* @since 2.3
*
*/
public class DelegatingDeserializer implements Deserializer<Object> {

private static final LogAccessor LOGGER = new LogAccessor(DelegatingDeserializer.class);

/**
* Name of the configuration property containing the serialization selector map with
* format {@code selector:class,...}.
*/
public static final String SERIALIZATION_SELECTOR_CONFIG = DelegatingSerializer.SERIALIZATION_SELECTOR_CONFIG;


private final Map<String, Deserializer<?>> delegates = new HashMap<>();
private final Map<String, Deserializer<? extends Object>> delegates = new ConcurrentHashMap<>();

private final Map<String, Object> autoConfigs = new HashMap<>();

private boolean forKeys;

/**
* Construct an instance that will be configured in {@link #configure(Map, boolean)}
Expand All @@ -57,7 +69,8 @@ public DelegatingDeserializer() {
/**
* Construct an instance with the supplied mapping of selectors to delegate
* deserializers. The selector must be supplied in the
* {@link DelegatingSerializer#SERIALIZATION_SELECTOR} header.
* {@link DelegatingSerializer#SERIALIZATION_SELECTOR} header. It is not necessary to
* configure standard deserializers supported by {@link Serdes}.
* @param delegates the map of delegates.
*/
public DelegatingDeserializer(Map<String, Deserializer<?>> delegates) {
Expand All @@ -67,6 +80,8 @@ public DelegatingDeserializer(Map<String, Deserializer<?>> delegates) {
@SuppressWarnings("unchecked")
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
this.autoConfigs.putAll(configs);
this.forKeys = isKey;
Object value = configs.get(SERIALIZATION_SELECTOR_CONFIG);
if (value == null) {
return;
Expand Down Expand Up @@ -151,13 +166,19 @@ public Object deserialize(String topic, byte[] data) {

@Override
public Object deserialize(String topic, Headers headers, byte[] data) {
byte[] value = headers.lastHeader(DelegatingSerializer.SERIALIZATION_SELECTOR).value();
byte[] value = null;
Header header = headers.lastHeader(DelegatingSerializer.SERIALIZATION_SELECTOR);
if (header != null) {
value = header.value();
}
if (value == null) {
throw new IllegalStateException("No '" + DelegatingSerializer.SERIALIZATION_SELECTOR + "' header present");
}
String selector = new String(value).replaceAll("\"", "");
@SuppressWarnings("unchecked")
Deserializer<Object> deserializer = (Deserializer<Object>) this.delegates.get(selector);
Deserializer<? extends Object> deserializer = this.delegates.get(selector);
if (deserializer == null) {
deserializer = trySerdes(selector);
}
if (deserializer == null) {
return data;
}
Expand All @@ -166,6 +187,25 @@ public Object deserialize(String topic, Headers headers, byte[] data) {
}
}

/*
* Package for testing.
*/
@Nullable
Deserializer<? extends Object> trySerdes(String key) {
try {
Class<?> clazz = ClassUtils.forName(key, ClassUtils.getDefaultClassLoader());
Serde<? extends Object> serdeFrom = Serdes.serdeFrom(clazz);
Deserializer<? extends Object> deserializer = serdeFrom.deserializer();
deserializer.configure(this.autoConfigs, this.forKeys);
this.delegates.put(key, deserializer);
return deserializer;
}
catch (IllegalStateException | ClassNotFoundException | LinkageError e) {
this.delegates.put(key, Serdes.serdeFrom(byte[].class).deserializer());
return null;
}
}

@Override
public void close() {
this.delegates.values().forEach(deser -> deser.close());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019 the original author or authors.
* Copyright 2019-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,25 +18,34 @@

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;

import org.springframework.core.log.LogAccessor;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.StringUtils;

/**
* A {@link Serializer} that delegates to other serializers based on a serialization
* selector header.
* selector header. If the header is missing, and the type is supported by {@link Serdes}
* we will delegate to that serializer type.
*
* @author Gary Russell
* @since 2.3
*
*/
public class DelegatingSerializer implements Serializer<Object> {

private static final LogAccessor LOGGER = new LogAccessor(DelegatingDeserializer.class);

/**
* Name of the header containing the serialization selector.
*/
Expand All @@ -48,7 +57,11 @@ public class DelegatingSerializer implements Serializer<Object> {
*/
public static final String SERIALIZATION_SELECTOR_CONFIG = "spring.kafka.serialization.selector.config";

private final Map<String, Serializer<?>> delegates = new HashMap<>();
private final Map<String, Serializer<?>> delegates = new ConcurrentHashMap<>();

private final Map<String, Object> autoConfigs = new HashMap<>();

private boolean forKeys;

/**
* Construct an instance that will be configured in {@link #configure(Map, boolean)}
Expand All @@ -61,7 +74,8 @@ public DelegatingSerializer() {
/**
* Construct an instance with the supplied mapping of selectors to delegate
* serializers. The selector must be supplied in the
* {@link DelegatingSerializer#SERIALIZATION_SELECTOR} header.
* {@link DelegatingSerializer#SERIALIZATION_SELECTOR} header. It is not necessary to
* configure standard serializers supported by {@link Serdes}.
* @param delegates the map of delegates.
*/
public DelegatingSerializer(Map<String, Serializer<?>> delegates) {
Expand All @@ -71,6 +85,8 @@ public DelegatingSerializer(Map<String, Serializer<?>> delegates) {
@SuppressWarnings("unchecked")
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
this.autoConfigs.putAll(configs);
this.forKeys = isKey;
Object value = configs.get(SERIALIZATION_SELECTOR_CONFIG);
if (value == null) {
return;
Expand Down Expand Up @@ -156,9 +172,24 @@ public byte[] serialize(String topic, Object data) {

@Override
public byte[] serialize(String topic, Headers headers, Object data) {
byte[] value = headers.lastHeader(SERIALIZATION_SELECTOR).value();
byte[] value = null;
Header header = headers.lastHeader(SERIALIZATION_SELECTOR);
if (header != null) {
value = header.value();
}
if (value == null) {
throw new IllegalStateException("No '" + SERIALIZATION_SELECTOR + "' header present");
value = trySerdes(data);
if (value == null) {
throw new IllegalStateException("No '" + SERIALIZATION_SELECTOR
+ "' header present and type (" + data.getClass().getName()
+ ") is not supported by Serdes");
}
try {
headers.add(new RecordHeader(SERIALIZATION_SELECTOR, value));
}
catch (IllegalStateException e) {
LOGGER.debug(e, () -> "Could not set header for type " + data.getClass());
}
}
String selector = new String(value).replaceAll("\"", "");
@SuppressWarnings("unchecked")
Expand All @@ -170,6 +201,24 @@ public byte[] serialize(String topic, Headers headers, Object data) {
return serializer.serialize(topic, headers, data);
}

/*
* Package for testing.
*/
@Nullable
byte[] trySerdes(Object data) {
try {
Serde<? extends Object> serdeFrom = Serdes.serdeFrom(data.getClass());
Serializer<?> serializer = serdeFrom.serializer();
serializer.configure(this.autoConfigs, this.forKeys);
String key = data.getClass().getName();
this.delegates.put(key, serializer);
return key.getBytes();
}
catch (IllegalStateException e) {
return null;
}
}

@Override
public void close() {
this.delegates.values().forEach(ser -> ser.close());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019 the original author or authors.
* Copyright 2019-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,6 +17,9 @@
package org.springframework.kafka.support.serializer;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -93,6 +96,19 @@ private void doTest(DelegatingSerializer serializer, DelegatingDeserializer dese
assertThat(serialized).isEqualTo(new byte[] { 'b', 'a', 'r' });
assertThat(deserializer.deserialize("foo", headers, serialized)).isEqualTo("bar");

// implicit Serdes
headers.remove(DelegatingSerializer.SERIALIZATION_SELECTOR);
DelegatingSerializer spySe = spy(serializer);
serialized = spySe.serialize("foo", headers, 42L);
serialized = spySe.serialize("foo", headers, 42L);
verify(spySe, times(1)).trySerdes(42L);
assertThat(headers.lastHeader(DelegatingSerializer.SERIALIZATION_SELECTOR).value())
.isEqualTo(Long.class.getName().getBytes());
DelegatingDeserializer spyDe = spy(deserializer);
assertThat(spyDe.deserialize("foo", headers, serialized)).isEqualTo(42L);
spyDe.deserialize("foo", headers, serialized);
verify(spyDe, times(1)).trySerdes(Long.class.getName());

// The DKHM will jsonize the value; test that we ignore the quotes
MessageHeaders messageHeaders = new MessageHeaders(
Collections.singletonMap(DelegatingSerializer.SERIALIZATION_SELECTOR, "string"));
Expand All @@ -102,6 +118,18 @@ private void doTest(DelegatingSerializer serializer, DelegatingDeserializer dese
serialized = serializer.serialize("foo", headers, "bar");
assertThat(serialized).isEqualTo(new byte[] { 'b', 'a', 'r' });
assertThat(deserializer.deserialize("foo", headers, serialized)).isEqualTo("bar");

}

@Test
void testBadIncomingOnlyOnce() {
DelegatingDeserializer spy = spy(new DelegatingDeserializer());
Headers headers = new RecordHeaders();
headers.add(new RecordHeader(DelegatingSerializer.SERIALIZATION_SELECTOR, "junk".getBytes()));
byte[] data = "foo".getBytes();
assertThat(spy.deserialize("foo", headers, data)).isSameAs(data);
spy.deserialize("foo", headers, data);
verify(spy, times(1)).trySerdes("junk");
}

}
4 changes: 4 additions & 0 deletions src/reference/asciidoc/kafka.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -3748,6 +3748,10 @@ Producers would then set the `DelegatingSerializer.SERIALIZATION_SELECTOR` heade

This technique supports sending different types to the same topic (or different topics).

NOTE: Starting with version 2.5.1, it is not necessary to set the header if the type (key or value) is one of the standard types supported by `Serdes` (`Long`, `Integer`, etc).
Instead, the serializer will set the header to the class name of the type.
It is not necessary to configure serializers or deserializers for these types, they will be created (once) dynamically.

For another technique to send different types to different topics, see <<routing-template>>.

[[retrying-deserialization]]
Expand Down
6 changes: 6 additions & 0 deletions src/reference/asciidoc/whats-new.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,12 @@ See <<string-serde>> for more information.
The `JsonDeserializer` now has more flexibility to determine the deserialization type.
See <<serdes-type-methods>> for more information.

[[x25-delegate-serde]]
==== Delegating Serializer/Deserializer

The `DelegatingSerializer` can now handle "standard" types, when the outbound record has no header.
See <<delegating-serialization>> for more information.

[[x25-testing]]
==== Testing Changes

Expand Down

0 comments on commit ae03a53

Please sign in to comment.