From 36c90aa8c0acf1fe29e235b1d2aac05115fc4c90 Mon Sep 17 00:00:00 2001
From: Ilayaperumal Gopinathan
Date: Tue, 11 Oct 2016 13:17:28 +0530
Subject: [PATCH] Ability to override deserializer in Kafka consumer

- When binding the consumer, the Kafka consumer should not be hard-wired to
  `ByteArrayDeserializer` for the key/value deserializers. Instead,
  `ByteArrayDeserializer` should only be the default, and any key/value
  deserializer set via the extended consumer properties should override it.
- Add test

This resolves #55
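For illustration (not part of the change itself), a binding is now expected
to be able to swap in its own deserializer along these lines; this sketch
mirrors the new tests, where createConsumerProperties() is binder
test-harness support and StringDeserializer is just an example:

    ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
    // overrides the ByteArrayDeserializer default that getConsumerConfig() installs
    consumerProperties.getExtension().getConfiguration()
            .put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");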
Add tests for custom/native serialization

- Test built-in serialization without Kafka native serialization (i.e. both
  the serializer and deserializer are left at the default
  ByteArraySerializer/ByteArrayDeserializer)
- Test custom serialization by explicitly setting `value.serializer` on the
  Kafka producer properties and `value.deserializer` on the consumer
  properties
- Test Avro message conversion and the Kafka Avro serializer using the
  Confluent Schema Registry
- Since pre-release registry versions have bugs that block the testing and
  `3.0.1` requires Kafka 0.10, this test lives only in the Kafka 0.10 binder
  test module

Update schema-based custom serializer/deserializer tests

Fix import

Handle unbinding during tests appropriately
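Again for illustration, the producer side of the native-serialization test
looks roughly like this (sketched from the test code below;
IntegerSerializer is just an example serializer):

    ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
    // skip the binder's own payload conversion and let Kafka serialize natively
    producerProperties.setUseNativeEncoding(true);
    producerProperties.getExtension().getConfiguration()
            .put("value.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");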
---
 .gitignore                                    |   1 +
 .../pom.xml                                   |  25 +++
 .../binder/kafka/Kafka10BinderTests.java      |  78 +++++++++
 .../cloud/stream/binder/kafka/User1.java      |  85 +++++++++
 .../test/resources/schemas/users_v1.schema    |   8 +
 .../kafka/KafkaMessageChannelBinder.java      |  24 +--
 .../stream/binder/kafka/KafkaBinderTests.java | 165 ++++++++++++++++++
 7 files changed, 367 insertions(+), 19 deletions(-)
 create mode 100644 spring-cloud-stream-binder-kafka-0.10-test/src/test/java/org/springframework/cloud/stream/binder/kafka/User1.java
 create mode 100644 spring-cloud-stream-binder-kafka-0.10-test/src/test/resources/schemas/users_v1.schema

diff --git a/.gitignore b/.gitignore
index 42bdaef55..d446a4952 100644
--- a/.gitignore
+++ b/.gitignore
@@ -18,6 +18,7 @@ _site/
 *.ipr
 *.iws
 .idea/*
+*/.idea
 .factorypath
 dump.rdb
 .apt_generated
diff --git a/spring-cloud-stream-binder-kafka-0.10-test/pom.xml b/spring-cloud-stream-binder-kafka-0.10-test/pom.xml
index 6dc252dea..3c98dbbfe 100644
--- a/spring-cloud-stream-binder-kafka-0.10-test/pom.xml
+++ b/spring-cloud-stream-binder-kafka-0.10-test/pom.xml
@@ -73,7 +73,32 @@
 			<artifactId>spring-cloud-stream-binder-test</artifactId>
 			<scope>test</scope>
 		</dependency>
+		<dependency>
+			<groupId>org.springframework.cloud</groupId>
+			<artifactId>spring-cloud-stream-schema</artifactId>
+			<version>1.1.1.BUILD-SNAPSHOT</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>io.confluent</groupId>
+			<artifactId>kafka-avro-serializer</artifactId>
+			<version>3.0.1</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>io.confluent</groupId>
+			<artifactId>kafka-schema-registry</artifactId>
+			<version>3.0.1</version>
+			<scope>test</scope>
+		</dependency>
 	</dependencies>
+
+	<repositories>
+		<repository>
+			<id>confluent</id>
+			<url>http://packages.confluent.io/maven/</url>
+		</repository>
+	</repositories>
diff --git a/spring-cloud-stream-binder-kafka-0.10-test/src/test/java/org/springframework/cloud/stream/binder/kafka/Kafka10BinderTests.java b/spring-cloud-stream-binder-kafka-0.10-test/src/test/java/org/springframework/cloud/stream/binder/kafka/Kafka10BinderTests.java
index 0e6b31770..798fe6a8c 100644
--- a/spring-cloud-stream-binder-kafka-0.10-test/src/test/java/org/springframework/cloud/stream/binder/kafka/Kafka10BinderTests.java
+++ b/spring-cloud-stream-binder-kafka-0.10-test/src/test/java/org/springframework/cloud/stream/binder/kafka/Kafka10BinderTests.java
@@ -21,27 +21,45 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.UUID;
 
+import io.confluent.kafka.schemaregistry.rest.SchemaRegistryConfig;
+import io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication;
 import kafka.utils.ZKStringSerializer$;
 import kafka.utils.ZkUtils;
 import org.I0Itec.zkclient.ZkClient;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.Deserializer;
+import org.eclipse.jetty.server.Server;
 import org.junit.Before;
 import org.junit.ClassRule;
+import org.junit.Test;
 
 import org.springframework.cloud.stream.binder.Binder;
+import org.springframework.cloud.stream.binder.Binding;
+import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
+import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
 import org.springframework.cloud.stream.binder.Spy;
 import org.springframework.cloud.stream.binder.kafka.admin.Kafka10AdminUtilsOperation;
 import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfigurationProperties;
+import org.springframework.integration.channel.DirectChannel;
+import org.springframework.integration.channel.QueueChannel;
 import org.springframework.kafka.core.ConsumerFactory;
 import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
 import org.springframework.kafka.support.KafkaHeaders;
 import org.springframework.kafka.test.core.BrokerAddress;
 import org.springframework.kafka.test.rule.KafkaEmbedded;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageChannel;
+import org.springframework.messaging.SubscribableChannel;
+import org.springframework.messaging.support.MessageBuilder;
 import org.springframework.retry.RetryOperations;
 
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
+import static org.junit.Assert.assertTrue;
+
 /**
  * Integration tests for the {@link KafkaMessageChannelBinder}.
  *
@@ -164,4 +182,64 @@ private ConsumerFactory<byte[], byte[]> consumerFactory() {
 		return new DefaultKafkaConsumerFactory<>(props, keyDecoder, valueDecoder);
 	}
 
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testCustomAvroSerialization() throws Exception {
+		KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
+		final ZkClient zkClient = new ZkClient(configurationProperties.getZkConnectionString(),
+				configurationProperties.getZkSessionTimeout(), configurationProperties.getZkConnectionTimeout(),
+				ZKStringSerializer$.MODULE$);
+		final ZkUtils zkUtils = new ZkUtils(zkClient, null, false);
+		Map schemaRegistryProps = new HashMap<>();
+		schemaRegistryProps.put("kafkastore.connection.url", configurationProperties.getZkConnectionString());
+		schemaRegistryProps.put("listeners", "http://0.0.0.0:8082");
+		schemaRegistryProps.put("port", "8082");
+		schemaRegistryProps.put("kafkastore.topic", "_schemas");
+		SchemaRegistryConfig config = new SchemaRegistryConfig(schemaRegistryProps);
+		SchemaRegistryRestApplication app = new SchemaRegistryRestApplication(config);
+		Server server = app.createServer();
+		server.start();
+		// wait (at most 5 seconds) for the embedded registry to come up
+		long endTime = System.currentTimeMillis() + 5000;
+		while (!server.isRunning()) {
+			if (System.currentTimeMillis() > endTime) {
+				fail("Kafka Schema Registry Server failed to start");
+			}
+			Thread.sleep(100);
+		}
+		User1 firstOutboundFoo = new User1();
+		String userName1 = "foo-name" + UUID.randomUUID().toString();
+		String favColor1 = "foo-color" + UUID.randomUUID().toString();
+		firstOutboundFoo.setName(userName1);
+		firstOutboundFoo.setFavoriteColor(favColor1);
+		Message<?> message = MessageBuilder.withPayload(firstOutboundFoo).build();
+		SubscribableChannel moduleOutputChannel = new DirectChannel();
+		String testTopicName = "existing" + System.currentTimeMillis();
+		invokeCreateTopic(zkUtils, testTopicName, 6, 1, new Properties());
+		configurationProperties.setAutoAddPartitions(true);
+		Binder binder = getBinder(configurationProperties);
+		QueueChannel moduleInputChannel = new QueueChannel();
+		ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
+		producerProperties.getExtension().getConfiguration().put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
+		producerProperties.getExtension().getConfiguration().put("schema.registry.url", "http://localhost:8082");
+		producerProperties.setUseNativeEncoding(true);
+		Binding<MessageChannel> producerBinding = binder.bindProducer(testTopicName, moduleOutputChannel, producerProperties);
+		ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
+		consumerProperties.getExtension().setAutoRebalanceEnabled(false);
+		consumerProperties.getExtension().getConfiguration().put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
+		// have the Avro deserializer return the specific record type (User1) rather than a GenericRecord
+		consumerProperties.getExtension().getConfiguration().put("specific.avro.reader", "true");
+		consumerProperties.getExtension().getConfiguration().put("schema.registry.url", "http://localhost:8082");
+		Binding<MessageChannel> consumerBinding = binder.bindConsumer(testTopicName, "test", moduleInputChannel, consumerProperties);
+		// Let the consumer actually bind to the producer before sending a msg
+		binderBindUnbindLatency();
+		moduleOutputChannel.send(message);
+		Message<?> inbound = receive(moduleInputChannel);
+		assertThat(inbound).isNotNull();
+		assertTrue(inbound.getPayload() instanceof User1);
+		User1 receivedUser = (User1) inbound.getPayload();
+		assertThat(receivedUser.getName()).isEqualTo(userName1);
+		assertThat(receivedUser.getFavoriteColor()).isEqualTo(favColor1);
+		producerBinding.unbind();
+		consumerBinding.unbind();
+		server.stop();
+	}
 }
diff --git a/spring-cloud-stream-binder-kafka-0.10-test/src/test/java/org/springframework/cloud/stream/binder/kafka/User1.java b/spring-cloud-stream-binder-kafka-0.10-test/src/test/java/org/springframework/cloud/stream/binder/kafka/User1.java
new file mode 100644
index 000000000..b5c45c9b3
--- /dev/null
+++ b/spring-cloud-stream-binder-kafka-0.10-test/src/test/java/org/springframework/cloud/stream/binder/kafka/User1.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2016 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.stream.binder.kafka;
+
+import java.io.IOException;
+
+import org.apache.avro.Schema;
+import org.apache.avro.reflect.Nullable;
+import org.apache.avro.specific.SpecificRecordBase;
+
+import org.springframework.core.io.ClassPathResource;
+
+/**
+ * @author Marius Bogoevici
+ * @author Ilayaperumal Gopinathan
+ */
+public class User1 extends SpecificRecordBase {
+
+	@Nullable
+	private String name;
+
+	@Nullable
+	private String favoriteColor;
+
+	public String getName() {
+		return this.name;
+	}
+
+	public void setName(String name) {
+		this.name = name;
+	}
+
+	public String getFavoriteColor() {
+		return this.favoriteColor;
+	}
+
+	public void setFavoriteColor(String favoriteColor) {
+		this.favoriteColor = favoriteColor;
+	}
+
+	@Override
+	public Schema getSchema() {
+		try {
+			return new Schema.Parser().parse(new ClassPathResource("schemas/users_v1.schema").getInputStream());
+		}
+		catch (IOException e) {
+			throw new IllegalStateException(e);
+		}
+	}
+
+	@Override
+	public Object get(int i) {
+		if (i == 0) {
+			return getName();
+		}
+		if (i == 1) {
+			return getFavoriteColor();
+		}
+		return null;
+	}
+
+	@Override
+	public void put(int i, Object o) {
+		// Avro may supply a Utf8 rather than a String, so convert via toString()
+		if (i == 0) {
+			setName(o != null ? o.toString() : null);
+		}
+		if (i == 1) {
+			setFavoriteColor(o != null ? o.toString() : null);
+		}
+	}
+}
diff --git a/spring-cloud-stream-binder-kafka-0.10-test/src/test/resources/schemas/users_v1.schema b/spring-cloud-stream-binder-kafka-0.10-test/src/test/resources/schemas/users_v1.schema
new file mode 100644
index 000000000..366387be1
--- /dev/null
+++ b/spring-cloud-stream-binder-kafka-0.10-test/src/test/resources/schemas/users_v1.schema
@@ -0,0 +1,8 @@
+{"namespace": "org.springframework.cloud.stream.binder.kafka",
+ "type": "record",
+ "name": "User1",
+ "fields": [
+   {"name": "name", "type": "string"},
+   {"name": "favoriteColor", "type": "string"}
+ ]
+}
\ No newline at end of file
diff --git a/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java b/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java
index f3098034b..62947b3d4 100644
--- a/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java
+++ b/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java
@@ -38,7 +38,6 @@
 import org.apache.kafka.common.security.JaasUtils;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.utils.Utils;
 
 import org.springframework.beans.factory.DisposableBean;
@@ -258,7 +257,6 @@ private DefaultKafkaProducerFactory<byte[], byte[]> getProducerFactory(
 				String.valueOf(producerProperties.getExtension().getBatchTimeout()));
 		props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,
 				producerProperties.getExtension().getCompressionType().toString());
-
 		if (!ObjectUtils.isEmpty(producerProperties.getExtension().getConfiguration())) {
 			props.putAll(producerProperties.getExtension().getConfiguration());
 		}
@@ -303,29 +301,20 @@ protected MessageProducer createConsumerEndpoint(String name, String group, Collection<PartitionInfo> destination,
 		Assert.isTrue(!anonymous || !properties.getExtension().isEnableDlq(),
 				"DLQ support is not available for anonymous subscriptions");
 		String consumerGroup = anonymous ? "anonymous." + UUID.randomUUID().toString() : group;
-
 		Map<String, Object> props = getConsumerConfig(anonymous, consumerGroup);
-		Deserializer<byte[]> valueDecoder = new ByteArrayDeserializer();
-		Deserializer<byte[]> keyDecoder = new ByteArrayDeserializer();
-
 		if (!ObjectUtils.isEmpty(properties.getExtension().getConfiguration())) {
 			props.putAll(properties.getExtension().getConfiguration());
 		}
-
-		ConsumerFactory<byte[], byte[]> consumerFactory = new DefaultKafkaConsumerFactory<>(props, keyDecoder,
-				valueDecoder);
-
+		ConsumerFactory consumerFactory = new DefaultKafkaConsumerFactory<>(props);
 		Collection<PartitionInfo> listenedPartitions = destination;
 		Assert.isTrue(!CollectionUtils.isEmpty(listenedPartitions), "A list of partitions must be provided");
 		final TopicPartitionInitialOffset[] topicPartitionInitialOffsets = getTopicPartitionInitialOffsets(
 				listenedPartitions);
-
 		final ContainerProperties containerProperties =
 				anonymous || properties.getExtension().isAutoRebalanceEnabled() ? new ContainerProperties(name)
 						: new ContainerProperties(topicPartitionInitialOffsets);
-
 		int concurrency = Math.min(properties.getConcurrency(), listenedPartitions.size());
-		final ConcurrentMessageListenerContainer<byte[], byte[]> messageListenerContainer =
+		final ConcurrentMessageListenerContainer messageListenerContainer =
 				new ConcurrentMessageListenerContainer(consumerFactory, containerProperties) {
@@ -339,25 +328,20 @@ public void stop(Runnable callback) {
 		if (!properties.getExtension().isAutoCommitOffset()) {
 			messageListenerContainer.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
 		}
-
 		if (this.logger.isDebugEnabled()) {
 			this.logger.debug(
 					"Listened partitions: " + StringUtils.collectionToCommaDelimitedString(listenedPartitions));
 		}
-
 		if (this.logger.isDebugEnabled()) {
 			this.logger.debug(
 					"Listened partitions: " + StringUtils.collectionToCommaDelimitedString(listenedPartitions));
 		}
-
-		final KafkaMessageDrivenChannelAdapter<byte[], byte[]> kafkaMessageDrivenChannelAdapter =
+		final KafkaMessageDrivenChannelAdapter kafkaMessageDrivenChannelAdapter =
 				new KafkaMessageDrivenChannelAdapter<>(messageListenerContainer);
-
 		kafkaMessageDrivenChannelAdapter.setBeanFactory(this.getBeanFactory());
 		final RetryTemplate retryTemplate = buildRetryTemplate(properties);
 		kafkaMessageDrivenChannelAdapter.setRetryTemplate(retryTemplate);
-
 		if (properties.getExtension().isEnableDlq()) {
 			final String dlqTopic = "error." + name + "." + group;
 			initDlqProducer();
@@ -400,6 +384,8 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
 
 	private Map<String, Object> getConsumerConfig(boolean anonymous, String consumerGroup) {
 		Map<String, Object> props = new HashMap<>();
+		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
 		if (!ObjectUtils.isEmpty(configurationProperties.getConfiguration())) {
 			props.putAll(configurationProperties.getConfiguration());
 		}
diff --git a/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderTests.java b/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderTests.java
index 7d6bcfabb..9f1ec8cf1 100644
--- a/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderTests.java
+++ b/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderTests.java
@@ -19,6 +19,7 @@
 import java.util.Arrays;
 import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
@@ -27,6 +28,10 @@
 import kafka.utils.ZKStringSerializer$;
 import kafka.utils.ZkUtils;
 import org.I0Itec.zkclient.ZkClient;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.assertj.core.api.Condition;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -35,6 +40,7 @@
 import org.springframework.cloud.stream.binder.Binder;
 import org.springframework.cloud.stream.binder.BinderException;
 import org.springframework.cloud.stream.binder.Binding;
+import org.springframework.cloud.stream.binder.DefaultBinding;
 import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
 import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
 import org.springframework.cloud.stream.binder.PartitionCapableBinderTests;
@@ -46,7 +52,10 @@
 import org.springframework.integration.IntegrationMessageHeaderAccessor;
 import org.springframework.integration.channel.DirectChannel;
 import org.springframework.integration.channel.QueueChannel;
+import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;
 import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
 import org.springframework.kafka.support.Acknowledgment;
 import org.springframework.kafka.support.KafkaHeaders;
 import org.springframework.kafka.support.TopicPartitionInitialOffset;
@@ -54,6 +63,7 @@
 import org.springframework.messaging.MessageChannel;
 import org.springframework.messaging.MessageHandler;
 import org.springframework.messaging.MessagingException;
+import org.springframework.messaging.SubscribableChannel;
 import org.springframework.messaging.support.GenericMessage;
 import org.springframework.messaging.support.MessageBuilder;
 import org.springframework.retry.RetryOperations;
@@ -63,9 +73,11 @@
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.fail;
+import static org.junit.Assert.assertTrue;
 
 /**
  * @author Soby Chacko
+ * @author Ilayaperumal Gopinathan
 */
 public abstract class KafkaBinderTests extends
 		PartitionCapableBinderTests<AbstractKafkaTestBinder, ExtendedConsumerProperties<KafkaConsumerProperties>, ExtendedProducerProperties<KafkaProducerProperties>> {
@@ -1185,6 +1197,159 @@ public void testPartitionCountNotReduced() throws Exception {
 		assertThat(partitionSize(testTopicName)).isEqualTo(6);
 	}
 
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testConsumerDefaultDeserializer() throws Exception {
+		Binding<MessageChannel> binding = null;
+		try {
+			KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
+			final ZkUtils zkUtils = getZkUtils(configurationProperties);
+			String testTopicName = "existing" + System.currentTimeMillis();
+			invokeCreateTopic(zkUtils, testTopicName, 5, 1, new Properties());
+			configurationProperties.setAutoCreateTopics(false);
+			Binder binder = getBinder(configurationProperties);
+			DirectChannel output = new DirectChannel();
+			ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
+			binding = binder.bindConsumer(testTopicName, "test", output, consumerProperties);
+			DirectFieldAccessor consumerAccessor = new DirectFieldAccessor(getKafkaConsumer(binding));
+			assertTrue(consumerAccessor.getPropertyValue("keyDeserializer") instanceof ByteArrayDeserializer);
+			assertTrue(consumerAccessor.getPropertyValue("valueDeserializer") instanceof ByteArrayDeserializer);
+		}
+		finally {
+			if (binding != null) {
+				binding.unbind();
+			}
+		}
+	}
+
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testConsumerCustomDeserializer() throws Exception {
+		Binding<MessageChannel> binding = null;
+		try {
+			KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
+			Map<String, String> propertiesToOverride = configurationProperties.getConfiguration();
+			propertiesToOverride.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+			propertiesToOverride.put("value.deserializer", "org.apache.kafka.common.serialization.LongDeserializer");
+			configurationProperties.setConfiguration(propertiesToOverride);
+			final ZkUtils zkUtils = getZkUtils(configurationProperties);
+			String testTopicName = "existing" + System.currentTimeMillis();
+			invokeCreateTopic(zkUtils, testTopicName, 5, 1, new Properties());
+			configurationProperties.setAutoCreateTopics(false);
+			Binder binder = getBinder(configurationProperties);
+			DirectChannel output = new DirectChannel();
+			ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
+			binding = binder.bindConsumer(testTopicName, "test", output, consumerProperties);
+			DirectFieldAccessor consumerAccessor = new DirectFieldAccessor(getKafkaConsumer(binding));
+			assertTrue("Expected StringDeserializer as the custom key deserializer",
+					consumerAccessor.getPropertyValue("keyDeserializer") instanceof StringDeserializer);
+			assertTrue("Expected LongDeserializer as the custom value deserializer",
+					consumerAccessor.getPropertyValue("valueDeserializer") instanceof LongDeserializer);
+		}
+		finally {
+			if (binding != null) {
+				binding.unbind();
+			}
+		}
+	}
+
+	private KafkaConsumer getKafkaConsumer(Binding binding) {
+		DirectFieldAccessor bindingAccessor = new DirectFieldAccessor((DefaultBinding) binding);
+		KafkaMessageDrivenChannelAdapter adapter = (KafkaMessageDrivenChannelAdapter) bindingAccessor.getPropertyValue("endpoint");
+		DirectFieldAccessor adapterAccessor = new DirectFieldAccessor(adapter);
+		ConcurrentMessageListenerContainer messageListenerContainer = (ConcurrentMessageListenerContainer) adapterAccessor.getPropertyValue("messageListenerContainer");
+		DirectFieldAccessor containerAccessor = new DirectFieldAccessor(messageListenerContainer);
+		DefaultKafkaConsumerFactory consumerFactory = (DefaultKafkaConsumerFactory) containerAccessor.getPropertyValue("consumerFactory");
+		return (KafkaConsumer) consumerFactory.createConsumer();
+	}
+
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testNativeSerializationWithCustomSerializerDeserializer() throws Exception {
+		Binding<MessageChannel> producerBinding = null;
+		Binding<MessageChannel> consumerBinding = null;
+		try {
+			Integer testPayload = 10;
+			Message<?> message = MessageBuilder.withPayload(testPayload).build();
+			SubscribableChannel moduleOutputChannel = new DirectChannel();
+			String testTopicName = "existing" + System.currentTimeMillis();
+			KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
+			final ZkClient zkClient = new ZkClient(configurationProperties.getZkConnectionString(),
+					configurationProperties.getZkSessionTimeout(), configurationProperties.getZkConnectionTimeout(),
+					ZKStringSerializer$.MODULE$);
+			final ZkUtils zkUtils = new ZkUtils(zkClient, null, false);
+			invokeCreateTopic(zkUtils, testTopicName, 6, 1, new Properties());
+			configurationProperties.setAutoAddPartitions(true);
+			Binder binder = getBinder(configurationProperties);
+			QueueChannel moduleInputChannel = new QueueChannel();
+			ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
+			producerProperties.setUseNativeEncoding(true);
+			producerProperties.getExtension().getConfiguration().put("value.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
+			producerBinding = binder.bindProducer(testTopicName, moduleOutputChannel, producerProperties);
+			ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
+			consumerProperties.getExtension().setAutoRebalanceEnabled(false);
+			consumerProperties.getExtension().getConfiguration().put("value.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
+			consumerBinding = binder.bindConsumer(testTopicName, "test", moduleInputChannel, consumerProperties);
+			// Let the consumer actually bind to the producer before sending a msg
+			binderBindUnbindLatency();
+			moduleOutputChannel.send(message);
+			Message<?> inbound = receive(moduleInputChannel, 500);
+			assertThat(inbound).isNotNull();
+			assertThat(inbound.getPayload()).isEqualTo(10);
+			assertThat(inbound.getHeaders()).doesNotContainKey("contentType");
+		}
+		finally {
+			if (producerBinding != null) {
+				producerBinding.unbind();
+			}
+			if (consumerBinding != null) {
+				consumerBinding.unbind();
+			}
+		}
+	}
+
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testBuiltinSerialization() throws Exception {
+		Binding<MessageChannel> producerBinding = null;
+		Binding<MessageChannel> consumerBinding = null;
+		try {
+			String testPayload = "test";
+			Message<?> message = MessageBuilder.withPayload(testPayload).build();
+			SubscribableChannel moduleOutputChannel = new DirectChannel();
+			String testTopicName = "existing" + System.currentTimeMillis();
+			KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
+			final ZkClient zkClient = new ZkClient(configurationProperties.getZkConnectionString(),
+					configurationProperties.getZkSessionTimeout(), configurationProperties.getZkConnectionTimeout(),
+					ZKStringSerializer$.MODULE$);
+			final ZkUtils zkUtils = new ZkUtils(zkClient, null, false);
+			invokeCreateTopic(zkUtils, testTopicName, 6, 1, new Properties());
+			configurationProperties.setAutoAddPartitions(true);
+			Binder binder = getBinder(configurationProperties);
+			QueueChannel moduleInputChannel = new QueueChannel();
+			ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
+			producerBinding = binder.bindProducer(testTopicName, moduleOutputChannel, producerProperties);
+			ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
+			consumerProperties.getExtension().setAutoRebalanceEnabled(false);
+			consumerBinding = binder.bindConsumer(testTopicName, "test", moduleInputChannel, consumerProperties);
+			// Let the consumer actually bind to the producer before sending a msg
+			binderBindUnbindLatency();
+			moduleOutputChannel.send(message);
+			Message<?> inbound = receive(moduleInputChannel, 5);
+			assertThat(inbound).isNotNull();
+			assertThat(inbound.getPayload()).isEqualTo("test");
+			assertThat(inbound.getHeaders()).containsEntry("contentType", "text/plain");
+		}
+		finally {
+			if (producerBinding != null) {
+				producerBinding.unbind();
+			}
+			if (consumerBinding != null) {
+				consumerBinding.unbind();
+			}
+		}
+	}
+
 	@Override
 	protected void binderBindUnbindLatency() throws InterruptedException {
 		Thread.sleep(500);