Skip to content

Commit

Permalink
GH-1066: Protect against null Headers
Browse files Browse the repository at this point in the history
Resolves #1066

Certain clients (e.g. mapR) that emulate the Kafka clients do not properly
populate the `ConsumerRecord.headers()` field.

Check for null before mapping; also check `timestampType`.

**cherry-pick to 2.2.x, 2.1.x**

# Conflicts:
#	spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessageConverter.java
#	spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessagingMessageConverter.java
  • Loading branch information
garyrussell authored and artembilan committed Apr 12, 2019
1 parent 04f4db3 commit 40b21c1
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 9 deletions.
Expand Up @@ -125,6 +125,7 @@ public RecordMessageConverter getRecordMessageConverter() {
@Override
public Message<?> toMessage(List<ConsumerRecord<?, ?>> records, Acknowledgment acknowledgment,
Consumer<?, ?> consumer, Type type) {

KafkaMessageHeaders kafkaMessageHeaders = new KafkaMessageHeaders(this.generateMessageId,
this.generateTimestamp);

Expand Down Expand Up @@ -167,9 +168,11 @@ public Message<?> toMessage(List<ConsumerRecord<?, ?>> records, Acknowledgment a
topics.add(record.topic());
partitions.add(record.partition());
offsets.add(record.offset());
timestampTypes.add(record.timestampType().name());
if (record.timestampType() != null) {
timestampTypes.add(record.timestampType().name());
}
timestamps.add(record.timestamp());
if (this.headerMapper != null) {
if (this.headerMapper != null && record.headers() != null) {
Map<String, Object> converted = new HashMap<>();
this.headerMapper.toHeaders(record.headers(), converted);
convertedHeaders.add(converted);
Expand Down
Expand Up @@ -16,6 +16,14 @@

package org.springframework.kafka.support.converter;

import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;

import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.lang.Nullable;

/**
* A top level interface for message converters.
*
Expand All @@ -25,4 +33,23 @@
*/
public interface MessageConverter {

default void commonHeaders(Acknowledgment acknowledgment, Consumer<?, ?> consumer, Map<String, Object> rawHeaders,
Object theKey, Object topic, Object partition, Object offset,
@Nullable Object timestampType, Object timestamp) {

rawHeaders.put(KafkaHeaders.RECEIVED_MESSAGE_KEY, theKey);
rawHeaders.put(KafkaHeaders.RECEIVED_TOPIC, topic);
rawHeaders.put(KafkaHeaders.RECEIVED_PARTITION_ID, partition);
rawHeaders.put(KafkaHeaders.OFFSET, offset);
rawHeaders.put(KafkaHeaders.TIMESTAMP_TYPE, timestampType);
rawHeaders.put(KafkaHeaders.RECEIVED_TIMESTAMP, timestamp);

if (acknowledgment != null) {
rawHeaders.put(KafkaHeaders.ACKNOWLEDGMENT, acknowledgment);
}
if (consumer != null) {
rawHeaders.put(KafkaHeaders.CONSUMER, consumer);
}
}

}
Expand Up @@ -101,11 +101,12 @@ public void setHeaderMapper(KafkaHeaderMapper headerMapper) {
@Override
public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment, Consumer<?, ?> consumer,
Type type) {

KafkaMessageHeaders kafkaMessageHeaders = new KafkaMessageHeaders(this.generateMessageId,
this.generateTimestamp);

Map<String, Object> rawHeaders = kafkaMessageHeaders.getRawHeaders();
if (this.headerMapper != null) {
if (this.headerMapper != null && record.headers() != null) {
this.headerMapper.toHeaders(record.headers(), rawHeaders);
}
else {
Expand All @@ -117,12 +118,9 @@ public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowle
}
rawHeaders.put(KafkaHeaders.NATIVE_HEADERS, record.headers());
}
rawHeaders.put(KafkaHeaders.RECEIVED_MESSAGE_KEY, record.key());
rawHeaders.put(KafkaHeaders.RECEIVED_TOPIC, record.topic());
rawHeaders.put(KafkaHeaders.RECEIVED_PARTITION_ID, record.partition());
rawHeaders.put(KafkaHeaders.OFFSET, record.offset());
rawHeaders.put(KafkaHeaders.TIMESTAMP_TYPE, record.timestampType().name());
rawHeaders.put(KafkaHeaders.RECEIVED_TIMESTAMP, record.timestamp());
String ttName = record.timestampType() != null ? record.timestampType().name() : null;
commonHeaders(acknowledgment, consumer, rawHeaders, record.key(), record.topic(), record.partition(),
record.offset(), ttName, record.timestamp());

if (acknowledgment != null) {
rawHeaders.put(KafkaHeaders.ACKNOWLEDGMENT, acknowledgment);
Expand Down
Expand Up @@ -21,6 +21,7 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -112,4 +113,18 @@ private MessageHeaders testGuts(BatchMessageConverter batchMessageConverter) {
return headers;
}

@SuppressWarnings("unchecked")
@Test
public void missingHeaders() {
BatchMessageConverter converter = new BatchMessagingMessageConverter();
Headers nullHeaders = null;
ConsumerRecord<String, String> record = new ConsumerRecord<>("foo", 1, 42, -1L, null, 0L, 0, 0, "bar", "baz",
nullHeaders);
List<ConsumerRecord<?, ?>> records = Collections.singletonList(record);
Message<?> message = converter.toMessage(records, null, null, null);
assertThat(((List<String>) message.getPayload())).contains("baz");
assertThat(message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC, List.class)).contains("foo");
assertThat(message.getHeaders().get(KafkaHeaders.RECEIVED_MESSAGE_KEY, List.class)).contains("bar");
}

}
@@ -0,0 +1,47 @@
/*
* Copyright 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.support.converter;

import static org.assertj.core.api.Assertions.assertThat;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Headers;
import org.junit.jupiter.api.Test;

import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;

/**
* @author Gary Russell
* @since 2.1.13
*
*/
public class MessagingMessageConverterTests {

@Test
void missingHeaders() {
MessagingMessageConverter converter = new MessagingMessageConverter();
Headers nullHeaders = null;
ConsumerRecord<String, String> record = new ConsumerRecord<>("foo", 1, 42, -1L, null, 0L, 0, 0, "bar", "baz",
nullHeaders);
Message<?> message = converter.toMessage(record, null, null, null);
assertThat(message.getPayload()).isEqualTo("baz");
assertThat(message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC)).isEqualTo("foo");
assertThat(message.getHeaders().get(KafkaHeaders.RECEIVED_MESSAGE_KEY)).isEqualTo("bar");
}

}

0 comments on commit 40b21c1

Please sign in to comment.