diff --git a/src/main/java/org/springframework/integration/kafka/config/xml/KafkaParsingUtils.java b/src/main/java/org/springframework/integration/kafka/config/xml/KafkaParsingUtils.java
index 3f2b6d42..8c022f00 100644
--- a/src/main/java/org/springframework/integration/kafka/config/xml/KafkaParsingUtils.java
+++ b/src/main/java/org/springframework/integration/kafka/config/xml/KafkaParsingUtils.java
@@ -27,6 +27,7 @@
* Utilities to assist with parsing XML.
*
* @author Gary Russell
+ * @author Tom van den Berge
* @since 3.2
*
*/
@@ -81,6 +82,8 @@ public static void commonOutboundProperties(final Element element, final ParserC
if (timestampExpressionDef != null) {
builder.addPropertyValue("timestampExpression", timestampExpressionDef);
}
+
+ IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "header-mapper");
}
}
diff --git a/src/main/java/org/springframework/integration/kafka/outbound/KafkaProducerMessageHandler.java b/src/main/java/org/springframework/integration/kafka/outbound/KafkaProducerMessageHandler.java
index 6c4e10b2..38a86098 100644
--- a/src/main/java/org/springframework/integration/kafka/outbound/KafkaProducerMessageHandler.java
+++ b/src/main/java/org/springframework/integration/kafka/outbound/KafkaProducerMessageHandler.java
@@ -84,6 +84,7 @@
* @author Gary Russell
* @author Marius Bogoevici
* @author Biju Kunjummen
+ * @author Tom van den Berge
*
* @since 0.5
*/
@@ -186,6 +187,10 @@ public void setHeaderMapper(KafkaHeaderMapper headerMapper) {
this.headerMapper = headerMapper;
}
+ public KafkaHeaderMapper getHeaderMapper() {
+ return this.headerMapper;
+ }
+
public KafkaTemplate, ?> getKafkaTemplate() {
return this.kafkaTemplate;
}
diff --git a/src/main/resources/org/springframework/integration/kafka/config/spring-integration-kafka-3.2.xsd b/src/main/resources/org/springframework/integration/kafka/config/spring-integration-kafka-3.2.xsd
index 7b6080ac..5eaae947 100644
--- a/src/main/resources/org/springframework/integration/kafka/config/spring-integration-kafka-3.2.xsd
+++ b/src/main/resources/org/springframework/integration/kafka/config/spring-integration-kafka-3.2.xsd
@@ -31,6 +31,7 @@
+
@@ -126,6 +127,7 @@
]]>
+
@@ -640,4 +642,19 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/test/java/org/springframework/integration/kafka/config/xml/KafkaOutboundAdapterParserTests-context.xml b/src/test/java/org/springframework/integration/kafka/config/xml/KafkaOutboundAdapterParserTests-context.xml
index 8497e22c..51ecacff 100644
--- a/src/test/java/org/springframework/integration/kafka/config/xml/KafkaOutboundAdapterParserTests-context.xml
+++ b/src/test/java/org/springframework/integration/kafka/config/xml/KafkaOutboundAdapterParserTests-context.xml
@@ -23,6 +23,7 @@
error-message-strategy="ems"
send-failure-channel="failures"
send-success-channel="successes"
+ header-mapper="customHeaderMapper"
>
@@ -53,5 +54,7 @@
+
+
diff --git a/src/test/java/org/springframework/integration/kafka/config/xml/KafkaOutboundAdapterParserTests.java b/src/test/java/org/springframework/integration/kafka/config/xml/KafkaOutboundAdapterParserTests.java
index 12c4b3ed..a2fbcbbe 100644
--- a/src/test/java/org/springframework/integration/kafka/config/xml/KafkaOutboundAdapterParserTests.java
+++ b/src/test/java/org/springframework/integration/kafka/config/xml/KafkaOutboundAdapterParserTests.java
@@ -53,6 +53,7 @@
* @author Artem Bilan
* @author Gary Russell
* @author Biju Kunjummen
+ * @author Tom van den Berge
*
* @since 0.5
*/
@@ -84,6 +85,8 @@ public void testOutboundAdapterConfiguration() {
.isSameAs(this.appContext.getBean("failures"));
assertThat(TestUtils.getPropertyValue(messageHandler, "sendSuccessChannel"))
.isSameAs(this.appContext.getBean("successes"));
+ assertThat(TestUtils.getPropertyValue(messageHandler, "headerMapper"))
+ .isSameAs(this.appContext.getBean("customHeaderMapper"));
messageHandler
= this.appContext.getBean("kafkaOutboundChannelAdapter2.handler", KafkaProducerMessageHandler.class);
@@ -94,7 +97,6 @@ public void testOutboundAdapterConfiguration() {
assertThat(TestUtils.getPropertyValue(messageHandler, "sendTimeoutExpression.literalValue")).isEqualTo("500");
}
-
@Test
public void testSyncMode() {
MockProducer mockProducer =
diff --git a/src/test/java/org/springframework/integration/kafka/config/xml/KafkaOutboundGatewayParserTests-context.xml b/src/test/java/org/springframework/integration/kafka/config/xml/KafkaOutboundGatewayParserTests-context.xml
index 74fe603b..31123d48 100644
--- a/src/test/java/org/springframework/integration/kafka/config/xml/KafkaOutboundGatewayParserTests-context.xml
+++ b/src/test/java/org/springframework/integration/kafka/config/xml/KafkaOutboundGatewayParserTests-context.xml
@@ -23,7 +23,9 @@
send-timeout-expression="44"
sync="true"
timestamp-expression="T(System).currentTimeMillis()"
- topic-expression="'topic'"/>
+ topic-expression="'topic'"
+ header-mapper="customHeaderMapper"
+ />
@@ -39,4 +41,5 @@
+
diff --git a/src/test/java/org/springframework/integration/kafka/config/xml/KafkaOutboundGatewayParserTests.java b/src/test/java/org/springframework/integration/kafka/config/xml/KafkaOutboundGatewayParserTests.java
index acee42b7..9804875e 100644
--- a/src/test/java/org/springframework/integration/kafka/config/xml/KafkaOutboundGatewayParserTests.java
+++ b/src/test/java/org/springframework/integration/kafka/config/xml/KafkaOutboundGatewayParserTests.java
@@ -31,6 +31,7 @@
/**
* @author Gary Russell
+ * @author Tom van den Berge
* @since 3.2
*
*/
@@ -66,6 +67,8 @@ public void testProps() {
.isSameAs(this.context.getBean("failures"));
assertThat(TestUtils.getPropertyValue(this.messageHandler, "sendSuccessChannel"))
.isSameAs(this.context.getBean("successes"));
+ assertThat(TestUtils.getPropertyValue(this.messageHandler, "headerMapper"))
+ .isSameAs(this.context.getBean("customHeaderMapper"));
}
public static class EMS extends DefaultErrorMessageStrategy {
diff --git a/src/test/java/org/springframework/integration/kafka/outbound/KafkaProducerMessageHandlerTests.java b/src/test/java/org/springframework/integration/kafka/outbound/KafkaProducerMessageHandlerTests.java
index 53c25dd3..c576919c 100644
--- a/src/test/java/org/springframework/integration/kafka/outbound/KafkaProducerMessageHandlerTests.java
+++ b/src/test/java/org/springframework/integration/kafka/outbound/KafkaProducerMessageHandlerTests.java
@@ -104,6 +104,7 @@
* @author Gary Russell
* @author Biju Kunjummen
* @author Artem Bilan
+ * @author Tom van den Berge
*
* @since 2.0
*/
@@ -329,6 +330,28 @@ protected ListenableFuture> doSend(
producerFactory.destroy();
}
+ @Test
+ public void testOutboundWithCustomHeaderMapper() throws Exception {
+ DefaultKafkaProducerFactory producerFactory = new DefaultKafkaProducerFactory<>(
+ KafkaTestUtils.producerProps(embeddedKafka));
+ KafkaTemplate template = new KafkaTemplate<>(producerFactory);
+ KafkaProducerMessageHandler handler = new KafkaProducerMessageHandler<>(template);
+ handler.setBeanFactory(mock(BeanFactory.class));
+ handler.setHeaderMapper(new DefaultKafkaHeaderMapper("!*"));
+ handler.afterPropertiesSet();
+
+ Message> message = MessageBuilder.withPayload("foo")
+ .setHeader(KafkaHeaders.TOPIC, topic1)
+ .setHeader("foo-header", "foo-header-value")
+ .build();
+ handler.handleMessage(message);
+
+ ConsumerRecord record = KafkaTestUtils.getSingleRecord(consumer, topic1);
+ assertThat(record.headers().toArray().length).isEqualTo(0);
+
+ producerFactory.destroy();
+ }
+
@Test
public void testOutboundGateway() throws Exception {
ConsumerFactory consumerFactory = new DefaultKafkaConsumerFactory<>(