-
Notifications
You must be signed in to change notification settings - Fork 640
Closed
Description
With the following reproducer application:
/**
 * Publishes string payloads to the [BINDING_NAME] binding through the injected [StreamBridge].
 */
@Component
class MessagePublisher(
    private val streamBridge: StreamBridge
) {
    /**
     * Wraps [message] as the payload of a new message and sends it to [BINDING_NAME].
     * The value returned by `send` is discarded.
     */
    fun publishMessage(message: String) {
        val outbound = MessageBuilder.withPayload(message)
            // Left disabled on purpose: enable to attach a publisher-side custom header.
            //.setHeader(CUSTOM_HEADER, "Custom header by publisher")
            .build()
        streamBridge.send(BINDING_NAME, outbound)
    }

    companion object {
        /** Name of the binding that [publishMessage] sends to. */
        const val BINDING_NAME = "messages-out"
    }
}
/**
 * Function component that converts one incoming byte-array message into a single-element
 * list containing a string message that carries the `CUSTOM_HEADER` header.
 */
@Component
class MessageMiddleware : (Message<ByteArray>) -> List<Message<String>> {
    /**
     * Decodes the payload of [p1] as UTF-8 text and returns it re-wrapped in a new message
     * with `CUSTOM_HEADER` set. Headers of [p1] are not copied to the result.
     */
    override fun invoke(p1: Message<ByteArray>): List<Message<String>> {
        val text = p1.payload.toString(Charsets.UTF_8)
        val outbound = MessageBuilder.withPayload(text)
            .setHeader(CUSTOM_HEADER, "Custom header by middleware")
            .build()
        return listOf(outbound)
    }
}
/**
 * Consumer component that prints the `CUSTOM_HEADER` header of every message it receives.
 */
@Component
class MessageConsumer : Consumer<Message<ByteArray>> {
    /** Looks up `CUSTOM_HEADER` in the headers of [kafkaMessage] and prints it (`null` if absent). */
    override fun accept(kafkaMessage: Message<ByteArray>) {
        val customHeader = kafkaMessage.headers[CUSTOM_HEADER]
        println(customHeader)
    }
}
The messages produced by MessageMiddleware are missing the custom header when they reach the Kafka topic, and therefore also when they reach the consumer. In contrast, messages sent by MessagePublisher carry the custom header correctly whenever the publisher sets it.
The complete reproducer, including the bindings configuration, is available at: https://github.com/airamhr9/spring-cloud-kafka-header-reproducer