From b154b89e27275304b16cecfb77635576a2ea1a7c Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Thu, 13 Mar 2025 16:04:08 -0400 Subject: [PATCH 1/3] GH-3786: Remove ProducerRecord duplicated traceparent header Fixes: #3786 Issue link: https://github.com/spring-projects/spring-kafka/issues/3786 When tracing is enabled, the KafkaRecordSenderContext was adding a new traceparent header without removing existing ones, resulting in multiple traceparent headers in the same record. This commit fixes the issue by Updating KafkaRecordSenderContext to remove existing traceparent headers before adding new ones. **Auto-cherry-pick to `3.3.x` & `3.2.x`** Signed-off-by: Soby Chacko --- .../micrometer/KafkaRecordSenderContext.java | 15 +++- .../support/micrometer/ObservationTests.java | 75 ++++++++++++++++++- 2 files changed, 85 insertions(+), 5 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaRecordSenderContext.java b/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaRecordSenderContext.java index 0d18699313..c7f4ab290c 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaRecordSenderContext.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaRecordSenderContext.java @@ -1,5 +1,5 @@ /* - * Copyright 2022-2024 the original author or authors. + * Copyright 2022-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,6 +21,7 @@ import io.micrometer.observation.transport.SenderContext; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.Headers; /** * {@link SenderContext} for {@link ProducerRecord}s. @@ -28,6 +29,7 @@ * @author Gary Russell * @author Christian Mergenthaler * @author Wang Zhiyang + * @author Soby Chacko * * @since 3.0 * @@ -39,8 +41,15 @@ public class KafkaRecordSenderContext extends SenderContext private final ProducerRecord record; public KafkaRecordSenderContext(ProducerRecord record, String beanName, Supplier clusterId) { - super((carrier, key, value) -> record.headers().add(key, - value == null ? null : value.getBytes(StandardCharsets.UTF_8))); + super((carrier, key, value) -> { + Headers headers = record.headers(); + // For traceparent context headers, ensure there's only one + if ("traceparent".equals(key)) { + headers.remove(key); + } + headers.add(key, value == null ? null : value.getBytes(StandardCharsets.UTF_8)); + }); + setCarrier(record); this.beanName = beanName; this.record = record; diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java index 99442dd2ba..30d28071c9 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java @@ -16,6 +16,7 @@ package org.springframework.kafka.support.micrometer; +import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Deque; import java.util.List; @@ -26,6 +27,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; import io.micrometer.common.KeyValues; import io.micrometer.core.instrument.MeterRegistry; @@ -56,6 +59,7 @@ import org.apache.kafka.common.errors.InvalidTopicException; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeader; import org.jspecify.annotations.Nullable; import org.junit.jupiter.api.Test; import reactor.core.publisher.Mono; @@ -78,6 +82,7 @@ import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.listener.MessageListenerContainer; import org.springframework.kafka.listener.RecordInterceptor; +import org.springframework.kafka.support.ProducerListener; import org.springframework.kafka.support.micrometer.KafkaListenerObservation.DefaultKafkaListenerObservationConvention; import org.springframework.kafka.support.micrometer.KafkaTemplateObservation.DefaultKafkaTemplateObservationConvention; import org.springframework.kafka.test.EmbeddedKafkaBroker; @@ -104,7 +109,7 @@ @SpringJUnitConfig @EmbeddedKafka(topics = { ObservationTests.OBSERVATION_TEST_1, ObservationTests.OBSERVATION_TEST_2, ObservationTests.OBSERVATION_TEST_3, ObservationTests.OBSERVATION_RUNTIME_EXCEPTION, - ObservationTests.OBSERVATION_ERROR }, partitions = 1) + ObservationTests.OBSERVATION_ERROR, ObservationTests.OBSERVATION_TRACEPARENT_DUPLICATE }, partitions = 1) @DirtiesContext public class ObservationTests { @@ -122,6 +127,8 @@ public class ObservationTests { public final static String OBSERVATION_ERROR_MONO = "observation.error.mono"; + public final static String OBSERVATION_TRACEPARENT_DUPLICATE = "observation.traceparent.duplicate"; + @Test void endToEnd(@Autowired Listener listener, @Autowired KafkaTemplate template, @Autowired SimpleTracer tracer, @Autowired KafkaListenerEndpointRegistry rler, @@ -449,6 +456,65 @@ void kafkaAdminNotRecreatedIfBootstrapServersSameInProducerAndAdminConfig( assertThat(template.getKafkaAdmin()).isSameAs(kafkaAdmin); } + // https://github.com/spring-cloud/spring-cloud-stream/issues/3095#issuecomment-2707075861 + // https://github.com/spring-projects/spring-kafka/issues/3786 + @Test + void verifyKafkaRecordSenderContextTraceParentHandling() { + String initialTraceParent = "traceparent-from-previous"; + String updatedTraceParent = "traceparent-current"; + ProducerRecord record = new ProducerRecord<>("test-topic", "test-value"); + record.headers().add("traceparent", initialTraceParent.getBytes(StandardCharsets.UTF_8)); + + // Create the context and update the traceparent + KafkaRecordSenderContext context = new KafkaRecordSenderContext( + record, + "test-bean", + () -> "test-cluster" + ); + context.getSetter().set(record, "traceparent", updatedTraceParent); + + Iterable
traceparentHeaders = record.headers().headers("traceparent"); + List headerValues = StreamSupport.stream(traceparentHeaders.spliterator(), false) + .map(header -> new String(header.value(), StandardCharsets.UTF_8)) + .collect(Collectors.toList()); + + // Verify there's only one traceparent header and it contains the updated value + assertThat(headerValues).containsExactly(updatedTraceParent); + } + + // https://github.com/spring-cloud/spring-cloud-stream/issues/3095#issuecomment-2707075861 + // https://github.com/spring-projects/spring-kafka/issues/3786 + @Test + void verifyTraceParentHeader(@Autowired KafkaTemplate template, + @Autowired SimpleTracer tracer) throws Exception { + CompletableFuture> producerRecordFuture = new CompletableFuture<>(); + template.setProducerListener(new ProducerListener<>() { + @Override + public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) { + producerRecordFuture.complete(producerRecord); + } + }); + String initialTraceParent = "traceparent-from-previous"; + Header header = new RecordHeader("traceparent", initialTraceParent.getBytes(StandardCharsets.UTF_8)); + ProducerRecord producerRecord = new ProducerRecord<>( + OBSERVATION_TRACEPARENT_DUPLICATE, + null, null, null, + "test-value", + List.of(header) + ); + + template.send(producerRecord).get(10, TimeUnit.SECONDS); + ProducerRecord recordResult = producerRecordFuture.get(10, TimeUnit.SECONDS); + + Iterable
traceparentHeaders = recordResult.headers().headers("traceparent"); + assertThat(traceparentHeaders).hasSize(1); + + String traceparentValue = new String(traceparentHeaders.iterator().next().value(), StandardCharsets.UTF_8); + assertThat(traceparentValue).isEqualTo("traceparent-from-propagator"); + + tracer.getSpans().clear(); + } + @Configuration @EnableKafka public static class Config { @@ -598,6 +664,9 @@ public List fields() { public void inject(TraceContext context, @Nullable C carrier, Setter setter) { setter.set(carrier, "foo", "some foo value"); setter.set(carrier, "bar", "some bar value"); + + // Add a traceparent header to simulate W3C trace context + setter.set(carrier, "traceparent", "traceparent-from-propagator"); } // This is called on the consumer side when the message is consumed @@ -606,7 +675,9 @@ public void inject(TraceContext context, @Nullable C carrier, Setter sett public Span.Builder extract(C carrier, Getter getter) { String foo = getter.get(carrier, "foo"); String bar = getter.get(carrier, "bar"); - return tracer.spanBuilder().tag("foo", foo).tag("bar", bar); + return tracer.spanBuilder() + .tag("foo", foo) + .tag("bar", bar); } }; } From 1a6b2ecab27581a6a64e8058b507a2a1263a9014 Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Thu, 13 Mar 2025 17:39:04 -0400 Subject: [PATCH 2/3] PR review --- .../support/micrometer/KafkaRecordSenderContext.java | 5 +++-- .../kafka/support/micrometer/ObservationTests.java | 8 ++------ 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaRecordSenderContext.java b/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaRecordSenderContext.java index c7f4ab290c..64af5739a9 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaRecordSenderContext.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaRecordSenderContext.java @@ -21,6 +21,7 @@ import io.micrometer.observation.transport.SenderContext; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.Headers; /** @@ -43,8 +44,8 @@ public class KafkaRecordSenderContext extends SenderContext public KafkaRecordSenderContext(ProducerRecord record, String beanName, Supplier clusterId) { super((carrier, key, value) -> { Headers headers = record.headers(); - // For traceparent context headers, ensure there's only one - if ("traceparent".equals(key)) { + Iterable
existingHeaders = headers.headers(key); + if (existingHeaders.iterator().hasNext()) { headers.remove(key); } headers.add(key, value == null ? null : value.getBytes(StandardCharsets.UTF_8)); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java index 30d28071c9..10f32d2a45 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java @@ -27,7 +27,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Collectors; import java.util.stream.StreamSupport; import io.micrometer.common.KeyValues; @@ -456,8 +455,6 @@ void kafkaAdminNotRecreatedIfBootstrapServersSameInProducerAndAdminConfig( assertThat(template.getKafkaAdmin()).isSameAs(kafkaAdmin); } - // https://github.com/spring-cloud/spring-cloud-stream/issues/3095#issuecomment-2707075861 - // https://github.com/spring-projects/spring-kafka/issues/3786 @Test void verifyKafkaRecordSenderContextTraceParentHandling() { String initialTraceParent = "traceparent-from-previous"; @@ -474,16 +471,15 @@ void verifyKafkaRecordSenderContextTraceParentHandling() { context.getSetter().set(record, "traceparent", updatedTraceParent); Iterable
traceparentHeaders = record.headers().headers("traceparent"); + List headerValues = StreamSupport.stream(traceparentHeaders.spliterator(), false) .map(header -> new String(header.value(), StandardCharsets.UTF_8)) - .collect(Collectors.toList()); + .toList(); // Verify there's only one traceparent header and it contains the updated value assertThat(headerValues).containsExactly(updatedTraceParent); } - // https://github.com/spring-cloud/spring-cloud-stream/issues/3095#issuecomment-2707075861 - // https://github.com/spring-projects/spring-kafka/issues/3786 @Test void verifyTraceParentHeader(@Autowired KafkaTemplate template, @Autowired SimpleTracer tracer) throws Exception { From 102c555c3a6bd50a7b6131ad73c8bcca28a53526 Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Thu, 13 Mar 2025 18:41:17 -0400 Subject: [PATCH 3/3] PR review --- .../kafka/support/micrometer/KafkaRecordSenderContext.java | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaRecordSenderContext.java b/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaRecordSenderContext.java index 64af5739a9..50e9e310a5 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaRecordSenderContext.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaRecordSenderContext.java @@ -21,7 +21,6 @@ import io.micrometer.observation.transport.SenderContext; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.Headers; /** @@ -44,10 +43,7 @@ public class KafkaRecordSenderContext extends SenderContext public KafkaRecordSenderContext(ProducerRecord record, String beanName, Supplier clusterId) { super((carrier, key, value) -> { Headers headers = record.headers(); - Iterable
existingHeaders = headers.headers(key); - if (existingHeaders.iterator().hasNext()) { - headers.remove(key); - } + headers.remove(key); headers.add(key, value == null ? null : value.getBytes(StandardCharsets.UTF_8)); });