From 7e38788b661343a47b21d05b32c78b29d12ff4f9 Mon Sep 17 00:00:00 2001 From: tomvandenberge Date: Mon, 30 Sep 2019 16:23:37 +0200 Subject: [PATCH] Add XML attribute for header-mapper * Added support for header-mapper to outbound-channel-adapter and outbound-gateway XML * Follow up on review * Corrected mistake * Corrected indentation --- .../kafka/config/xml/KafkaParsingUtils.java | 3 +++ .../outbound/KafkaProducerMessageHandler.java | 5 ++++ .../config/spring-integration-kafka-3.2.xsd | 17 ++++++++++++++ ...afkaOutboundAdapterParserTests-context.xml | 3 +++ .../xml/KafkaOutboundAdapterParserTests.java | 4 +++- ...afkaOutboundGatewayParserTests-context.xml | 5 +++- .../xml/KafkaOutboundGatewayParserTests.java | 3 +++ .../KafkaProducerMessageHandlerTests.java | 23 +++++++++++++++++++ 8 files changed, 61 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/springframework/integration/kafka/config/xml/KafkaParsingUtils.java b/src/main/java/org/springframework/integration/kafka/config/xml/KafkaParsingUtils.java index 3f2b6d42..8c022f00 100644 --- a/src/main/java/org/springframework/integration/kafka/config/xml/KafkaParsingUtils.java +++ b/src/main/java/org/springframework/integration/kafka/config/xml/KafkaParsingUtils.java @@ -27,6 +27,7 @@ * Utilities to assist with parsing XML. * * @author Gary Russell + * @author Tom van den Berge * @since 3.2 * */ @@ -81,6 +82,8 @@ public static void commonOutboundProperties(final Element element, final ParserC if (timestampExpressionDef != null) { builder.addPropertyValue("timestampExpression", timestampExpressionDef); } + + IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "header-mapper"); } } diff --git a/src/main/java/org/springframework/integration/kafka/outbound/KafkaProducerMessageHandler.java b/src/main/java/org/springframework/integration/kafka/outbound/KafkaProducerMessageHandler.java index 6c4e10b2..38a86098 100644 --- a/src/main/java/org/springframework/integration/kafka/outbound/KafkaProducerMessageHandler.java +++ b/src/main/java/org/springframework/integration/kafka/outbound/KafkaProducerMessageHandler.java @@ -84,6 +84,7 @@ * @author Gary Russell * @author Marius Bogoevici * @author Biju Kunjummen + * @author Tom van den Berge * * @since 0.5 */ @@ -186,6 +187,10 @@ public void setHeaderMapper(KafkaHeaderMapper headerMapper) { this.headerMapper = headerMapper; } + public KafkaHeaderMapper getHeaderMapper() { + return this.headerMapper; + } + public KafkaTemplate getKafkaTemplate() { return this.kafkaTemplate; } diff --git a/src/main/resources/org/springframework/integration/kafka/config/spring-integration-kafka-3.2.xsd b/src/main/resources/org/springframework/integration/kafka/config/spring-integration-kafka-3.2.xsd index 7b6080ac..5eaae947 100644 --- a/src/main/resources/org/springframework/integration/kafka/config/spring-integration-kafka-3.2.xsd +++ b/src/main/resources/org/springframework/integration/kafka/config/spring-integration-kafka-3.2.xsd @@ -31,6 +31,7 @@ + @@ -126,6 +127,7 @@ ]]> + @@ -640,4 +642,19 @@ + + + + + + + + + + + + + diff --git a/src/test/java/org/springframework/integration/kafka/config/xml/KafkaOutboundAdapterParserTests-context.xml b/src/test/java/org/springframework/integration/kafka/config/xml/KafkaOutboundAdapterParserTests-context.xml index 8497e22c..51ecacff 100644 --- a/src/test/java/org/springframework/integration/kafka/config/xml/KafkaOutboundAdapterParserTests-context.xml +++ b/src/test/java/org/springframework/integration/kafka/config/xml/KafkaOutboundAdapterParserTests-context.xml @@ -23,6 +23,7 @@ error-message-strategy="ems" send-failure-channel="failures" send-success-channel="successes" + header-mapper="customHeaderMapper" > @@ -53,5 +54,7 @@ + + diff --git a/src/test/java/org/springframework/integration/kafka/config/xml/KafkaOutboundAdapterParserTests.java b/src/test/java/org/springframework/integration/kafka/config/xml/KafkaOutboundAdapterParserTests.java index 12c4b3ed..a2fbcbbe 100644 --- a/src/test/java/org/springframework/integration/kafka/config/xml/KafkaOutboundAdapterParserTests.java +++ b/src/test/java/org/springframework/integration/kafka/config/xml/KafkaOutboundAdapterParserTests.java @@ -53,6 +53,7 @@ * @author Artem Bilan * @author Gary Russell * @author Biju Kunjummen + * @author Tom van den Berge * * @since 0.5 */ @@ -84,6 +85,8 @@ public void testOutboundAdapterConfiguration() { .isSameAs(this.appContext.getBean("failures")); assertThat(TestUtils.getPropertyValue(messageHandler, "sendSuccessChannel")) .isSameAs(this.appContext.getBean("successes")); + assertThat(TestUtils.getPropertyValue(messageHandler, "headerMapper")) + .isSameAs(this.appContext.getBean("customHeaderMapper")); messageHandler = this.appContext.getBean("kafkaOutboundChannelAdapter2.handler", KafkaProducerMessageHandler.class); @@ -94,7 +97,6 @@ public void testOutboundAdapterConfiguration() { assertThat(TestUtils.getPropertyValue(messageHandler, "sendTimeoutExpression.literalValue")).isEqualTo("500"); } - @Test public void testSyncMode() { MockProducer mockProducer = diff --git a/src/test/java/org/springframework/integration/kafka/config/xml/KafkaOutboundGatewayParserTests-context.xml b/src/test/java/org/springframework/integration/kafka/config/xml/KafkaOutboundGatewayParserTests-context.xml index 74fe603b..31123d48 100644 --- a/src/test/java/org/springframework/integration/kafka/config/xml/KafkaOutboundGatewayParserTests-context.xml +++ b/src/test/java/org/springframework/integration/kafka/config/xml/KafkaOutboundGatewayParserTests-context.xml @@ -23,7 +23,9 @@ send-timeout-expression="44" sync="true" timestamp-expression="T(System).currentTimeMillis()" - topic-expression="'topic'"/> + topic-expression="'topic'" + header-mapper="customHeaderMapper" + /> @@ -39,4 +41,5 @@ + diff --git a/src/test/java/org/springframework/integration/kafka/config/xml/KafkaOutboundGatewayParserTests.java b/src/test/java/org/springframework/integration/kafka/config/xml/KafkaOutboundGatewayParserTests.java index acee42b7..9804875e 100644 --- a/src/test/java/org/springframework/integration/kafka/config/xml/KafkaOutboundGatewayParserTests.java +++ b/src/test/java/org/springframework/integration/kafka/config/xml/KafkaOutboundGatewayParserTests.java @@ -31,6 +31,7 @@ /** * @author Gary Russell + * @author Tom van den Berge * @since 3.2 * */ @@ -66,6 +67,8 @@ public void testProps() { .isSameAs(this.context.getBean("failures")); assertThat(TestUtils.getPropertyValue(this.messageHandler, "sendSuccessChannel")) .isSameAs(this.context.getBean("successes")); + assertThat(TestUtils.getPropertyValue(this.messageHandler, "headerMapper")) + .isSameAs(this.context.getBean("customHeaderMapper")); } public static class EMS extends DefaultErrorMessageStrategy { diff --git a/src/test/java/org/springframework/integration/kafka/outbound/KafkaProducerMessageHandlerTests.java b/src/test/java/org/springframework/integration/kafka/outbound/KafkaProducerMessageHandlerTests.java index 53c25dd3..c576919c 100644 --- a/src/test/java/org/springframework/integration/kafka/outbound/KafkaProducerMessageHandlerTests.java +++ b/src/test/java/org/springframework/integration/kafka/outbound/KafkaProducerMessageHandlerTests.java @@ -104,6 +104,7 @@ * @author Gary Russell * @author Biju Kunjummen * @author Artem Bilan + * @author Tom van den Berge * * @since 2.0 */ @@ -329,6 +330,28 @@ protected ListenableFuture> doSend( producerFactory.destroy(); } + @Test + public void testOutboundWithCustomHeaderMapper() throws Exception { + DefaultKafkaProducerFactory producerFactory = new DefaultKafkaProducerFactory<>( + KafkaTestUtils.producerProps(embeddedKafka)); + KafkaTemplate template = new KafkaTemplate<>(producerFactory); + KafkaProducerMessageHandler handler = new KafkaProducerMessageHandler<>(template); + handler.setBeanFactory(mock(BeanFactory.class)); + handler.setHeaderMapper(new DefaultKafkaHeaderMapper("!*")); + handler.afterPropertiesSet(); + + Message message = MessageBuilder.withPayload("foo") + .setHeader(KafkaHeaders.TOPIC, topic1) + .setHeader("foo-header", "foo-header-value") + .build(); + handler.handleMessage(message); + + ConsumerRecord record = KafkaTestUtils.getSingleRecord(consumer, topic1); + assertThat(record.headers().toArray().length).isEqualTo(0); + + producerFactory.destroy(); + } + @Test public void testOutboundGateway() throws Exception { ConsumerFactory consumerFactory = new DefaultKafkaConsumerFactory<>(