Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.TracingIterable;
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.TracingIterator;
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.TracingList;
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.TracingListIterator;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
Expand Down Expand Up @@ -59,6 +61,13 @@ public void transform(TypeTransformer transformer) {
.and(takesArguments(0))
.and(returns(Iterator.class)),
ConsumerRecordsInstrumentation.class.getName() + "$IteratorAdvice");
transformer.applyAdviceToMethod(
isMethod()
.and(isPublic())
.and(named("listIterator"))
.and(takesArguments(0))
.and(returns(ListIterator.class)),
ConsumerRecordsInstrumentation.class.getName() + "$ListIteratorAdvice");
}

@SuppressWarnings("unused")
Expand Down Expand Up @@ -120,4 +129,27 @@ public static <K, V> void wrap(
iterator, consumerProcessInstrumenter(), wrappingEnabledSupplier(), consumerContext);
}
}

@SuppressWarnings("unused")
public static class ListIteratorAdvice {

@SuppressWarnings("unchecked")
@Advice.OnMethodExit(suppress = Throwable.class)
public static <K, V> void wrap(
@Advice.This ConsumerRecords<?, ?> records,
@Advice.Return(readOnly = false) ListIterator<ConsumerRecord<K, V>> listIterator) {

// it's important not to suppress consumer span creation here because this instrumentation can
// leak the context and so there may be a leaked consumer span in the context, in which
// case it's important to overwrite the leaked span instead of suppressing the correct span
// (https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/1947)
KafkaConsumerContext consumerContext = KafkaConsumerContextUtil.get(records);
listIterator =
TracingListIterator.wrap(
listIterator,
consumerProcessInstrumenter(),
wrappingEnabledSupplier(),
consumerContext);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
*/
public class TracingIterable<K, V> implements Iterable<ConsumerRecord<K, V>> {
private final Iterable<ConsumerRecord<K, V>> delegate;
private final Instrumenter<KafkaProcessRequest, Void> instrumenter;
private final BooleanSupplier wrappingEnabled;
private final KafkaConsumerContext consumerContext;
protected final Instrumenter<KafkaProcessRequest, Void> instrumenter;
protected final BooleanSupplier wrappingEnabled;
protected final KafkaConsumerContext consumerContext;
private boolean firstIterator = true;

protected TracingIterable(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,16 +137,14 @@ public int lastIndexOf(Object o) {

@Override
public ListIterator<ConsumerRecord<K, V>> listIterator() {
// TODO: the API for ListIterator is not really good to instrument it in context of Kafka
// Consumer so we will not do that for now
return delegate.listIterator();
return TracingListIterator.wrap(
delegate.listIterator(), instrumenter, wrappingEnabled, consumerContext);
}

@Override
public ListIterator<ConsumerRecord<K, V>> listIterator(int index) {
// TODO: the API for ListIterator is not really good to instrument it in context of Kafka
// Consumer so we will not do that for now
return delegate.listIterator(index);
return TracingListIterator.wrap(
delegate.listIterator(index), instrumenter, wrappingEnabled, consumerContext);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal;

import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import java.util.Iterator;
import java.util.ListIterator;
import java.util.function.BooleanSupplier;
import org.apache.kafka.clients.consumer.ConsumerRecord;

/**
* This class is internal and is hence not for public use. Its APIs are unstable and can change at
* any time.
*/
public class TracingListIterator<K, V> implements ListIterator<ConsumerRecord<K, V>> {

private final ListIterator<ConsumerRecord<K, V>> delegateListIterator;
private final Iterator<ConsumerRecord<K, V>> tracingIterator;

private TracingListIterator(
ListIterator<ConsumerRecord<K, V>> delegateListIterator,
Instrumenter<KafkaProcessRequest, Void> instrumenter,
BooleanSupplier wrappingEnabled,
KafkaConsumerContext consumerContext) {
this.delegateListIterator = delegateListIterator;
this.tracingIterator =
TracingIterator.wrap(delegateListIterator, instrumenter, wrappingEnabled, consumerContext);
}

public static <K, V> ListIterator<ConsumerRecord<K, V>> wrap(
ListIterator<ConsumerRecord<K, V>> delegateListIterator,
Instrumenter<KafkaProcessRequest, Void> instrumenter,
BooleanSupplier wrappingEnabled,
KafkaConsumerContext consumerContext) {
if (wrappingEnabled.getAsBoolean()) {
return new TracingListIterator<>(
delegateListIterator, instrumenter, wrappingEnabled, consumerContext);
}
return delegateListIterator;
}

@Override
public boolean hasNext() {
return tracingIterator.hasNext();
}

@Override
public ConsumerRecord<K, V> next() {
return tracingIterator.next();
}

@Override
public boolean hasPrevious() {
return delegateListIterator.hasPrevious();
}

@Override
public ConsumerRecord<K, V> previous() {
return delegateListIterator.previous();
}

@Override
public int nextIndex() {
return delegateListIterator.nextIndex();
}

@Override
public int previousIndex() {
return delegateListIterator.previousIndex();
}

@Override
public void remove() {
delegateListIterator.remove();
}

@Override
public void set(ConsumerRecord<K, V> consumerRecord) {
delegateListIterator.set(consumerRecord);
}

@Override
public void add(ConsumerRecord<K, V> consumerRecord) {
delegateListIterator.add(consumerRecord);
}
}