From 82a3f72549998fa9190f6ee5082e81d166b68ea7 Mon Sep 17 00:00:00 2001 From: patpatpat123 <43899031+patpatpat123@users.noreply.github.com> Date: Thu, 18 Sep 2025 12:19:09 +0800 Subject: [PATCH 1/5] Update TracingIterable.java --- .../kafkaclients/common/v0_11/internal/TracingIterable.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/TracingIterable.java b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/TracingIterable.java index 540a15659765..2a30128d49a9 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/TracingIterable.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/TracingIterable.java @@ -16,9 +16,9 @@ */ public class TracingIterable implements Iterable> { private final Iterable> delegate; - private final Instrumenter instrumenter; - private final BooleanSupplier wrappingEnabled; - private final KafkaConsumerContext consumerContext; + protected final Instrumenter instrumenter; + protected final BooleanSupplier wrappingEnabled; + protected final KafkaConsumerContext consumerContext; private boolean firstIterator = true; protected TracingIterable( From 2e5ca6320885c7c113b3539a322b0aef297adeed Mon Sep 17 00:00:00 2001 From: patpatpat123 <43899031+patpatpat123@users.noreply.github.com> Date: Thu, 18 Sep 2025 12:19:38 +0800 Subject: [PATCH 2/5] Update TracingList.java --- .../kafkaclients/common/v0_11/internal/TracingList.java | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/TracingList.java b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/TracingList.java index a896f1ebf70f..3cd2eef8c45c 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/TracingList.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/TracingList.java @@ -137,16 +137,12 @@ public int lastIndexOf(Object o) { @Override public ListIterator> listIterator() { - // TODO: the API for ListIterator is not really good to instrument it in context of Kafka - // Consumer so we will not do that for now - return delegate.listIterator(); + return TracingListIterator.wrap(delegate.listIterator(), instrumenter, wrappingEnabled, consumerContext); } @Override public ListIterator> listIterator(int index) { - // TODO: the API for ListIterator is not really good to instrument it in context of Kafka - // Consumer so we will not do that for now - return delegate.listIterator(index); + return TracingListIterator.wrap(delegate.listIterator(index), instrumenter, wrappingEnabled, consumerContext); } @Override From af9522acd952d204bcd7082173be05ff4b342ed5 Mon Sep 17 00:00:00 2001 From: patpatpat123 <43899031+patpatpat123@users.noreply.github.com> Date: Thu, 18 Sep 2025 12:20:14 +0800 Subject: [PATCH 3/5] Create TracingListIterator.java --- .../v0_11/internal/TracingListIterator.java | 88 +++++++++++++++++++ 1 file changed, 88 insertions(+) create mode 100644 instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/TracingListIterator.java diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/TracingListIterator.java b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/TracingListIterator.java new file mode 100644 index 000000000000..b0b131846411 --- /dev/null +++ b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/TracingListIterator.java @@ -0,0 +1,88 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal; + +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import java.util.Iterator; +import java.util.ListIterator; +import java.util.function.BooleanSupplier; +import org.apache.kafka.clients.consumer.ConsumerRecord; + +/** + * This class is internal and is hence not for public use. Its APIs are unstable and can change at + * any time. + */ +public class TracingListIterator implements ListIterator> { + + private final ListIterator> delegateListIterator; + private final Iterator> tracingIterator; + + private TracingListIterator( + ListIterator> delegateListIterator, + Instrumenter instrumenter, + BooleanSupplier wrappingEnabled, + KafkaConsumerContext consumerContext) { + this.delegateListIterator = delegateListIterator; + this.tracingIterator = TracingIterator.wrap(delegateListIterator, instrumenter, wrappingEnabled, consumerContext); + } + + public static ListIterator> wrap( + ListIterator> delegateListIterator, + Instrumenter instrumenter, + BooleanSupplier wrappingEnabled, + KafkaConsumerContext consumerContext) { + if (wrappingEnabled.getAsBoolean()) { + return new TracingListIterator<>( + delegateListIterator, instrumenter, wrappingEnabled, consumerContext); + } + return delegateListIterator; + } + + @Override + public boolean hasNext() { + return tracingIterator.hasNext(); + } + + @Override + public ConsumerRecord next() { + return tracingIterator.next(); + } + + @Override + public boolean hasPrevious() { + return delegateListIterator.hasPrevious(); + } + + @Override + public ConsumerRecord previous() { + return delegateListIterator.previous(); + } + + @Override + public int nextIndex() { + return delegateListIterator.nextIndex(); + } + + @Override + public int previousIndex() { + return delegateListIterator.previousIndex(); + } + + @Override + public void remove() { + delegateListIterator.remove(); + } + + @Override + public void set(ConsumerRecord consumerRecord) { + delegateListIterator.set(consumerRecord); + } + + @Override + public void add(ConsumerRecord consumerRecord) { + delegateListIterator.add(consumerRecord); + } +} From da485fd1e88bb389348b5a8a05c69a1af1324017 Mon Sep 17 00:00:00 2001 From: patpatpat123 <43899031+patpatpat123@users.noreply.github.com> Date: Thu, 18 Sep 2025 15:10:58 +0800 Subject: [PATCH 4/5] Update ConsumerRecordsInstrumentation.java --- .../v0_11/ConsumerRecordsInstrumentation.java | 29 +++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/ConsumerRecordsInstrumentation.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/ConsumerRecordsInstrumentation.java index d680e78143e9..e1826055f136 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/ConsumerRecordsInstrumentation.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/ConsumerRecordsInstrumentation.java @@ -19,10 +19,12 @@ import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.TracingIterable; import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.TracingIterator; import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.TracingList; +import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.TracingListIterator; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; import java.util.Iterator; import java.util.List; +import java.util.ListIterator; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; @@ -59,6 +61,13 @@ public void transform(TypeTransformer transformer) { .and(takesArguments(0)) .and(returns(Iterator.class)), ConsumerRecordsInstrumentation.class.getName() + "$IteratorAdvice"); + transformer.applyAdviceToMethod( + isMethod() + .and(isPublic()) + .and(named("listIterator")) + .and(takesArguments(0)) + .and(returns(ListIterator.class)), + ConsumerRecordsInstrumentation.class.getName() + "$ListIteratorAdvice"); } @SuppressWarnings("unused") @@ -120,4 +129,24 @@ public static void wrap( iterator, consumerProcessInstrumenter(), wrappingEnabledSupplier(), consumerContext); } } + + @SuppressWarnings("unused") + public static class ListIteratorAdvice { + + @SuppressWarnings("unchecked") + @Advice.OnMethodExit(suppress = Throwable.class) + public static void wrap( + @Advice.This ConsumerRecords records, + @Advice.Return(readOnly = false) ListIterator> listIterator) { + + // it's important not to suppress consumer span creation here because this instrumentation can + // leak the context and so there may be a leaked consumer span in the context, in which + // case it's important to overwrite the leaked span instead of suppressing the correct span + // (https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/1947) + KafkaConsumerContext consumerContext = KafkaConsumerContextUtil.get(records); + listIterator = + TracingListIterator.wrap( + listIterator, consumerProcessInstrumenter(), wrappingEnabledSupplier(), consumerContext); + } + } } From 34851cec3692d1f7f6d92667ef5a15ca73c25d83 Mon Sep 17 00:00:00 2001 From: otelbot <197425009+otelbot@users.noreply.github.com> Date: Thu, 18 Sep 2025 13:29:48 +0000 Subject: [PATCH 5/5] ./gradlew spotlessApply --- .../kafkaclients/v0_11/ConsumerRecordsInstrumentation.java | 5 ++++- .../kafkaclients/common/v0_11/internal/TracingList.java | 6 ++++-- .../common/v0_11/internal/TracingListIterator.java | 3 ++- 3 files changed, 10 insertions(+), 4 deletions(-) diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/ConsumerRecordsInstrumentation.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/ConsumerRecordsInstrumentation.java index e1826055f136..8979cf725932 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/ConsumerRecordsInstrumentation.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/ConsumerRecordsInstrumentation.java @@ -146,7 +146,10 @@ public static void wrap( KafkaConsumerContext consumerContext = KafkaConsumerContextUtil.get(records); listIterator = TracingListIterator.wrap( - listIterator, consumerProcessInstrumenter(), wrappingEnabledSupplier(), consumerContext); + listIterator, + consumerProcessInstrumenter(), + wrappingEnabledSupplier(), + consumerContext); } } } diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/TracingList.java b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/TracingList.java index 3cd2eef8c45c..8af6d6ea2962 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/TracingList.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/TracingList.java @@ -137,12 +137,14 @@ public int lastIndexOf(Object o) { @Override public ListIterator> listIterator() { - return TracingListIterator.wrap(delegate.listIterator(), instrumenter, wrappingEnabled, consumerContext); + return TracingListIterator.wrap( + delegate.listIterator(), instrumenter, wrappingEnabled, consumerContext); } @Override public ListIterator> listIterator(int index) { - return TracingListIterator.wrap(delegate.listIterator(index), instrumenter, wrappingEnabled, consumerContext); + return TracingListIterator.wrap( + delegate.listIterator(index), instrumenter, wrappingEnabled, consumerContext); } @Override diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/TracingListIterator.java b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/TracingListIterator.java index b0b131846411..7f153a0ef7e8 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/TracingListIterator.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/TracingListIterator.java @@ -26,7 +26,8 @@ private TracingListIterator( BooleanSupplier wrappingEnabled, KafkaConsumerContext consumerContext) { this.delegateListIterator = delegateListIterator; - this.tracingIterator = TracingIterator.wrap(delegateListIterator, instrumenter, wrappingEnabled, consumerContext); + this.tracingIterator = + TracingIterator.wrap(delegateListIterator, instrumenter, wrappingEnabled, consumerContext); } public static ListIterator> wrap(