Skip to content
This repository has been archived by the owner on Mar 30, 2023. It is now read-only.

Commit

Permalink
Add XML attribute for header-mapper
Browse files Browse the repository at this point in the history
* Added support for header-mapper to outbound-channel-adapter and
outbound-gateway XML

* Follow up on review

* Corrected mistake

* Corrected indentation
  • Loading branch information
tomvandenberge authored and artembilan committed Sep 30, 2019
1 parent 1b55b07 commit 7e38788
Show file tree
Hide file tree
Showing 8 changed files with 61 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
* Utilities to assist with parsing XML.
*
* @author Gary Russell
* @author Tom van den Berge
* @since 3.2
*
*/
Expand Down Expand Up @@ -81,6 +82,8 @@ public static void commonOutboundProperties(final Element element, final ParserC
if (timestampExpressionDef != null) {
builder.addPropertyValue("timestampExpression", timestampExpressionDef);
}

IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "header-mapper");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
* @author Gary Russell
* @author Marius Bogoevici
* @author Biju Kunjummen
* @author Tom van den Berge
*
* @since 0.5
*/
Expand Down Expand Up @@ -186,6 +187,10 @@ public void setHeaderMapper(KafkaHeaderMapper headerMapper) {
this.headerMapper = headerMapper;
}

public KafkaHeaderMapper getHeaderMapper() {
return this.headerMapper;
}

public KafkaTemplate<?, ?> getKafkaTemplate() {
return this.kafkaTemplate;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
<xsd:extension base="outboundType">
<xsd:attributeGroup ref="integration:channelAdapterAttributes"/>
<xsd:attributeGroup ref="kafkaTemplate"/>
<xsd:attributeGroup ref="headerMapper"/>
</xsd:extension>
</xsd:complexContent>
</xsd:complexType>
Expand Down Expand Up @@ -126,6 +127,7 @@
]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attributeGroup ref="headerMapper"/>
</xsd:extension>
</xsd:complexContent>
</xsd:complexType>
Expand Down Expand Up @@ -640,4 +642,19 @@
</xsd:annotation>
</xsd:attribute>
</xsd:attributeGroup>

<xsd:attributeGroup name="headerMapper">
<xsd:attribute name="header-mapper" use="optional" type="xsd:string">
<xsd:annotation>
<xsd:documentation><![CDATA[
Specifies the HeaderMapper used map Spring message headers to or from Kafka message headers.
]]></xsd:documentation>
<xsd:appinfo>
<tool:annotation kind="ref">
<tool:expected-type type="org.springframework.kafka.support.KafkaHeaderMapper" />
</tool:annotation>
</xsd:appinfo>
</xsd:annotation>
</xsd:attribute>
</xsd:attributeGroup>
</xsd:schema>
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
error-message-strategy="ems"
send-failure-channel="failures"
send-success-channel="successes"
header-mapper="customHeaderMapper"
>
<int-kafka:request-handler-advice-chain>
<bean class="org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice" />
Expand Down Expand Up @@ -53,5 +54,7 @@
<int:channel id="failures" />

<int:channel id="successes" />

<bean id="customHeaderMapper" class="org.springframework.kafka.support.DefaultKafkaHeaderMapper" />

</beans>
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
* @author Artem Bilan
* @author Gary Russell
* @author Biju Kunjummen
* @author Tom van den Berge
*
* @since 0.5
*/
Expand Down Expand Up @@ -84,6 +85,8 @@ public void testOutboundAdapterConfiguration() {
.isSameAs(this.appContext.getBean("failures"));
assertThat(TestUtils.getPropertyValue(messageHandler, "sendSuccessChannel"))
.isSameAs(this.appContext.getBean("successes"));
assertThat(TestUtils.getPropertyValue(messageHandler, "headerMapper"))
.isSameAs(this.appContext.getBean("customHeaderMapper"));

messageHandler
= this.appContext.getBean("kafkaOutboundChannelAdapter2.handler", KafkaProducerMessageHandler.class);
Expand All @@ -94,7 +97,6 @@ public void testOutboundAdapterConfiguration() {
assertThat(TestUtils.getPropertyValue(messageHandler, "sendTimeoutExpression.literalValue")).isEqualTo("500");
}


@Test
public void testSyncMode() {
MockProducer<Integer, String> mockProducer =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
send-timeout-expression="44"
sync="true"
timestamp-expression="T(System).currentTimeMillis()"
topic-expression="'topic'"/>
topic-expression="'topic'"
header-mapper="customHeaderMapper"
/>

<int:channel id="requests"/>

Expand All @@ -39,4 +41,5 @@
<constructor-arg value="org.springframework.kafka.requestreply.ReplyingKafkaTemplate"/>
</bean>

<bean id="customHeaderMapper" class="org.springframework.kafka.support.DefaultKafkaHeaderMapper" />
</beans>
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

/**
* @author Gary Russell
* @author Tom van den Berge
* @since 3.2
*
*/
Expand Down Expand Up @@ -66,6 +67,8 @@ public void testProps() {
.isSameAs(this.context.getBean("failures"));
assertThat(TestUtils.getPropertyValue(this.messageHandler, "sendSuccessChannel"))
.isSameAs(this.context.getBean("successes"));
assertThat(TestUtils.getPropertyValue(this.messageHandler, "headerMapper"))
.isSameAs(this.context.getBean("customHeaderMapper"));
}

public static class EMS extends DefaultErrorMessageStrategy {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@
* @author Gary Russell
* @author Biju Kunjummen
* @author Artem Bilan
* @author Tom van den Berge
*
* @since 2.0
*/
Expand Down Expand Up @@ -329,6 +330,28 @@ protected ListenableFuture<SendResult<Integer, String>> doSend(
producerFactory.destroy();
}

@Test
public void testOutboundWithCustomHeaderMapper() throws Exception {
DefaultKafkaProducerFactory<Integer, String> producerFactory = new DefaultKafkaProducerFactory<>(
KafkaTestUtils.producerProps(embeddedKafka));
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(producerFactory);
KafkaProducerMessageHandler<Integer, String> handler = new KafkaProducerMessageHandler<>(template);
handler.setBeanFactory(mock(BeanFactory.class));
handler.setHeaderMapper(new DefaultKafkaHeaderMapper("!*"));
handler.afterPropertiesSet();

Message<?> message = MessageBuilder.withPayload("foo")
.setHeader(KafkaHeaders.TOPIC, topic1)
.setHeader("foo-header", "foo-header-value")
.build();
handler.handleMessage(message);

ConsumerRecord<Integer, String> record = KafkaTestUtils.getSingleRecord(consumer, topic1);
assertThat(record.headers().toArray().length).isEqualTo(0);

producerFactory.destroy();
}

@Test
public void testOutboundGateway() throws Exception {
ConsumerFactory<Integer, String> consumerFactory = new DefaultKafkaConsumerFactory<>(
Expand Down

0 comments on commit 7e38788

Please sign in to comment.