Skip to content

Commit

Permalink
Move kafka metrics to separate instrumentation module (#9862)
Browse files Browse the repository at this point in the history
  • Loading branch information
laurit committed Nov 16, 2023
1 parent 7f0b079 commit 977a6f9
Show file tree
Hide file tree
Showing 8 changed files with 218 additions and 111 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ muzzle {
module.set("kafka-clients")
versions.set("[0.11.0.0,)")
assertInverse.set(true)
excludeInstrumentationName("kafka-clients-metrics")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@

import static io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge.currentContext;
import static io.opentelemetry.javaagent.instrumentation.kafkaclients.v0_11.KafkaSingletons.consumerReceiveInstrumenter;
import static io.opentelemetry.javaagent.instrumentation.kafkaclients.v0_11.KafkaSingletons.enhanceConfig;
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.returns;
Expand All @@ -24,9 +22,6 @@
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
Expand All @@ -43,12 +38,6 @@ public ElementMatcher<TypeDescription> typeMatcher() {

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
isConstructor().and(takesArgument(0, Map.class)),
this.getClass().getName() + "$ConstructorMapAdvice");
transformer.applyAdviceToMethod(
isConstructor().and(takesArgument(0, Properties.class)),
this.getClass().getName() + "$ConstructorPropertiesAdvice");
transformer.applyAdviceToMethod(
named("poll")
.and(isPublic())
Expand All @@ -58,29 +47,6 @@ public void transform(TypeTransformer transformer) {
this.getClass().getName() + "$PollAdvice");
}

@SuppressWarnings("unused")
public static class ConstructorMapAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
@Advice.Argument(value = 0, readOnly = false) Map<String, Object> config) {
// ensure config is a mutable map
if (config.getClass() != HashMap.class) {
config = new HashMap<>(config);
}
enhanceConfig(config);
}
}

@SuppressWarnings("unused")
public static class ConstructorPropertiesAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(@Advice.Argument(0) Properties config) {
enhanceConfig(config);
}
}

@SuppressWarnings("unused")
public static class PollAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@

package io.opentelemetry.javaagent.instrumentation.kafkaclients.v0_11;

import static io.opentelemetry.javaagent.instrumentation.kafkaclients.v0_11.KafkaSingletons.enhanceConfig;
import static io.opentelemetry.javaagent.instrumentation.kafkaclients.v0_11.KafkaSingletons.producerInstrumenter;
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.named;
Expand All @@ -20,9 +18,6 @@
import io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
Expand All @@ -39,12 +34,6 @@ public ElementMatcher<TypeDescription> typeMatcher() {

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
isConstructor().and(takesArgument(0, Map.class)),
this.getClass().getName() + "$ConstructorMapAdvice");
transformer.applyAdviceToMethod(
isConstructor().and(takesArgument(0, Properties.class)),
this.getClass().getName() + "$ConstructorPropertiesAdvice");
transformer.applyAdviceToMethod(
isMethod()
.and(isPublic())
Expand All @@ -54,29 +43,6 @@ public void transform(TypeTransformer transformer) {
KafkaProducerInstrumentation.class.getName() + "$SendAdvice");
}

@SuppressWarnings("unused")
public static class ConstructorMapAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
@Advice.Argument(value = 0, readOnly = false) Map<String, Object> config) {
// ensure config is a mutable map
if (config.getClass() != HashMap.class) {
config = new HashMap<>(config);
}
enhanceConfig(config);
}
}

@SuppressWarnings("unused")
public static class ConstructorPropertiesAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(@Advice.Argument(0) Properties config) {
enhanceConfig(config);
}
}

@SuppressWarnings("unused")
public static class SendAdvice {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,9 @@
import io.opentelemetry.instrumentation.kafka.internal.KafkaProcessRequest;
import io.opentelemetry.instrumentation.kafka.internal.KafkaProducerRequest;
import io.opentelemetry.instrumentation.kafka.internal.KafkaReceiveRequest;
import io.opentelemetry.instrumentation.kafka.internal.OpenTelemetryMetricsReporter;
import io.opentelemetry.instrumentation.kafka.internal.OpenTelemetrySupplier;
import io.opentelemetry.javaagent.bootstrap.internal.DeprecatedConfigProperties;
import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig;
import io.opentelemetry.javaagent.bootstrap.internal.InstrumentationConfig;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.RecordMetadata;

public final class KafkaSingletons {
Expand All @@ -31,9 +25,6 @@ public final class KafkaSingletons {
"otel.instrumentation.kafka.client-propagation.enabled",
"otel.instrumentation.kafka.producer-propagation.enabled",
true);
private static final boolean METRICS_ENABLED =
InstrumentationConfig.get()
.getBoolean("otel.instrumentation.kafka.metric-reporter.enabled", true);

private static final Instrumenter<KafkaProducerRequest, RecordMetadata> PRODUCER_INSTRUMENTER;
private static final Instrumenter<KafkaReceiveRequest, Void> CONSUMER_RECEIVE_INSTRUMENTER;
Expand Down Expand Up @@ -69,39 +60,5 @@ public static Instrumenter<KafkaProcessRequest, Void> consumerProcessInstrumente
return CONSUMER_PROCESS_INSTRUMENTER;
}

@SuppressWarnings("unchecked")
public static void enhanceConfig(Map<? super String, Object> config) {
// skip enhancing configuration when metrics are disabled or when we have already enhanced it
if (!METRICS_ENABLED
|| config.get(OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_INSTRUMENTATION_NAME)
!= null) {
return;
}
config.merge(
CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
OpenTelemetryMetricsReporter.class.getName(),
(class1, class2) -> {
// class1 is either a class name or List of class names or classes
if (class1 instanceof List) {
List<Object> result = new ArrayList<>();
result.addAll((List<Object>) class1);
result.add(class2);
return result;
} else if (class1 instanceof String) {
String className1 = (String) class1;
if (className1.isEmpty()) {
return class2;
}
}
return class1 + "," + class2;
});
config.put(
OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_SUPPLIER,
new OpenTelemetrySupplier(GlobalOpenTelemetry.get()));
config.put(
OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_INSTRUMENTATION_NAME,
INSTRUMENTATION_NAME);
}

private KafkaSingletons() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.kafkaclients.v0_11.metrics;

import static io.opentelemetry.javaagent.instrumentation.kafkaclients.v0_11.metrics.KafkaMetricsUtil.enhanceConfig;
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;

import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;

public class KafkaMetricsConsumerInstrumentation implements TypeInstrumentation {

@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("org.apache.kafka.clients.consumer.KafkaConsumer");
}

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
isConstructor().and(takesArgument(0, Map.class)),
this.getClass().getName() + "$ConstructorMapAdvice");
transformer.applyAdviceToMethod(
isConstructor().and(takesArgument(0, Properties.class)),
this.getClass().getName() + "$ConstructorPropertiesAdvice");
}

@SuppressWarnings("unused")
public static class ConstructorMapAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
@Advice.Argument(value = 0, readOnly = false) Map<String, Object> config) {
// ensure config is a mutable map
if (config.getClass() != HashMap.class) {
config = new HashMap<>(config);
}
enhanceConfig(config);
}
}

@SuppressWarnings("unused")
public static class ConstructorPropertiesAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(@Advice.Argument(0) Properties config) {
enhanceConfig(config);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.kafkaclients.v0_11.metrics;

import static java.util.Arrays.asList;

import com.google.auto.service.AutoService;
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import java.util.List;

@AutoService(InstrumentationModule.class)
public class KafkaMetricsInstrumentationModule extends InstrumentationModule {
public KafkaMetricsInstrumentationModule() {
super(
"kafka-clients-metrics",
"kafka-clients",
"kafka-clients-metrics-0.11",
"kafka-clients-0.11",
"kafka");
}

@Override
public boolean isIndyModule() {
// OpenTelemetryMetricsReporter is not available in app class loader
return false;
}

@Override
public List<TypeInstrumentation> typeInstrumentations() {
return asList(
new KafkaMetricsProducerInstrumentation(), new KafkaMetricsConsumerInstrumentation());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.kafkaclients.v0_11.metrics;

import static io.opentelemetry.javaagent.instrumentation.kafkaclients.v0_11.metrics.KafkaMetricsUtil.enhanceConfig;
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;

import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;

public class KafkaMetricsProducerInstrumentation implements TypeInstrumentation {

@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("org.apache.kafka.clients.producer.KafkaProducer");
}

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
isConstructor().and(takesArgument(0, Map.class)),
this.getClass().getName() + "$ConstructorMapAdvice");
transformer.applyAdviceToMethod(
isConstructor().and(takesArgument(0, Properties.class)),
this.getClass().getName() + "$ConstructorPropertiesAdvice");
}

@SuppressWarnings("unused")
public static class ConstructorMapAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
@Advice.Argument(value = 0, readOnly = false) Map<String, Object> config) {
// ensure config is a mutable map
if (config.getClass() != HashMap.class) {
config = new HashMap<>(config);
}
enhanceConfig(config);
}
}

@SuppressWarnings("unused")
public static class ConstructorPropertiesAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(@Advice.Argument(0) Properties config) {
enhanceConfig(config);
}
}
}
Loading

0 comments on commit 977a6f9

Please sign in to comment.