From a067eaa288df66601ba69fa2acf12235d025c0bb Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Wed, 12 Jul 2023 13:22:50 -0400 Subject: [PATCH] GH-2736: Fix Possible Observation NPEs Resolves https://github.com/spring-projects/spring-kafka/issues/2736 --- .../kafka/support/micrometer/KafkaRecordReceiverContext.java | 4 ++-- .../kafka/support/micrometer/KafkaRecordSenderContext.java | 5 +++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaRecordReceiverContext.java b/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaRecordReceiverContext.java index 016dd358f8..b3de789176 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaRecordReceiverContext.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaRecordReceiverContext.java @@ -1,5 +1,5 @@ /* - * Copyright 2022 the original author or authors. + * Copyright 2022-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -40,7 +40,7 @@ public class KafkaRecordReceiverContext extends ReceiverContext record, String listenerId, Supplier clusterId) { super((carrier, key) -> { Header header = carrier.headers().lastHeader(key); - if (header == null) { + if (header == null || header.value() == null) { return null; } return new String(header.value(), StandardCharsets.UTF_8); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaRecordSenderContext.java b/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaRecordSenderContext.java index c8cfd3aea9..ea6005b883 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaRecordSenderContext.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaRecordSenderContext.java @@ -1,5 +1,5 @@ /* - * Copyright 2022 the original author or authors. + * Copyright 2022-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -37,7 +37,8 @@ public class KafkaRecordSenderContext extends SenderContext private final ProducerRecord record; public KafkaRecordSenderContext(ProducerRecord record, String beanName, Supplier clusterId) { - super((carrier, key, value) -> record.headers().add(key, value.getBytes(StandardCharsets.UTF_8))); + super((carrier, key, value) -> record.headers().add(key, + value == null ? null : value.getBytes(StandardCharsets.UTF_8))); setCarrier(record); this.beanName = beanName; this.record = record;