Ability to override deserializer in Kafka consumer
 - When binding a consumer, the Kafka consumer should not be hard-wired to `ByteArrayDeserializer` for both the key and value deserializers. Instead, those should act only as defaults: any extended consumer properties that set a key/value deserializer override them (a sketch of the resulting behavior follows below).

 - Add test

This resolves #55
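
A minimal sketch of that defaulting behavior, using an illustrative helper name rather than the binder's exact code:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

final class ConsumerConfigSketch {

	// Defaults go in first, user configuration last: an explicit key.deserializer
	// or value.deserializer entry in the extended properties wins over the default.
	static Map<String, Object> buildConsumerConfig(Map<String, String> userConfiguration) {
		Map<String, Object> props = new HashMap<>();
		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
		if (userConfiguration != null) {
			props.putAll(userConfiguration);
		}
		return props;
	}
}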

Add tests for custom/native serialization

 - Test using built-in serialization without Kafka-native serialization (i.e. both the serializer and deserializer are left at the `ByteArraySerializer`/`ByteArrayDeserializer` defaults)
 - Test using a custom serializer by explicitly setting `value.serializer` in the Kafka producer properties and `value.deserializer` in the consumer properties (see the sketch after this list)
 - Test Avro message conversion and the Kafka Avro serializer against the Confluent Schema Registry
     - Because pre-release registry versions have bugs that block this testing and `3.0.1` requires Kafka 0.10, the test lives only in the Kafka 0.10 binder
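
Inside a test, the override pattern exercised above looks roughly like this (a sketch; the `ExtendedProducerProperties(T)`/`ExtendedConsumerProperties(T)` constructors are assumed from spring-cloud-stream core, and the serializer class names are the Confluent ones used in the diff below):

ExtendedProducerProperties<KafkaProducerProperties> producerProperties =
		new ExtendedProducerProperties<>(new KafkaProducerProperties());
// Let Kafka's serializer, not the binder's converters, encode the payload.
producerProperties.setUseNativeEncoding(true);
producerProperties.getExtension().getConfiguration()
		.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");

ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties =
		new ExtendedConsumerProperties<>(new KafkaConsumerProperties());
// Overrides the binder's ByteArrayDeserializer default for the record value.
consumerProperties.getExtension().getConfiguration()
		.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");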

Update schema-based custom serializer/deserializer tests

Fix import

Handle unbind during tests appropriately
ilayaperumalg authored and mbogoevici committed Nov 15, 2016
1 parent 943a7ae commit 36c90aa
Showing 7 changed files with 367 additions and 19 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -18,6 +18,7 @@ _site/
*.ipr
*.iws
.idea/*
*/.idea
.factorypath
dump.rdb
.apt_generated
25 changes: 25 additions & 0 deletions spring-cloud-stream-binder-kafka-0.10-test/pom.xml
@@ -73,7 +73,32 @@
<artifactId>spring-cloud-stream-binder-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-schema</artifactId>
<version>1.1.1.BUILD-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>3.0.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-schema-registry</artifactId>
<version>3.0.1</version>
<scope>test</scope>
</dependency>
</dependencies>

<repositories>
<repository>
<id>confluent</id>
<url>http://packages.confluent.io/maven/</url>
</repository>
</repositories>


</project>
@@ -21,27 +21,45 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;

import io.confluent.kafka.schemaregistry.rest.SchemaRegistryConfig;
import io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.eclipse.jetty.server.Server;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;

import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.Binding;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.Spy;
import org.springframework.cloud.stream.binder.kafka.admin.Kafka10AdminUtilsOperation;
import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfigurationProperties;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.test.core.BrokerAddress;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.retry.RetryOperations;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
import static org.junit.Assert.assertTrue;

/**
* Integration tests for the {@link KafkaMessageChannelBinder}.
*
@@ -164,4 +182,64 @@ private ConsumerFactory<byte[], byte[]> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(props, keyDecoder, valueDecoder);
}

@Test
@SuppressWarnings("unchecked")
public void testCustomAvroSerialization() throws Exception {
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
final ZkClient zkClient = new ZkClient(configurationProperties.getZkConnectionString(),
configurationProperties.getZkSessionTimeout(), configurationProperties.getZkConnectionTimeout(),
ZKStringSerializer$.MODULE$);
final ZkUtils zkUtils = new ZkUtils(zkClient, null, false);
Map<String, Object> schemaRegistryProps = new HashMap<>();
schemaRegistryProps.put("kafkastore.connection.url", configurationProperties.getZkConnectionString());
schemaRegistryProps.put("listeners", "http://0.0.0.0:8082");
schemaRegistryProps.put("port", "8082");
schemaRegistryProps.put("kafkastore.topic", "_schemas");
SchemaRegistryConfig config = new SchemaRegistryConfig(schemaRegistryProps);
SchemaRegistryRestApplication app = new SchemaRegistryRestApplication(config);
Server server = app.createServer();
server.start();
long endTime = System.currentTimeMillis() + 5000;
while (!server.isRunning()) {
if (System.currentTimeMillis() > endTime) {
fail("Kafka Schema Registry Server failed to start");
}
Thread.sleep(100); // avoid busy-spinning while the registry starts
}
User1 firstOutboundFoo = new User1();
String userName1 = "foo-name" + UUID.randomUUID().toString();
String favColor1 = "foo-color" + UUID.randomUUID().toString();
firstOutboundFoo.setName(userName1);
firstOutboundFoo.setFavoriteColor(favColor1);
Message<?> message = MessageBuilder.withPayload(firstOutboundFoo).build();
SubscribableChannel moduleOutputChannel = new DirectChannel();
String testTopicName = "existing" + System.currentTimeMillis();
invokeCreateTopic(zkUtils, testTopicName, 6, 1, new Properties());
configurationProperties.setAutoAddPartitions(true);
Binder binder = getBinder(configurationProperties);
QueueChannel moduleInputChannel = new QueueChannel();
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
producerProperties.getExtension().getConfiguration().put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
producerProperties.getExtension().getConfiguration().put("schema.registry.url", "http://localhost:8082");
producerProperties.setUseNativeEncoding(true);
Binding<MessageChannel> producerBinding = binder.bindProducer(testTopicName, moduleOutputChannel, producerProperties);
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
consumerProperties.getExtension().setAutoRebalanceEnabled(false);
consumerProperties.getExtension().getConfiguration().put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
consumerProperties.getExtension().getConfiguration().put("schema.registry.url", "http://localhost:8082");
Binding<MessageChannel> consumerBinding = binder.bindConsumer(testTopicName, "test", moduleInputChannel, consumerProperties);
// Let the consumer actually bind to the producer before sending a msg
binderBindUnbindLatency();
moduleOutputChannel.send(message);
Message<?> inbound = receive(moduleInputChannel);
assertThat(inbound).isNotNull();
assertTrue(inbound.getPayload() instanceof User1);
User1 receivedUser = (User1) inbound.getPayload();
assertThat(receivedUser.getName()).isEqualTo(userName1);
assertThat(receivedUser.getFavoriteColor()).isEqualTo(favColor1);
producerBinding.unbind();
consumerBinding.unbind();
}
}
@@ -0,0 +1,85 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.stream.binder.kafka;

import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.reflect.Nullable;
import org.apache.avro.specific.SpecificRecordBase;

import org.springframework.core.io.ClassPathResource;

/**
* @author Marius Bogoevici
* @author Ilayaperumal Gopinathan
*/
public class User1 extends SpecificRecordBase {

@Nullable
private String name;

@Nullable
private String favoriteColor;

public String getName() {
return this.name;
}

public void setName(String name) {
this.name = name;
}

public String getFavoriteColor() {
return this.favoriteColor;
}

public void setFavoriteColor(String favoriteColor) {
this.favoriteColor = favoriteColor;
}

@Override
public Schema getSchema() {
try {
return new Schema.Parser().parse(new ClassPathResource("schemas/users_v1.schema").getInputStream());
}
catch (IOException e) {
throw new IllegalStateException(e);
}
}

@Override
public Object get(int i) {
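// Avro SpecificRecord positional access: index 0 = name, index 1 = favoriteColor, matching users_v1.schema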
if (i == 0) {
return getName().toString();
}
if (i == 1) {
return getFavoriteColor().toString();
}
return null;
}

@Override
public void put(int i, Object o) {
if (i == 0) {
setName((String) o);
}
if (i == 1) {
setFavoriteColor((String) o);
}
}
}
@@ -0,0 +1,8 @@
{"namespace": "org.springframework.cloud.stream.binder.kafka",
"type": "record",
"name": "User1",
"fields": [
{"name": "name", "type": "string"},
{"name": "favoriteColor", "type": "string"}
]
}
@@ -38,7 +38,6 @@
import org.apache.kafka.common.security.JaasUtils;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.Utils;

import org.springframework.beans.factory.DisposableBean;
@@ -258,7 +257,6 @@ private DefaultKafkaProducerFactory<byte[], byte[]> getProducerFactory(
String.valueOf(producerProperties.getExtension().getBatchTimeout()));
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,
producerProperties.getExtension().getCompressionType().toString());

if (!ObjectUtils.isEmpty(producerProperties.getExtension().getConfiguration())) {
props.putAll(producerProperties.getExtension().getConfiguration());
}
@@ -303,29 +301,20 @@ protected MessageProducer createConsumerEndpoint(String name, String group, Coll
Assert.isTrue(!anonymous || !properties.getExtension().isEnableDlq(),
"DLQ support is not available for anonymous subscriptions");
String consumerGroup = anonymous ? "anonymous." + UUID.randomUUID().toString() : group;

Map<String, Object> props = getConsumerConfig(anonymous, consumerGroup);
Deserializer<byte[]> valueDecoder = new ByteArrayDeserializer();
Deserializer<byte[]> keyDecoder = new ByteArrayDeserializer();

if (!ObjectUtils.isEmpty(properties.getExtension().getConfiguration())) {
props.putAll(properties.getExtension().getConfiguration());
}

ConsumerFactory<byte[], byte[]> consumerFactory = new DefaultKafkaConsumerFactory<>(props, keyDecoder,
valueDecoder);

ConsumerFactory<?, ?> consumerFactory = new DefaultKafkaConsumerFactory<>(props);
Collection<PartitionInfo> listenedPartitions = destination;
Assert.isTrue(!CollectionUtils.isEmpty(listenedPartitions), "A list of partitions must be provided");
final TopicPartitionInitialOffset[] topicPartitionInitialOffsets = getTopicPartitionInitialOffsets(
listenedPartitions);

final ContainerProperties containerProperties =
anonymous || properties.getExtension().isAutoRebalanceEnabled() ? new ContainerProperties(name)
: new ContainerProperties(topicPartitionInitialOffsets);

int concurrency = Math.min(properties.getConcurrency(), listenedPartitions.size());
final ConcurrentMessageListenerContainer<byte[], byte[]> messageListenerContainer =
final ConcurrentMessageListenerContainer<?, ?> messageListenerContainer =
new ConcurrentMessageListenerContainer(
consumerFactory, containerProperties) {

@@ -339,25 +328,20 @@ public void stop(Runnable callback) {
if (!properties.getExtension().isAutoCommitOffset()) {
messageListenerContainer.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
}

if (this.logger.isDebugEnabled()) {
this.logger.debug(
"Listened partitions: " + StringUtils.collectionToCommaDelimitedString(listenedPartitions));
}

final KafkaMessageDrivenChannelAdapter<byte[], byte[]> kafkaMessageDrivenChannelAdapter =
final KafkaMessageDrivenChannelAdapter<?, ?> kafkaMessageDrivenChannelAdapter =
new KafkaMessageDrivenChannelAdapter<>(
messageListenerContainer);

kafkaMessageDrivenChannelAdapter.setBeanFactory(this.getBeanFactory());
final RetryTemplate retryTemplate = buildRetryTemplate(properties);
kafkaMessageDrivenChannelAdapter.setRetryTemplate(retryTemplate);

if (properties.getExtension().isEnableDlq()) {
final String dlqTopic = "error." + name + "." + group;
initDlqProducer();
@@ -400,6 +384,8 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {

private Map<String, Object> getConsumerConfig(boolean anonymous, String consumerGroup) {
Map<String, Object> props = new HashMap<>();
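// Binder defaults; the user-supplied configuration merged in below overrides these entries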
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
if (!ObjectUtils.isEmpty(configurationProperties.getConfiguration())) {
props.putAll(configurationProperties.getConfiguration());
}
