diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java index 9fc484dced..ca580f4bc5 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java @@ -125,6 +125,7 @@ public RecordMessageConverter getRecordMessageConverter() { @Override public Message toMessage(List> records, Acknowledgment acknowledgment, Consumer consumer, Type type) { + KafkaMessageHeaders kafkaMessageHeaders = new KafkaMessageHeaders(this.generateMessageId, this.generateTimestamp); @@ -167,9 +168,11 @@ public Message toMessage(List> records, Acknowledgment a topics.add(record.topic()); partitions.add(record.partition()); offsets.add(record.offset()); - timestampTypes.add(record.timestampType().name()); + if (record.timestampType() != null) { + timestampTypes.add(record.timestampType().name()); + } timestamps.add(record.timestamp()); - if (this.headerMapper != null) { + if (this.headerMapper != null && record.headers() != null) { Map converted = new HashMap<>(); this.headerMapper.toHeaders(record.headers(), converted); convertedHeaders.add(converted); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessageConverter.java b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessageConverter.java index 6822776d8b..066329057c 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessageConverter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessageConverter.java @@ -16,6 +16,14 @@ package org.springframework.kafka.support.converter; +import java.util.Map; + +import org.apache.kafka.clients.consumer.Consumer; + +import org.springframework.kafka.support.Acknowledgment; +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.lang.Nullable; + /** * A top level interface for message converters. * @@ -25,4 +33,23 @@ */ public interface MessageConverter { + default void commonHeaders(Acknowledgment acknowledgment, Consumer consumer, Map rawHeaders, + Object theKey, Object topic, Object partition, Object offset, + @Nullable Object timestampType, Object timestamp) { + + rawHeaders.put(KafkaHeaders.RECEIVED_MESSAGE_KEY, theKey); + rawHeaders.put(KafkaHeaders.RECEIVED_TOPIC, topic); + rawHeaders.put(KafkaHeaders.RECEIVED_PARTITION_ID, partition); + rawHeaders.put(KafkaHeaders.OFFSET, offset); + rawHeaders.put(KafkaHeaders.TIMESTAMP_TYPE, timestampType); + rawHeaders.put(KafkaHeaders.RECEIVED_TIMESTAMP, timestamp); + + if (acknowledgment != null) { + rawHeaders.put(KafkaHeaders.ACKNOWLEDGMENT, acknowledgment); + } + if (consumer != null) { + rawHeaders.put(KafkaHeaders.CONSUMER, consumer); + } + } + } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessagingMessageConverter.java b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessagingMessageConverter.java index c17254b6ad..00a85c7f3b 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessagingMessageConverter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessagingMessageConverter.java @@ -101,11 +101,12 @@ public void setHeaderMapper(KafkaHeaderMapper headerMapper) { @Override public Message toMessage(ConsumerRecord record, Acknowledgment acknowledgment, Consumer consumer, Type type) { + KafkaMessageHeaders kafkaMessageHeaders = new KafkaMessageHeaders(this.generateMessageId, this.generateTimestamp); Map rawHeaders = kafkaMessageHeaders.getRawHeaders(); - if (this.headerMapper != null) { + if (this.headerMapper != null && record.headers() != null) { this.headerMapper.toHeaders(record.headers(), rawHeaders); } else { @@ -117,12 +118,9 @@ public Message toMessage(ConsumerRecord record, Acknowledgment acknowle } rawHeaders.put(KafkaHeaders.NATIVE_HEADERS, record.headers()); } - rawHeaders.put(KafkaHeaders.RECEIVED_MESSAGE_KEY, record.key()); - rawHeaders.put(KafkaHeaders.RECEIVED_TOPIC, record.topic()); - rawHeaders.put(KafkaHeaders.RECEIVED_PARTITION_ID, record.partition()); - rawHeaders.put(KafkaHeaders.OFFSET, record.offset()); - rawHeaders.put(KafkaHeaders.TIMESTAMP_TYPE, record.timestampType().name()); - rawHeaders.put(KafkaHeaders.RECEIVED_TIMESTAMP, record.timestamp()); + String ttName = record.timestampType() != null ? record.timestampType().name() : null; + commonHeaders(acknowledgment, consumer, rawHeaders, record.key(), record.topic(), record.partition(), + record.offset(), ttName, record.timestamp()); if (acknowledgment != null) { rawHeaders.put(KafkaHeaders.ACKNOWLEDGMENT, acknowledgment); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/converter/BatchMessageConverterTests.java b/spring-kafka/src/test/java/org/springframework/kafka/support/converter/BatchMessageConverterTests.java index 11a79dd839..dae78a0234 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/support/converter/BatchMessageConverterTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/support/converter/BatchMessageConverterTests.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -112,4 +113,18 @@ private MessageHeaders testGuts(BatchMessageConverter batchMessageConverter) { return headers; } + @SuppressWarnings("unchecked") + @Test + public void missingHeaders() { + BatchMessageConverter converter = new BatchMessagingMessageConverter(); + Headers nullHeaders = null; + ConsumerRecord record = new ConsumerRecord<>("foo", 1, 42, -1L, null, 0L, 0, 0, "bar", "baz", + nullHeaders); + List> records = Collections.singletonList(record); + Message message = converter.toMessage(records, null, null, null); + assertThat(((List) message.getPayload())).contains("baz"); + assertThat(message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC, List.class)).contains("foo"); + assertThat(message.getHeaders().get(KafkaHeaders.RECEIVED_MESSAGE_KEY, List.class)).contains("bar"); + } + } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/converter/MessagingMessageConverterTests.java b/spring-kafka/src/test/java/org/springframework/kafka/support/converter/MessagingMessageConverterTests.java new file mode 100644 index 0000000000..643d9a140d --- /dev/null +++ b/spring-kafka/src/test/java/org/springframework/kafka/support/converter/MessagingMessageConverterTests.java @@ -0,0 +1,47 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.support.converter; + +import static org.assertj.core.api.Assertions.assertThat; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.header.Headers; +import org.junit.jupiter.api.Test; + +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.messaging.Message; + +/** + * @author Gary Russell + * @since 2.1.13 + * + */ +public class MessagingMessageConverterTests { + + @Test + void missingHeaders() { + MessagingMessageConverter converter = new MessagingMessageConverter(); + Headers nullHeaders = null; + ConsumerRecord record = new ConsumerRecord<>("foo", 1, 42, -1L, null, 0L, 0, 0, "bar", "baz", + nullHeaders); + Message message = converter.toMessage(record, null, null, null); + assertThat(message.getPayload()).isEqualTo("baz"); + assertThat(message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC)).isEqualTo("foo"); + assertThat(message.getHeaders().get(KafkaHeaders.RECEIVED_MESSAGE_KEY)).isEqualTo("bar"); + } + +}