From b9544744370fdfbfede7acbab9c9fe1e92af7065 Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Fri, 14 Jun 2024 15:40:21 +0800 Subject: [PATCH 01/20] support pulsar messaging.publish.duration semantic --- .../build.gradle.kts | 1 + .../messaging/MessagingMetricsAdvice.java | 39 ++++++ .../messaging/MessagingProducerMetrics.java | 79 ++++++++++++ .../MessagingProducerMetricsTest.java | 119 ++++++++++++++++++ .../v2_8/telemetry/PulsarSingletons.java | 4 +- 5 files changed, 241 insertions(+), 1 deletion(-) create mode 100644 instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingMetricsAdvice.java create mode 100644 instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetrics.java create mode 100644 instrumentation-api-incubator/src/test/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetricsTest.java diff --git a/instrumentation-api-incubator/build.gradle.kts b/instrumentation-api-incubator/build.gradle.kts index 6acb75e5f79d..f901051e5438 100644 --- a/instrumentation-api-incubator/build.gradle.kts +++ b/instrumentation-api-incubator/build.gradle.kts @@ -12,6 +12,7 @@ group = "io.opentelemetry.instrumentation" dependencies { api("io.opentelemetry.semconv:opentelemetry-semconv") + api("io.opentelemetry.semconv:opentelemetry-semconv-incubating") api(project(":instrumentation-api")) implementation("io.opentelemetry:opentelemetry-api-incubator") diff --git a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingMetricsAdvice.java b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingMetricsAdvice.java new file mode 100644 index 000000000000..01d266d4670b --- /dev/null +++ b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingMetricsAdvice.java @@ -0,0 +1,39 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.api.incubator.semconv.messaging; + +import static java.util.Arrays.asList; +import static java.util.Collections.unmodifiableList; + +import io.opentelemetry.api.incubator.metrics.ExtendedDoubleHistogramBuilder; +import io.opentelemetry.api.metrics.DoubleHistogramBuilder; +import io.opentelemetry.semconv.ErrorAttributes; +import io.opentelemetry.semconv.ServerAttributes; +import io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes; +import java.util.List; + +public class MessagingMetricsAdvice { + static final List DURATION_SECONDS_BUCKETS = + unmodifiableList( + asList(0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 2.5, 5.0, 7.5, 10.0)); + + static void applyPublishDurationAdvice(DoubleHistogramBuilder builder) { + if (!(builder instanceof ExtendedDoubleHistogramBuilder)) { + return; + } + ((ExtendedDoubleHistogramBuilder) builder) + .setAttributesAdvice( + asList( + MessagingIncubatingAttributes.MESSAGING_SYSTEM, + MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME, + MessagingIncubatingAttributes.MESSAGING_OPERATION, + MessagingIncubatingAttributes.MESSAGING_BATCH_MESSAGE_COUNT, + ErrorAttributes.ERROR_TYPE, + ServerAttributes.SERVER_PORT, + ServerAttributes.SERVER_ADDRESS)); + } + private MessagingMetricsAdvice() {} +} diff --git a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetrics.java b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetrics.java new file mode 100644 index 000000000000..8dda57a4203b --- /dev/null +++ b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetrics.java @@ -0,0 +1,79 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.api.incubator.semconv.messaging; + +import static java.util.logging.Level.FINE; + +import com.google.auto.value.AutoValue; +import com.google.errorprone.annotations.CanIgnoreReturnValue; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.DoubleHistogram; +import io.opentelemetry.api.metrics.DoubleHistogramBuilder; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.ContextKey; +import io.opentelemetry.instrumentation.api.instrumenter.OperationListener; +import io.opentelemetry.instrumentation.api.instrumenter.OperationMetrics; +import io.opentelemetry.instrumentation.api.internal.OperationMetricsUtil; +import java.util.concurrent.TimeUnit; +import java.util.logging.Logger; + +public class MessagingProducerMetrics implements OperationListener { + private static final double NANOS_PER_S = TimeUnit.SECONDS.toNanos(1); + + private static final ContextKey PULSAR_PUBLISH_METRICS_STATE = + ContextKey.named("pulsar-producer-metrics-state"); + private static final Logger logger = Logger.getLogger(MessagingProducerMetrics.class.getName()); + + + private final DoubleHistogram publishDurationHistogram; + private MessagingProducerMetrics(Meter meter) { + DoubleHistogramBuilder durationBuilder = + meter + .histogramBuilder("messaging.publish.duration") + .setDescription("Measures the duration of publish operation.") + .setExplicitBucketBoundariesAdvice(MessagingMetricsAdvice.DURATION_SECONDS_BUCKETS) + .setUnit("s"); + MessagingMetricsAdvice.applyPublishDurationAdvice(durationBuilder); + publishDurationHistogram = durationBuilder.build(); + } + + public static OperationMetrics get() { + return OperationMetricsUtil.create("messaging produce", MessagingProducerMetrics::new); + } + + @Override + @CanIgnoreReturnValue + public Context onStart(Context context, Attributes startAttributes, long startNanos) { + return context.with( + PULSAR_PUBLISH_METRICS_STATE, + new AutoValue_MessagingProducerMetrics_State(startAttributes, startNanos)); + } + + @Override + public void onEnd(Context context, Attributes endAttributes, long endNanos) { + MessagingProducerMetrics.State state = context.get(PULSAR_PUBLISH_METRICS_STATE); + if (state == null) { + logger.log( + FINE, + "No state present when ending context {0}. Cannot record pulsar publish metrics.", + context); + return; + } + + Attributes attributes = state.startAttributes().toBuilder().putAll(endAttributes).build(); + + publishDurationHistogram.record((endNanos - state.startTimeNanos()) / NANOS_PER_S, attributes, context); + } + + @AutoValue + abstract static class State { + + abstract Attributes startAttributes(); + + abstract long startTimeNanos(); + } +} diff --git a/instrumentation-api-incubator/src/test/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetricsTest.java b/instrumentation-api-incubator/src/test/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetricsTest.java new file mode 100644 index 000000000000..ea41e02e0870 --- /dev/null +++ b/instrumentation-api-incubator/src/test/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetricsTest.java @@ -0,0 +1,119 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.api.incubator.semconv.messaging; + +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; +import static org.assertj.core.api.Assertions.assertThat; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.api.trace.TraceFlags; +import io.opentelemetry.api.trace.TraceState; +import io.opentelemetry.context.Context; +import io.opentelemetry.instrumentation.api.instrumenter.OperationListener; +import io.opentelemetry.sdk.metrics.SdkMeterProvider; +import io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions; +import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; +import io.opentelemetry.semconv.ServerAttributes; +import io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes; +import java.util.concurrent.TimeUnit; +import org.junit.jupiter.api.Test; + +public class MessagingProducerMetricsTest { + + static final double[] DURATION_BUCKETS = + MessagingMetricsAdvice.DURATION_SECONDS_BUCKETS.stream().mapToDouble(d -> d).toArray(); + + @Test + void collectsMetrics() { + InMemoryMetricReader metricReader = InMemoryMetricReader.create(); + SdkMeterProvider meterProvider = + SdkMeterProvider.builder().registerMetricReader(metricReader).build(); + + OperationListener listener = MessagingProducerMetrics.get().create(meterProvider.get("test")); + + Attributes requestAttributes = + Attributes.builder() + .put(MessagingIncubatingAttributes.MESSAGING_SYSTEM, "pulsar") + .put(MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME, + "persistent://public/default/topic") + .put(MessagingIncubatingAttributes.MESSAGING_OPERATION, "publish") + .put(ServerAttributes.SERVER_PORT, 6650) + .put(ServerAttributes.SERVER_ADDRESS, "localhost") + .build(); + + Attributes responseAttributes = + Attributes.builder() + .put(MessagingIncubatingAttributes.MESSAGING_MESSAGE_ID, "1:1:0:0") + .put(MessagingIncubatingAttributes.MESSAGING_BATCH_MESSAGE_COUNT, 2) + .build(); + + Context parent = + Context.root() + .with( + Span.wrap( + SpanContext.create( + "ff01020304050600ff0a0b0c0d0e0f00", + "090a0b0c0d0e0f00", + TraceFlags.getSampled(), + TraceState.getDefault()))); + + Context context1 = listener.onStart(parent, requestAttributes, nanos(100)); + + assertThat(metricReader.collectAllMetrics()).isEmpty(); + + Context context2 = listener.onStart(Context.root(), requestAttributes, nanos(150)); + + assertThat(metricReader.collectAllMetrics()).isEmpty(); + + listener.onEnd(context1, responseAttributes, nanos(250)); + + assertThat(metricReader.collectAllMetrics()) + .satisfiesExactlyInAnyOrder( + metric -> + OpenTelemetryAssertions.assertThat(metric) + .hasName("messaging.publish.duration") + .hasUnit("s") + .hasDescription("Measures the duration of publish operation.") + .hasHistogramSatisfying( + histogram -> + histogram.hasPointsSatisfying( + point -> + point + .hasSum(0.15 /* seconds */) + .hasAttributesSatisfying( + equalTo(MessagingIncubatingAttributes.MESSAGING_SYSTEM, + "pulsar"), + equalTo( + MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME, + "persistent://public/default/topic"), + equalTo(ServerAttributes.SERVER_PORT, 6650), + equalTo(ServerAttributes.SERVER_ADDRESS, "localhost")) + .hasExemplarsSatisfying( + exemplar -> + exemplar + .hasTraceId("ff01020304050600ff0a0b0c0d0e0f00") + .hasSpanId("090a0b0c0d0e0f00")) + .hasBucketBoundaries(DURATION_BUCKETS)))); + + listener.onEnd(context2, responseAttributes, nanos(300)); + + assertThat(metricReader.collectAllMetrics()) + .satisfiesExactlyInAnyOrder( + metric -> + OpenTelemetryAssertions.assertThat(metric) + .hasName("messaging.publish.duration") + .hasHistogramSatisfying( + histogram -> + histogram.hasPointsSatisfying( + point -> point.hasSum(0.3 /* seconds */)))); + } + + private static long nanos(int millis) { + return TimeUnit.MILLISECONDS.toNanos(millis); + } +} diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java index 498c075f68d6..0b15cf51f025 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java @@ -13,6 +13,7 @@ import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessageOperation; import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesExtractor; import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesGetter; +import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingProducerMetrics; import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingSpanNameExtractor; import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; @@ -136,7 +137,8 @@ private static Instrumenter createProducerInstrumenter() { .addAttributesExtractor( createMessagingAttributesExtractor(getter, MessageOperation.PUBLISH)) .addAttributesExtractor( - ServerAttributesExtractor.create(new PulsarNetClientAttributesGetter())); + ServerAttributesExtractor.create(new PulsarNetClientAttributesGetter())) + .addOperationMetrics(MessagingProducerMetrics.get()); if (InstrumentationConfig.get() .getBoolean("otel.instrumentation.pulsar.experimental-span-attributes", false)) { From d855f97adfd4228a4c50e39ca04ff428c2d84108 Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Fri, 14 Jun 2024 16:28:34 +0800 Subject: [PATCH 02/20] code style --- .../semconv/messaging/MessagingMetricsAdvice.java | 1 + .../semconv/messaging/MessagingProducerMetrics.java | 5 +++-- .../semconv/messaging/MessagingProducerMetricsTest.java | 9 ++++++--- 3 files changed, 10 insertions(+), 5 deletions(-) diff --git a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingMetricsAdvice.java b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingMetricsAdvice.java index 01d266d4670b..730f82c7ded2 100644 --- a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingMetricsAdvice.java +++ b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingMetricsAdvice.java @@ -35,5 +35,6 @@ static void applyPublishDurationAdvice(DoubleHistogramBuilder builder) { ServerAttributes.SERVER_PORT, ServerAttributes.SERVER_ADDRESS)); } + private MessagingMetricsAdvice() {} } diff --git a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetrics.java b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetrics.java index 8dda57a4203b..7dc4400a9738 100644 --- a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetrics.java +++ b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetrics.java @@ -28,8 +28,8 @@ public class MessagingProducerMetrics implements OperationListener { ContextKey.named("pulsar-producer-metrics-state"); private static final Logger logger = Logger.getLogger(MessagingProducerMetrics.class.getName()); - private final DoubleHistogram publishDurationHistogram; + private MessagingProducerMetrics(Meter meter) { DoubleHistogramBuilder durationBuilder = meter @@ -66,7 +66,8 @@ public void onEnd(Context context, Attributes endAttributes, long endNanos) { Attributes attributes = state.startAttributes().toBuilder().putAll(endAttributes).build(); - publishDurationHistogram.record((endNanos - state.startTimeNanos()) / NANOS_PER_S, attributes, context); + publishDurationHistogram.record( + (endNanos - state.startTimeNanos()) / NANOS_PER_S, attributes, context); } @AutoValue diff --git a/instrumentation-api-incubator/src/test/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetricsTest.java b/instrumentation-api-incubator/src/test/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetricsTest.java index ea41e02e0870..03d3505f1e8a 100644 --- a/instrumentation-api-incubator/src/test/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetricsTest.java +++ b/instrumentation-api-incubator/src/test/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetricsTest.java @@ -39,7 +39,8 @@ void collectsMetrics() { Attributes requestAttributes = Attributes.builder() .put(MessagingIncubatingAttributes.MESSAGING_SYSTEM, "pulsar") - .put(MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME, + .put( + MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME, "persistent://public/default/topic") .put(MessagingIncubatingAttributes.MESSAGING_OPERATION, "publish") .put(ServerAttributes.SERVER_PORT, 6650) @@ -86,10 +87,12 @@ void collectsMetrics() { point .hasSum(0.15 /* seconds */) .hasAttributesSatisfying( - equalTo(MessagingIncubatingAttributes.MESSAGING_SYSTEM, + equalTo( + MessagingIncubatingAttributes.MESSAGING_SYSTEM, "pulsar"), equalTo( - MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME, + MessagingIncubatingAttributes + .MESSAGING_DESTINATION_NAME, "persistent://public/default/topic"), equalTo(ServerAttributes.SERVER_PORT, 6650), equalTo(ServerAttributes.SERVER_ADDRESS, "localhost")) From b5d512bdff50029ed4792a32d6bec4cd4fb4fc56 Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Mon, 17 Jun 2024 20:33:59 +0800 Subject: [PATCH 03/20] Update instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingMetricsAdvice.java Co-authored-by: Lauri Tulmin --- .../api/incubator/semconv/messaging/MessagingMetricsAdvice.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingMetricsAdvice.java b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingMetricsAdvice.java index 730f82c7ded2..6c0ef92be758 100644 --- a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingMetricsAdvice.java +++ b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingMetricsAdvice.java @@ -15,7 +15,7 @@ import io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes; import java.util.List; -public class MessagingMetricsAdvice { +final class MessagingMetricsAdvice { static final List DURATION_SECONDS_BUCKETS = unmodifiableList( asList(0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 2.5, 5.0, 7.5, 10.0)); From 3de71a852f062fde81234fe2908cc3b6bff06148 Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Mon, 17 Jun 2024 20:34:45 +0800 Subject: [PATCH 04/20] Update instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetrics.java Co-authored-by: Lauri Tulmin --- .../incubator/semconv/messaging/MessagingProducerMetrics.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetrics.java b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetrics.java index 7dc4400a9738..4a11c53b804b 100644 --- a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetrics.java +++ b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetrics.java @@ -21,7 +21,7 @@ import java.util.concurrent.TimeUnit; import java.util.logging.Logger; -public class MessagingProducerMetrics implements OperationListener { +public final class MessagingProducerMetrics implements OperationListener { private static final double NANOS_PER_S = TimeUnit.SECONDS.toNanos(1); private static final ContextKey PULSAR_PUBLISH_METRICS_STATE = From 6bb2c90abcf705a57c4fdc0d9ab7838f6ed2319a Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Mon, 17 Jun 2024 20:35:56 +0800 Subject: [PATCH 05/20] Update instrumentation-api-incubator/src/test/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetricsTest.java Co-authored-by: Steve Rao --- .../semconv/messaging/MessagingProducerMetricsTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/instrumentation-api-incubator/src/test/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetricsTest.java b/instrumentation-api-incubator/src/test/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetricsTest.java index 03d3505f1e8a..8b1ed642add4 100644 --- a/instrumentation-api-incubator/src/test/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetricsTest.java +++ b/instrumentation-api-incubator/src/test/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetricsTest.java @@ -25,7 +25,7 @@ public class MessagingProducerMetricsTest { - static final double[] DURATION_BUCKETS = + private static final double[] DURATION_BUCKETS = MessagingMetricsAdvice.DURATION_SECONDS_BUCKETS.stream().mapToDouble(d -> d).toArray(); @Test From 6f68ebd42dd63fed0a686a0edca900ba6e06b28b Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Mon, 17 Jun 2024 20:36:41 +0800 Subject: [PATCH 06/20] Update instrumentation-api-incubator/src/test/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetricsTest.java Co-authored-by: Steve Rao --- .../semconv/messaging/MessagingProducerMetricsTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/instrumentation-api-incubator/src/test/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetricsTest.java b/instrumentation-api-incubator/src/test/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetricsTest.java index 8b1ed642add4..491387253ee9 100644 --- a/instrumentation-api-incubator/src/test/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetricsTest.java +++ b/instrumentation-api-incubator/src/test/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetricsTest.java @@ -23,7 +23,7 @@ import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.Test; -public class MessagingProducerMetricsTest { +class MessagingProducerMetricsTest { private static final double[] DURATION_BUCKETS = MessagingMetricsAdvice.DURATION_SECONDS_BUCKETS.stream().mapToDouble(d -> d).toArray(); From 9f42598dc8d8b682d6ea7fcd7b5c263e3714d795 Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Tue, 18 Jun 2024 00:01:10 +0800 Subject: [PATCH 07/20] fix with cr --- .../build.gradle.kts | 2 +- .../messaging/MessagingMetricsAdvice.java | 20 ++++++++++++++----- .../messaging/MessagingProducerMetrics.java | 13 ++++++++---- 3 files changed, 25 insertions(+), 10 deletions(-) diff --git a/instrumentation-api-incubator/build.gradle.kts b/instrumentation-api-incubator/build.gradle.kts index f901051e5438..00d887da1b25 100644 --- a/instrumentation-api-incubator/build.gradle.kts +++ b/instrumentation-api-incubator/build.gradle.kts @@ -12,7 +12,6 @@ group = "io.opentelemetry.instrumentation" dependencies { api("io.opentelemetry.semconv:opentelemetry-semconv") - api("io.opentelemetry.semconv:opentelemetry-semconv-incubating") api(project(":instrumentation-api")) implementation("io.opentelemetry:opentelemetry-api-incubator") @@ -22,6 +21,7 @@ dependencies { testImplementation(project(":testing-common")) testImplementation("io.opentelemetry:opentelemetry-sdk") testImplementation("io.opentelemetry:opentelemetry-sdk-testing") + testImplementation("io.opentelemetry.semconv:opentelemetry-semconv-incubating") } tasks { diff --git a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingMetricsAdvice.java b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingMetricsAdvice.java index 6c0ef92be758..8ada61c1683f 100644 --- a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingMetricsAdvice.java +++ b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingMetricsAdvice.java @@ -8,11 +8,11 @@ import static java.util.Arrays.asList; import static java.util.Collections.unmodifiableList; +import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.incubator.metrics.ExtendedDoubleHistogramBuilder; import io.opentelemetry.api.metrics.DoubleHistogramBuilder; import io.opentelemetry.semconv.ErrorAttributes; import io.opentelemetry.semconv.ServerAttributes; -import io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes; import java.util.List; final class MessagingMetricsAdvice { @@ -20,6 +20,16 @@ final class MessagingMetricsAdvice { unmodifiableList( asList(0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 2.5, 5.0, 7.5, 10.0)); + // copied from MessagingIncubatingAttributes + private static final AttributeKey MESSAGING_SYSTEM = + AttributeKey.stringKey("messaging.system"); + private static final AttributeKey MESSAGING_DESTINATION_NAME = + AttributeKey.stringKey("messaging.destination.name"); + private static final AttributeKey MESSAGING_OPERATION = + AttributeKey.stringKey("messaging.operation"); + private static final AttributeKey MESSAGING_BATCH_MESSAGE_COUNT = + AttributeKey.longKey("messaging.batch.message_count"); + static void applyPublishDurationAdvice(DoubleHistogramBuilder builder) { if (!(builder instanceof ExtendedDoubleHistogramBuilder)) { return; @@ -27,10 +37,10 @@ static void applyPublishDurationAdvice(DoubleHistogramBuilder builder) { ((ExtendedDoubleHistogramBuilder) builder) .setAttributesAdvice( asList( - MessagingIncubatingAttributes.MESSAGING_SYSTEM, - MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME, - MessagingIncubatingAttributes.MESSAGING_OPERATION, - MessagingIncubatingAttributes.MESSAGING_BATCH_MESSAGE_COUNT, + MESSAGING_SYSTEM, + MESSAGING_DESTINATION_NAME, + MESSAGING_OPERATION, + MESSAGING_BATCH_MESSAGE_COUNT, ErrorAttributes.ERROR_TYPE, ServerAttributes.SERVER_PORT, ServerAttributes.SERVER_ADDRESS)); diff --git a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetrics.java b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetrics.java index 4a11c53b804b..0800436f96d3 100644 --- a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetrics.java +++ b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetrics.java @@ -21,11 +21,16 @@ import java.util.concurrent.TimeUnit; import java.util.logging.Logger; +/** + * {@link OperationListener} which keeps track of Producer + * metrics. + */ public final class MessagingProducerMetrics implements OperationListener { private static final double NANOS_PER_S = TimeUnit.SECONDS.toNanos(1); - private static final ContextKey PULSAR_PUBLISH_METRICS_STATE = - ContextKey.named("pulsar-producer-metrics-state"); + private static final ContextKey MESSAGING_PRODUCER_METRICS_STATE = + ContextKey.named("messaging-producer-metrics-state"); private static final Logger logger = Logger.getLogger(MessagingProducerMetrics.class.getName()); private final DoubleHistogram publishDurationHistogram; @@ -49,13 +54,13 @@ public static OperationMetrics get() { @CanIgnoreReturnValue public Context onStart(Context context, Attributes startAttributes, long startNanos) { return context.with( - PULSAR_PUBLISH_METRICS_STATE, + MESSAGING_PRODUCER_METRICS_STATE, new AutoValue_MessagingProducerMetrics_State(startAttributes, startNanos)); } @Override public void onEnd(Context context, Attributes endAttributes, long endNanos) { - MessagingProducerMetrics.State state = context.get(PULSAR_PUBLISH_METRICS_STATE); + MessagingProducerMetrics.State state = context.get(MESSAGING_PRODUCER_METRICS_STATE); if (state == null) { logger.log( FINE, From 8b5e585843ef97356b32df97520f2c64d2e5031c Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Tue, 18 Jun 2024 15:14:19 +0800 Subject: [PATCH 08/20] Add pulsar metrics test --- .../messaging/MessagingProducerMetrics.java | 2 +- .../pulsar/v2_8/AbstractPulsarClientTest.java | 29 +++++++++++++++++++ 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetrics.java b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetrics.java index 0800436f96d3..8a2824793bb8 100644 --- a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetrics.java +++ b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetrics.java @@ -64,7 +64,7 @@ public void onEnd(Context context, Attributes endAttributes, long endNanos) { if (state == null) { logger.log( FINE, - "No state present when ending context {0}. Cannot record pulsar publish metrics.", + "No state present when ending context {0}. Cannot record produce publish metrics.", context); return; } diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/AbstractPulsarClientTest.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/AbstractPulsarClientTest.java index f32daa20e074..bebbffd773c2 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/AbstractPulsarClientTest.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/AbstractPulsarClientTest.java @@ -15,6 +15,8 @@ import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_MESSAGE_ID; import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_OPERATION; import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_SYSTEM; +import static java.util.Arrays.asList; +import static java.util.Collections.unmodifiableList; import static org.assertj.core.api.Assertions.assertThat; import io.opentelemetry.api.common.AttributeKey; @@ -22,6 +24,7 @@ import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; import io.opentelemetry.sdk.testing.assertj.AttributeAssertion; +import io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions; import io.opentelemetry.sdk.trace.data.LinkData; import io.opentelemetry.sdk.trace.data.SpanData; import java.time.Duration; @@ -77,6 +80,12 @@ abstract class AbstractPulsarClientTest { private static final AttributeKey MESSAGE_TYPE = AttributeKey.stringKey("messaging.pulsar.message.type"); + private static final List DURATION_SECONDS_BUCKETS = + unmodifiableList( + asList(0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 2.5, 5.0, 7.5, 10.0)); + private static final double[] DURATION_BUCKETS = + DURATION_SECONDS_BUCKETS.stream().mapToDouble(d -> d).toArray(); + @BeforeAll static void beforeAll() throws PulsarClientException { pulsar = @@ -163,6 +172,26 @@ void testConsumeNonPartitionedTopicUsingBatchReceive() throws Exception { .hasParent(trace.getSpan(0)) .hasAttributesSatisfyingExactly( batchReceiveAttributes(topic, null, false)))); + + assertThat(testing.metrics()) + .satisfiesExactlyInAnyOrder( + metric -> + OpenTelemetryAssertions.assertThat(metric) + .hasName("messaging.publish.duration") + .hasUnit("s") + .hasDescription("Measures the duration of publish operation.") + .hasHistogramSatisfying( + histogram -> + histogram.hasPointsSatisfying( + point -> + point + .hasSumGreaterThan(0.0) + .hasAttributesSatisfying( + equalTo(MESSAGING_SYSTEM, "pulsar"), + equalTo(MESSAGING_DESTINATION_NAME, topic), + equalTo(SERVER_PORT, brokerPort), + equalTo(SERVER_ADDRESS, brokerHost)) + .hasBucketBoundaries(DURATION_BUCKETS)))); } @Test From 53b2dee0427c49617f701f271ac258bf8d3b651f Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Tue, 18 Jun 2024 18:17:40 +0800 Subject: [PATCH 09/20] Update doc url --- .../incubator/semconv/messaging/MessagingProducerMetrics.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetrics.java b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetrics.java index 8a2824793bb8..44d5b243744a 100644 --- a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetrics.java +++ b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetrics.java @@ -23,7 +23,7 @@ /** * {@link OperationListener} which keeps track of Producer + * href="https://github.com/open-telemetry/semantic-conventions/blob/v1.26.0/docs/messaging/messaging-metrics.md#metric-messagingpublishduration">Producer * metrics. */ public final class MessagingProducerMetrics implements OperationListener { From df187f432dfc7ade4ab5f5d51102ca8edec5f849 Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Tue, 18 Jun 2024 23:18:10 +0800 Subject: [PATCH 10/20] use array --- .../pulsar/v2_8/AbstractPulsarClientTest.java | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/AbstractPulsarClientTest.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/AbstractPulsarClientTest.java index bebbffd773c2..18deed26be82 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/AbstractPulsarClientTest.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/AbstractPulsarClientTest.java @@ -15,8 +15,6 @@ import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_MESSAGE_ID; import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_OPERATION; import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_SYSTEM; -import static java.util.Arrays.asList; -import static java.util.Collections.unmodifiableList; import static org.assertj.core.api.Assertions.assertThat; import io.opentelemetry.api.common.AttributeKey; @@ -79,12 +77,9 @@ abstract class AbstractPulsarClientTest { private static final AttributeKey MESSAGE_TYPE = AttributeKey.stringKey("messaging.pulsar.message.type"); - - private static final List DURATION_SECONDS_BUCKETS = - unmodifiableList( - asList(0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 2.5, 5.0, 7.5, 10.0)); - private static final double[] DURATION_BUCKETS = - DURATION_SECONDS_BUCKETS.stream().mapToDouble(d -> d).toArray(); + private static final double[] DURATION_BUCKETS = new double[] { + 0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 2.5, 5.0, 7.5, 10.0 + }; @BeforeAll static void beforeAll() throws PulsarClientException { From e2d5e0982cc41aa5aac82d89f24d0b8a7b7e5521 Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Tue, 18 Jun 2024 23:33:31 +0800 Subject: [PATCH 11/20] use array --- .../pulsar/v2_8/AbstractPulsarClientTest.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/AbstractPulsarClientTest.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/AbstractPulsarClientTest.java index 18deed26be82..7883d5082c4f 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/AbstractPulsarClientTest.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/AbstractPulsarClientTest.java @@ -77,9 +77,10 @@ abstract class AbstractPulsarClientTest { private static final AttributeKey MESSAGE_TYPE = AttributeKey.stringKey("messaging.pulsar.message.type"); - private static final double[] DURATION_BUCKETS = new double[] { - 0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 2.5, 5.0, 7.5, 10.0 - }; + private static final double[] DURATION_BUCKETS = + new double[] { + 0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 2.5, 5.0, 7.5, 10.0 + }; @BeforeAll static void beforeAll() throws PulsarClientException { From 4b9f15c67f95e9ba1b578d1a4e4c5b54269eae94 Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Thu, 27 Mar 2025 16:46:12 +0800 Subject: [PATCH 12/20] pulsar4.0 support messaging.client.sent.messages --- .../messaging/MessagingMetricsAdvice.java | 6 + .../messaging/MessagingProducerMetrics.java | 30 ++++ .../pulsar-2.8/javaagent/build.gradle.kts | 1 + .../v2_8/ConsumerBaseInstrumentation.java | 2 +- .../v2_8/ConsumerImplInstrumentation.java | 1 + .../pulsar/v2_8/MessageInstrumentation.java | 1 + .../v2_8/MessageListenerInstrumentation.java | 3 +- .../v2_8/ProducerImplInstrumentation.java | 3 +- .../v2_8/SendCallbackInstrumentation.java | 4 +- .../v2_8/telemetry/PulsarBatchRequest.java | 5 +- .../PulsarBatchRequestSpanLinksExtractor.java | 2 + .../PulsarMessagingAttributesGetter.java | 1 + .../v2_8/telemetry/PulsarSingletons.java | 8 +- .../pulsar-4.0/javaagent/build.gradle.kts | 22 +++ .../v4_0/ProducerImplInstrumentation.java | 85 ++++++++++ .../v4_0/PulsarInstrumentationModule.java | 24 +++ .../v4_0/SendCallbackInstrumentation.java | 80 +++++++++ .../PulsarMessagingAttributesGetter.java | 102 ++++++++++++ .../v4_0/telemetry/PulsarSingletons.java | 57 +++++++ .../pulsar/v4_0/PulsarClientTest.java | 152 ++++++++++++++++++ .../pulsar-common/javaagent/build.gradle.kts | 7 + .../pulsar/common}/ProducerData.java | 2 +- .../pulsar/common}/SendCallbackData.java | 4 +- .../pulsar/common}/UrlParser.java | 2 +- .../pulsar/common}/VirtualFieldStore.java | 4 +- .../common}/telemetry/BasePulsarRequest.java | 4 +- ...perimentalProducerAttributesExtractor.java | 4 +- .../telemetry/MessageListenerContext.java | 2 +- .../telemetry/MessageTextMapGetter.java | 4 +- .../telemetry/MessageTextMapSetter.java | 7 +- .../PulsarNetClientAttributesGetter.java | 3 +- .../common}/telemetry/PulsarRequest.java | 24 ++- settings.gradle.kts | 2 + 33 files changed, 627 insertions(+), 31 deletions(-) create mode 100644 instrumentation/pulsar/pulsar-4.0/javaagent/build.gradle.kts create mode 100644 instrumentation/pulsar/pulsar-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v4_0/ProducerImplInstrumentation.java create mode 100644 instrumentation/pulsar/pulsar-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v4_0/PulsarInstrumentationModule.java create mode 100644 instrumentation/pulsar/pulsar-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v4_0/SendCallbackInstrumentation.java create mode 100644 instrumentation/pulsar/pulsar-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v4_0/telemetry/PulsarMessagingAttributesGetter.java create mode 100644 instrumentation/pulsar/pulsar-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v4_0/telemetry/PulsarSingletons.java create mode 100644 instrumentation/pulsar/pulsar-4.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v4_0/PulsarClientTest.java create mode 100644 instrumentation/pulsar/pulsar-common/javaagent/build.gradle.kts rename instrumentation/pulsar/{pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8 => pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common}/ProducerData.java (85%) rename instrumentation/pulsar/{pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8 => pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common}/SendCallbackData.java (76%) rename instrumentation/pulsar/{pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8 => pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common}/UrlParser.java (95%) rename instrumentation/pulsar/{pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8 => pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common}/VirtualFieldStore.java (94%) rename instrumentation/pulsar/{pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8 => pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common}/telemetry/BasePulsarRequest.java (73%) rename instrumentation/pulsar/{pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8 => pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common}/telemetry/ExperimentalProducerAttributesExtractor.java (90%) rename instrumentation/pulsar/{pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8 => pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common}/telemetry/MessageListenerContext.java (92%) rename instrumentation/pulsar/{pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8 => pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common}/telemetry/MessageTextMapGetter.java (76%) rename instrumentation/pulsar/{pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8 => pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common}/telemetry/MessageTextMapSetter.java (79%) rename instrumentation/pulsar/{pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8 => pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common}/telemetry/PulsarNetClientAttributesGetter.java (89%) rename instrumentation/pulsar/{pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8 => pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common}/telemetry/PulsarRequest.java (63%) diff --git a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingMetricsAdvice.java b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingMetricsAdvice.java index f87c814d2a31..df4866f5f1ca 100644 --- a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingMetricsAdvice.java +++ b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingMetricsAdvice.java @@ -65,6 +65,12 @@ static void applyReceiveMessagesAdvice(LongCounterBuilder builder) { } ((ExtendedLongCounterBuilder) builder).setAttributesAdvice(MESSAGING_ATTRIBUTES); } + static void applyProduceMessagesAdvice(LongCounterBuilder builder) { + if (!(builder instanceof ExtendedLongCounterBuilder)) { + return; + } + ((ExtendedLongCounterBuilder) builder).setAttributesAdvice(MESSAGING_ATTRIBUTES); + } private MessagingMetricsAdvice() {} } diff --git a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetrics.java b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetrics.java index 44d5b243744a..ce4b16950044 100644 --- a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetrics.java +++ b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetrics.java @@ -9,9 +9,12 @@ import com.google.auto.value.AutoValue; import com.google.errorprone.annotations.CanIgnoreReturnValue; +import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.metrics.DoubleHistogram; import io.opentelemetry.api.metrics.DoubleHistogramBuilder; +import io.opentelemetry.api.metrics.LongCounter; +import io.opentelemetry.api.metrics.LongCounterBuilder; import io.opentelemetry.api.metrics.Meter; import io.opentelemetry.context.Context; import io.opentelemetry.context.ContextKey; @@ -29,11 +32,15 @@ public final class MessagingProducerMetrics implements OperationListener { private static final double NANOS_PER_S = TimeUnit.SECONDS.toNanos(1); + // copied from MessagingIncubatingAttributes + private static final AttributeKey MESSAGING_BATCH_MESSAGE_COUNT = + AttributeKey.longKey("messaging.batch.message_count"); private static final ContextKey MESSAGING_PRODUCER_METRICS_STATE = ContextKey.named("messaging-producer-metrics-state"); private static final Logger logger = Logger.getLogger(MessagingProducerMetrics.class.getName()); private final DoubleHistogram publishDurationHistogram; + private final LongCounter produceMessageCount; private MessagingProducerMetrics(Meter meter) { DoubleHistogramBuilder durationBuilder = @@ -44,6 +51,14 @@ private MessagingProducerMetrics(Meter meter) { .setUnit("s"); MessagingMetricsAdvice.applyPublishDurationAdvice(durationBuilder); publishDurationHistogram = durationBuilder.build(); + + LongCounterBuilder longCounterBuilder = + meter + .counterBuilder("messaging.client.sent.messages") + .setDescription("Number of messages producer attempted to send to the broker.") + .setUnit("{message}"); + MessagingMetricsAdvice.applyProduceMessagesAdvice(longCounterBuilder); + produceMessageCount = longCounterBuilder.build(); } public static OperationMetrics get() { @@ -73,6 +88,21 @@ public void onEnd(Context context, Attributes endAttributes, long endNanos) { publishDurationHistogram.record( (endNanos - state.startTimeNanos()) / NANOS_PER_S, attributes, context); + + long receiveMessagesCount = getProduceMessagesCount(state.startAttributes(), endAttributes); + if (receiveMessagesCount > 0) { + produceMessageCount.add(receiveMessagesCount, attributes, context); + } + } + + private static long getProduceMessagesCount(Attributes... attributesList) { + for (Attributes attributes : attributesList) { + Long value = attributes.get(MESSAGING_BATCH_MESSAGE_COUNT); + if (value != null) { + return value; + } + } + return 0; } @AutoValue diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/build.gradle.kts b/instrumentation/pulsar/pulsar-2.8/javaagent/build.gradle.kts index a0f01abb60b3..c1c041b34335 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/build.gradle.kts +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/build.gradle.kts @@ -13,6 +13,7 @@ muzzle { dependencies { library("org.apache.pulsar:pulsar-client:2.8.0") + implementation(project(":instrumentation:pulsar:pulsar-common:javaagent")) testImplementation("javax.annotation:javax.annotation-api:1.3.2") testImplementation("org.testcontainers:pulsar") diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/ConsumerBaseInstrumentation.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/ConsumerBaseInstrumentation.java index dc2e3baea918..c755e64e4092 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/ConsumerBaseInstrumentation.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/ConsumerBaseInstrumentation.java @@ -11,7 +11,7 @@ import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; -import io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry.MessageListenerContext; +import io.opentelemetry.javaagent.instrumentation.pulsar.common.telemetry.MessageListenerContext; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/ConsumerImplInstrumentation.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/ConsumerImplInstrumentation.java index 0919440dc11e..3d90ce114370 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/ConsumerImplInstrumentation.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/ConsumerImplInstrumentation.java @@ -20,6 +20,7 @@ import io.opentelemetry.instrumentation.api.internal.Timer; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import io.opentelemetry.javaagent.instrumentation.pulsar.common.VirtualFieldStore; import java.util.concurrent.CompletableFuture; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.type.TypeDescription; diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/MessageInstrumentation.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/MessageInstrumentation.java index f1ff1814b4f7..654ac19920ff 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/MessageInstrumentation.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/MessageInstrumentation.java @@ -12,6 +12,7 @@ import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import io.opentelemetry.javaagent.instrumentation.pulsar.common.VirtualFieldStore; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/MessageListenerInstrumentation.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/MessageListenerInstrumentation.java index d0b8dfe6a8ff..4a8d2419058c 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/MessageListenerInstrumentation.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/MessageListenerInstrumentation.java @@ -15,7 +15,8 @@ import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; -import io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry.PulsarRequest; +import io.opentelemetry.javaagent.instrumentation.pulsar.common.VirtualFieldStore; +import io.opentelemetry.javaagent.instrumentation.pulsar.common.telemetry.PulsarRequest; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.implementation.bytecode.assign.Assigner; diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/ProducerImplInstrumentation.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/ProducerImplInstrumentation.java index 79211824e317..6466ffcd7913 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/ProducerImplInstrumentation.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/ProducerImplInstrumentation.java @@ -16,7 +16,8 @@ import io.opentelemetry.context.Context; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; -import io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry.PulsarRequest; +import io.opentelemetry.javaagent.instrumentation.pulsar.common.VirtualFieldStore; +import io.opentelemetry.javaagent.instrumentation.pulsar.common.telemetry.PulsarRequest; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/SendCallbackInstrumentation.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/SendCallbackInstrumentation.java index 85295042e335..23acf3766d7c 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/SendCallbackInstrumentation.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/SendCallbackInstrumentation.java @@ -15,7 +15,9 @@ import io.opentelemetry.context.Scope; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; -import io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry.PulsarRequest; +import io.opentelemetry.javaagent.instrumentation.pulsar.common.SendCallbackData; +import io.opentelemetry.javaagent.instrumentation.pulsar.common.VirtualFieldStore; +import io.opentelemetry.javaagent.instrumentation.pulsar.common.telemetry.PulsarRequest; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarBatchRequest.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarBatchRequest.java index ae1c81cf188c..65aa5c96a469 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarBatchRequest.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarBatchRequest.java @@ -5,9 +5,10 @@ package io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry; -import static io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.UrlParser.parseUrl; +import static io.opentelemetry.javaagent.instrumentation.pulsar.common.UrlParser.parseUrl; -import io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.UrlParser.UrlData; +import io.opentelemetry.javaagent.instrumentation.pulsar.common.UrlParser.UrlData; +import io.opentelemetry.javaagent.instrumentation.pulsar.common.telemetry.BasePulsarRequest; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Messages; import org.apache.pulsar.common.naming.TopicName; diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarBatchRequestSpanLinksExtractor.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarBatchRequestSpanLinksExtractor.java index 9f93fab83e25..cc1dfbc51f66 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarBatchRequestSpanLinksExtractor.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarBatchRequestSpanLinksExtractor.java @@ -10,6 +10,8 @@ import io.opentelemetry.instrumentation.api.instrumenter.SpanLinksBuilder; import io.opentelemetry.instrumentation.api.instrumenter.SpanLinksExtractor; import io.opentelemetry.instrumentation.api.internal.PropagatorBasedSpanLinksExtractor; +import io.opentelemetry.javaagent.instrumentation.pulsar.common.telemetry.MessageTextMapGetter; +import io.opentelemetry.javaagent.instrumentation.pulsar.common.telemetry.PulsarRequest; import org.apache.pulsar.client.api.Message; final class PulsarBatchRequestSpanLinksExtractor implements SpanLinksExtractor { diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarMessagingAttributesGetter.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarMessagingAttributesGetter.java index 08492480c143..0c1be0eb0121 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarMessagingAttributesGetter.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarMessagingAttributesGetter.java @@ -9,6 +9,7 @@ import static java.util.Collections.singletonList; import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesGetter; +import io.opentelemetry.javaagent.instrumentation.pulsar.common.telemetry.PulsarRequest; import java.util.List; import javax.annotation.Nullable; import org.apache.pulsar.client.api.Message; diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java index c26022efea02..3fcc0701a9b3 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java @@ -27,7 +27,13 @@ import io.opentelemetry.instrumentation.api.semconv.network.ServerAttributesExtractor; import io.opentelemetry.javaagent.bootstrap.internal.AgentInstrumentationConfig; import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig; -import io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.VirtualFieldStore; +import io.opentelemetry.javaagent.instrumentation.pulsar.common.VirtualFieldStore; +import io.opentelemetry.javaagent.instrumentation.pulsar.common.telemetry.ExperimentalProducerAttributesExtractor; +import io.opentelemetry.javaagent.instrumentation.pulsar.common.telemetry.MessageListenerContext; +import io.opentelemetry.javaagent.instrumentation.pulsar.common.telemetry.MessageTextMapGetter; +import io.opentelemetry.javaagent.instrumentation.pulsar.common.telemetry.MessageTextMapSetter; +import io.opentelemetry.javaagent.instrumentation.pulsar.common.telemetry.PulsarNetClientAttributesGetter; +import io.opentelemetry.javaagent.instrumentation.pulsar.common.telemetry.PulsarRequest; import java.util.List; import java.util.concurrent.CompletableFuture; import org.apache.pulsar.client.api.Consumer; diff --git a/instrumentation/pulsar/pulsar-4.0/javaagent/build.gradle.kts b/instrumentation/pulsar/pulsar-4.0/javaagent/build.gradle.kts new file mode 100644 index 000000000000..6a6e60cc59b0 --- /dev/null +++ b/instrumentation/pulsar/pulsar-4.0/javaagent/build.gradle.kts @@ -0,0 +1,22 @@ +plugins { + id("otel.javaagent-instrumentation") +} + +muzzle { + pass { + group.set("org.apache.pulsar") + module.set("pulsar-client") + versions.set("[4.0.0,)") + assertInverse.set(true) + } +} + +dependencies { + library("org.apache.pulsar:pulsar-client:4.0.0") + implementation("com.google.code.findbugs:findbugs-annotations:3.0.1") + implementation(project(":instrumentation:pulsar:pulsar-common:javaagent")) + + testImplementation("javax.annotation:javax.annotation-api:1.3.2") + testImplementation("org.testcontainers:pulsar") + testImplementation("org.apache.pulsar:pulsar-client-admin:4.0.0") +} diff --git a/instrumentation/pulsar/pulsar-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v4_0/ProducerImplInstrumentation.java b/instrumentation/pulsar/pulsar-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v4_0/ProducerImplInstrumentation.java new file mode 100644 index 000000000000..81ddfa2820cb --- /dev/null +++ b/instrumentation/pulsar/pulsar-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v4_0/ProducerImplInstrumentation.java @@ -0,0 +1,85 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.pulsar.v4_0; + +import static io.opentelemetry.javaagent.instrumentation.pulsar.v4_0.telemetry.PulsarSingletons.producerInstrumenter; +import static net.bytebuddy.matcher.ElementMatchers.hasSuperType; +import static net.bytebuddy.matcher.ElementMatchers.isConstructor; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.isPublic; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; + +import io.opentelemetry.context.Context; +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import io.opentelemetry.javaagent.instrumentation.pulsar.common.VirtualFieldStore; +import io.opentelemetry.javaagent.instrumentation.pulsar.common.telemetry.PulsarRequest; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.impl.ProducerImpl; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.client.impl.SendCallback; + +public class ProducerImplInstrumentation implements TypeInstrumentation { + + @Override + public ElementMatcher typeMatcher() { + return named("org.apache.pulsar.client.impl.ProducerImpl"); + } + + @Override + public void transform(TypeTransformer transformer) { + transformer.applyAdviceToMethod( + isConstructor() + .and(isPublic()) + .and( + takesArgument(0, hasSuperType(named("org.apache.pulsar.client.api.PulsarClient")))), + ProducerImplInstrumentation.class.getName() + "$ProducerImplConstructorAdvice"); + transformer.applyAdviceToMethod( + isMethod() + .and(named("sendAsync")) + .and(takesArgument(1, named("org.apache.pulsar.client.impl.SendCallback"))), + ProducerImplInstrumentation.class.getName() + "$ProducerSendAsyncMethodAdvice"); + } + + @SuppressWarnings("unused") + public static class ProducerImplConstructorAdvice { + @Advice.OnMethodExit(suppress = Throwable.class) + public static void intercept( + @Advice.This ProducerImpl producer, @Advice.Argument(value = 0) PulsarClient client) { + PulsarClientImpl pulsarClient = (PulsarClientImpl) client; + String brokerUrl = pulsarClient.getLookup().getServiceUrl(); + String topic = producer.getTopic(); + VirtualFieldStore.inject(producer, brokerUrl, topic); + } + } + + @SuppressWarnings("unused") + public static class ProducerSendAsyncMethodAdvice { + + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void before( + @Advice.This ProducerImpl producer, + @Advice.Argument(value = 0) Message message, + @Advice.Argument(value = 1) SendCallback callback) { + Context parent = Context.current(); + PulsarRequest request = PulsarRequest.create(message, VirtualFieldStore.extract(producer)); + + if (!producerInstrumenter().shouldStart(parent, request)) { + return; + } + + Context context = producerInstrumenter().start(parent, request); + // Inject the context/request into the SendCallback. This will be extracted and used when the + // message is sent and the callback is invoked. see `SendCallbackInstrumentation`. + VirtualFieldStore.inject(callback, context, request); + } + } +} diff --git a/instrumentation/pulsar/pulsar-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v4_0/PulsarInstrumentationModule.java b/instrumentation/pulsar/pulsar-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v4_0/PulsarInstrumentationModule.java new file mode 100644 index 000000000000..f6c9f7844b73 --- /dev/null +++ b/instrumentation/pulsar/pulsar-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v4_0/PulsarInstrumentationModule.java @@ -0,0 +1,24 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.pulsar.v4_0; + +import com.google.auto.service.AutoService; +import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule; +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import java.util.Arrays; +import java.util.List; + +@AutoService(InstrumentationModule.class) +public class PulsarInstrumentationModule extends InstrumentationModule { + public PulsarInstrumentationModule() { + super("pulsar", "pulsar-4.0"); + } + + @Override + public List typeInstrumentations() { + return Arrays.asList(new ProducerImplInstrumentation(), new SendCallbackInstrumentation()); + } +} diff --git a/instrumentation/pulsar/pulsar-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v4_0/SendCallbackInstrumentation.java b/instrumentation/pulsar/pulsar-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v4_0/SendCallbackInstrumentation.java new file mode 100644 index 000000000000..429127417571 --- /dev/null +++ b/instrumentation/pulsar/pulsar-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v4_0/SendCallbackInstrumentation.java @@ -0,0 +1,80 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.pulsar.v4_0; + +import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasClassesNamed; +import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasSuperType; +import static io.opentelemetry.javaagent.instrumentation.pulsar.v4_0.telemetry.PulsarSingletons.producerInstrumenter; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.named; + +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import io.opentelemetry.javaagent.instrumentation.pulsar.common.SendCallbackData; +import io.opentelemetry.javaagent.instrumentation.pulsar.common.VirtualFieldStore; +import io.opentelemetry.javaagent.instrumentation.pulsar.common.telemetry.PulsarRequest; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.apache.pulsar.client.impl.OpSendMsgStats; +import org.apache.pulsar.client.impl.SendCallback; + +public class SendCallbackInstrumentation implements TypeInstrumentation { + + @Override + public ElementMatcher classLoaderOptimization() { + return hasClassesNamed("org.apache.pulsar.client.impl.SendCallback"); + } + + @Override + public ElementMatcher typeMatcher() { + return hasSuperType(named("org.apache.pulsar.client.impl.SendCallback")); + } + + @Override + public void transform(TypeTransformer transformer) { + transformer.applyAdviceToMethod( + isMethod().and(named("sendComplete")), + SendCallbackInstrumentation.class.getName() + "$SendCallbackSendCompleteAdvice"); + } + + @SuppressWarnings("unused") + public static class SendCallbackSendCompleteAdvice { + + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void onEnter( + @Advice.This SendCallback callback, + @Advice.Local("otelContext") Context otelContext, + @Advice.Local("otelScope") Scope otelScope, + @Advice.Local("otelRequest") PulsarRequest request) { + // Extract the Context and PulsarRequest from the SendCallback instance. + SendCallbackData callBackData = VirtualFieldStore.extract(callback); + if (callBackData != null) { + // If the extraction was successful, store the Context and PulsarRequest in local variables. + otelContext = callBackData.context; + request = callBackData.request; + otelScope = otelContext.makeCurrent(); + } + } + + @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) + public static void onExit( + @Advice.Argument(0) Throwable t, + @Advice.Argument(1) OpSendMsgStats opSendMsgStats, + @Advice.Local("otelContext") Context otelContext, + @Advice.Local("otelScope") Scope otelScope, + @Advice.Local("otelRequest") PulsarRequest request) { + if (otelScope != null) { + // Close the Scope and end the span. + otelScope.close(); + request.setProduceNumMessages(opSendMsgStats.getNumMessagesInBatch()); + producerInstrumenter().end(otelContext, request, null, t); + } + } + } +} diff --git a/instrumentation/pulsar/pulsar-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v4_0/telemetry/PulsarMessagingAttributesGetter.java b/instrumentation/pulsar/pulsar-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v4_0/telemetry/PulsarMessagingAttributesGetter.java new file mode 100644 index 000000000000..e0ea9b075bec --- /dev/null +++ b/instrumentation/pulsar/pulsar-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v4_0/telemetry/PulsarMessagingAttributesGetter.java @@ -0,0 +1,102 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.pulsar.v4_0.telemetry; + +import static java.util.Collections.emptyList; +import static java.util.Collections.singletonList; + +import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesGetter; +import io.opentelemetry.javaagent.instrumentation.pulsar.common.telemetry.PulsarRequest; +import java.util.List; +import javax.annotation.Nullable; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.common.naming.TopicName; + +enum PulsarMessagingAttributesGetter implements MessagingAttributesGetter { + INSTANCE; + + @Override + public String getSystem(PulsarRequest request) { + return "pulsar"; + } + + @Nullable + @Override + public String getDestination(PulsarRequest request) { + return request.getDestination(); + } + + @Nullable + @Override + public String getDestinationTemplate(PulsarRequest request) { + return null; + } + + @Override + public boolean isTemporaryDestination(PulsarRequest request) { + return false; + } + + @Override + public boolean isAnonymousDestination(PulsarRequest request) { + return false; + } + + @Nullable + @Override + public String getConversationId(PulsarRequest message) { + return null; + } + + @Override + public Long getMessageBodySize(PulsarRequest request) { + return (long) request.getMessage().size(); + } + + @Nullable + @Override + public Long getMessageEnvelopeSize(PulsarRequest request) { + return null; + } + + @Nullable + @Override + public String getMessageId(PulsarRequest request, @Nullable Void response) { + Message message = request.getMessage(); + if (message.getMessageId() != null) { + return message.getMessageId().toString(); + } + + return null; + } + + @Nullable + @Override + public String getClientId(PulsarRequest request) { + return null; + } + + @Override + public Long getBatchMessageCount(PulsarRequest request, @Nullable Void unused) { + return (long) request.getProduceNumMessages(); + } + + @Nullable + @Override + public String getDestinationPartitionId(PulsarRequest request) { + int partitionIndex = TopicName.getPartitionIndex(request.getDestination()); + if (partitionIndex == -1) { + return null; + } + return String.valueOf(partitionIndex); + } + + @Override + public List getMessageHeader(PulsarRequest request, String name) { + String value = request.getMessage().getProperty(name); + return value != null ? singletonList(value) : emptyList(); + } +} diff --git a/instrumentation/pulsar/pulsar-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v4_0/telemetry/PulsarSingletons.java b/instrumentation/pulsar/pulsar-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v4_0/telemetry/PulsarSingletons.java new file mode 100644 index 000000000000..da3dbe7a4acd --- /dev/null +++ b/instrumentation/pulsar/pulsar-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v4_0/telemetry/PulsarSingletons.java @@ -0,0 +1,57 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.pulsar.v4_0.telemetry; + +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessageOperation; +import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesExtractor; +import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesGetter; +import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingProducerMetrics; +import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingSpanNameExtractor; +import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor; +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder; +import io.opentelemetry.instrumentation.api.semconv.network.ServerAttributesExtractor; +import io.opentelemetry.javaagent.instrumentation.pulsar.common.telemetry.MessageTextMapSetter; +import io.opentelemetry.javaagent.instrumentation.pulsar.common.telemetry.PulsarNetClientAttributesGetter; +import io.opentelemetry.javaagent.instrumentation.pulsar.common.telemetry.PulsarRequest; + +public class PulsarSingletons { + private static final String INSTRUMENTATION_NAME = "io.opentelemetry.pulsar-4.0"; + private static final OpenTelemetry TELEMETRY = GlobalOpenTelemetry.get(); + private static final Instrumenter PRODUCER_INSTRUMENTER = + createProducerInstrumenter(); + + public static Instrumenter producerInstrumenter() { + return PRODUCER_INSTRUMENTER; + } + + private static Instrumenter createProducerInstrumenter() { + MessagingAttributesGetter getter = + PulsarMessagingAttributesGetter.INSTANCE; + + InstrumenterBuilder builder = + Instrumenter.builder( + TELEMETRY, + INSTRUMENTATION_NAME, + MessagingSpanNameExtractor.create(getter, MessageOperation.PUBLISH)) + .addAttributesExtractor( + createMessagingAttributesExtractor(getter, MessageOperation.PUBLISH)) + .addAttributesExtractor( + ServerAttributesExtractor.create(new PulsarNetClientAttributesGetter())) + .addOperationMetrics(MessagingProducerMetrics.get()); + + return builder.buildProducerInstrumenter(MessageTextMapSetter.INSTANCE); + } + + private static AttributesExtractor createMessagingAttributesExtractor( + MessagingAttributesGetter getter, MessageOperation operation) { + return MessagingAttributesExtractor.builder(getter, operation).build(); + } + + private PulsarSingletons() {} +} diff --git a/instrumentation/pulsar/pulsar-4.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v4_0/PulsarClientTest.java b/instrumentation/pulsar/pulsar-4.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v4_0/PulsarClientTest.java new file mode 100644 index 000000000000..d6202d6647c1 --- /dev/null +++ b/instrumentation/pulsar/pulsar-4.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v4_0/PulsarClientTest.java @@ -0,0 +1,152 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.pulsar.v4_0; + +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; +import static io.opentelemetry.semconv.ServerAttributes.SERVER_ADDRESS; +import static io.opentelemetry.semconv.ServerAttributes.SERVER_PORT; +import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME; +import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_SYSTEM; + +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import java.time.Duration; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.PulsarContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.utility.DockerImageName; + +class PulsarClientTest { + + private static final Logger logger = LoggerFactory.getLogger(PulsarClientTest.class); + + private static final DockerImageName DEFAULT_IMAGE_NAME = + DockerImageName.parse("apachepulsar/pulsar:2.8.0"); + + @RegisterExtension + static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); + + private static PulsarContainer pulsar; + static PulsarClient client; + static PulsarAdmin admin; + static Producer producer; + static Consumer consumer; + + static String brokerHost; + static int brokerPort; + + static final double[] DURATION_BUCKETS = + new double[] { + 0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 2.5, 5.0, 7.5, 10.0 + }; + + @BeforeAll + static void beforeAll() throws PulsarClientException { + pulsar = + new PulsarContainer(DEFAULT_IMAGE_NAME) + .withEnv("PULSAR_MEM", "-Xmx128m") + .withLogConsumer(new Slf4jLogConsumer(logger)) + .withStartupTimeout(Duration.ofMinutes(2)) + .withTransactions(); + pulsar.start(); + + brokerHost = pulsar.getHost(); + brokerPort = pulsar.getMappedPort(6650); + client = + PulsarClient.builder() + .serviceUrl(pulsar.getPulsarBrokerUrl()) + .openTelemetry(OpenTelemetry.noop()) + .enableTransaction(true) + .build(); + admin = PulsarAdmin.builder().serviceHttpUrl(pulsar.getHttpServiceUrl()).build(); + } + + @AfterEach + void afterEach() throws PulsarClientException { + if (producer != null) { + producer.close(); + } + if (consumer != null) { + consumer.close(); + } + } + + @AfterAll + static void afterAll() throws PulsarClientException { + if (client != null) { + client.close(); + } + if (admin != null) { + admin.close(); + } + pulsar.close(); + } + + @Test + void testProduceBatch() throws Exception { + String topic = "persistent://public/default/testProduceBatch"; + admin.topics().createNonPartitionedTopic(topic); + producer = client.newProducer(Schema.STRING).topic(topic).enableBatching(true).create(); + + String msg = "test"; + int sendCount = 10; + for (int i = 0; i < sendCount; i++) { + producer.send(msg); + } + + assertThat(testing.metrics()) + .satisfiesExactlyInAnyOrder( + metric -> + assertThat(metric) + .hasName("messaging.publish.duration") + .hasUnit("s") + .hasDescription("Measures the duration of publish operation.") + .hasHistogramSatisfying( + histogram -> + histogram.hasPointsSatisfying( + point -> + point + .hasSumGreaterThan(0.0) + .hasAttributesSatisfying( + equalTo(MESSAGING_SYSTEM, "pulsar"), + equalTo(MESSAGING_DESTINATION_NAME, topic), + equalTo(SERVER_PORT, brokerPort), + equalTo(SERVER_ADDRESS, brokerHost)) + .hasBucketBoundaries(DURATION_BUCKETS))), + metric -> + assertThat(metric) + .hasName("messaging.client.sent.messages") + .hasUnit("{message}") + .hasDescription("Number of messages producer attempted to send to the broker.") + .hasLongSumSatisfying( + sum -> { + sum.hasPointsSatisfying( + point -> { + point + .hasValue(sendCount) + .hasAttributesSatisfying( + equalTo(MESSAGING_SYSTEM, "pulsar"), + equalTo(MESSAGING_DESTINATION_NAME, topic), + equalTo(SERVER_PORT, brokerPort), + equalTo(SERVER_ADDRESS, brokerHost)); + }); + })); + } +} diff --git a/instrumentation/pulsar/pulsar-common/javaagent/build.gradle.kts b/instrumentation/pulsar/pulsar-common/javaagent/build.gradle.kts new file mode 100644 index 000000000000..a2a38d2eb768 --- /dev/null +++ b/instrumentation/pulsar/pulsar-common/javaagent/build.gradle.kts @@ -0,0 +1,7 @@ +plugins { + id("otel.javaagent-instrumentation") +} + +dependencies { + library("org.apache.pulsar:pulsar-client:2.8.0") +} diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/ProducerData.java b/instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/ProducerData.java similarity index 85% rename from instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/ProducerData.java rename to instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/ProducerData.java index c7826e1f1293..65f0b80ea633 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/ProducerData.java +++ b/instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/ProducerData.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.javaagent.instrumentation.pulsar.v2_8; +package io.opentelemetry.javaagent.instrumentation.pulsar.common; public final class ProducerData { public final String url; diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/SendCallbackData.java b/instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/SendCallbackData.java similarity index 76% rename from instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/SendCallbackData.java rename to instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/SendCallbackData.java index 462843d70e31..21da6a76c7d4 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/SendCallbackData.java +++ b/instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/SendCallbackData.java @@ -3,10 +3,10 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.javaagent.instrumentation.pulsar.v2_8; +package io.opentelemetry.javaagent.instrumentation.pulsar.common; import io.opentelemetry.context.Context; -import io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry.PulsarRequest; +import io.opentelemetry.javaagent.instrumentation.pulsar.common.telemetry.PulsarRequest; public final class SendCallbackData { public final Context context; diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/UrlParser.java b/instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/UrlParser.java similarity index 95% rename from instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/UrlParser.java rename to instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/UrlParser.java index cc04d9083f6f..bcbb09515af3 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/UrlParser.java +++ b/instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/UrlParser.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.javaagent.instrumentation.pulsar.v2_8; +package io.opentelemetry.javaagent.instrumentation.pulsar.common; public class UrlParser { diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/VirtualFieldStore.java b/instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/VirtualFieldStore.java similarity index 94% rename from instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/VirtualFieldStore.java rename to instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/VirtualFieldStore.java index 9ec84944feaf..fc830488680b 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/VirtualFieldStore.java +++ b/instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/VirtualFieldStore.java @@ -3,11 +3,11 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.javaagent.instrumentation.pulsar.v2_8; +package io.opentelemetry.javaagent.instrumentation.pulsar.common; import io.opentelemetry.context.Context; import io.opentelemetry.instrumentation.api.util.VirtualField; -import io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry.PulsarRequest; +import io.opentelemetry.javaagent.instrumentation.pulsar.common.telemetry.PulsarRequest; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Producer; diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/BasePulsarRequest.java b/instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/telemetry/BasePulsarRequest.java similarity index 73% rename from instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/BasePulsarRequest.java rename to instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/telemetry/BasePulsarRequest.java index 0d12e97019db..453684f15227 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/BasePulsarRequest.java +++ b/instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/telemetry/BasePulsarRequest.java @@ -3,9 +3,9 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry; +package io.opentelemetry.javaagent.instrumentation.pulsar.common.telemetry; -import io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.UrlParser.UrlData; +import io.opentelemetry.javaagent.instrumentation.pulsar.common.UrlParser.UrlData; public class BasePulsarRequest { diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/ExperimentalProducerAttributesExtractor.java b/instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/telemetry/ExperimentalProducerAttributesExtractor.java similarity index 90% rename from instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/ExperimentalProducerAttributesExtractor.java rename to instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/telemetry/ExperimentalProducerAttributesExtractor.java index c3f4801b998f..1874f5d8f05a 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/ExperimentalProducerAttributesExtractor.java +++ b/instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/telemetry/ExperimentalProducerAttributesExtractor.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry; +package io.opentelemetry.javaagent.instrumentation.pulsar.common.telemetry; import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.common.AttributesBuilder; @@ -15,7 +15,7 @@ import org.apache.pulsar.client.impl.MessageImpl; import org.apache.pulsar.common.api.proto.MessageMetadata; -enum ExperimentalProducerAttributesExtractor implements AttributesExtractor { +public enum ExperimentalProducerAttributesExtractor implements AttributesExtractor { INSTANCE; private static final AttributeKey MESSAGE_TYPE = diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/MessageListenerContext.java b/instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/telemetry/MessageListenerContext.java similarity index 92% rename from instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/MessageListenerContext.java rename to instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/telemetry/MessageListenerContext.java index 7bcea2245431..7828502b9701 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/MessageListenerContext.java +++ b/instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/telemetry/MessageListenerContext.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry; +package io.opentelemetry.javaagent.instrumentation.pulsar.common.telemetry; /** * Helper class used to determine whether message is going to be processed by a listener. If we know diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/MessageTextMapGetter.java b/instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/telemetry/MessageTextMapGetter.java similarity index 76% rename from instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/MessageTextMapGetter.java rename to instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/telemetry/MessageTextMapGetter.java index 931dfa11e33b..bff75aaf1eb1 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/MessageTextMapGetter.java +++ b/instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/telemetry/MessageTextMapGetter.java @@ -3,12 +3,12 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry; +package io.opentelemetry.javaagent.instrumentation.pulsar.common.telemetry; import io.opentelemetry.context.propagation.TextMapGetter; import javax.annotation.Nullable; -enum MessageTextMapGetter implements TextMapGetter { +public enum MessageTextMapGetter implements TextMapGetter { INSTANCE; @Override diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/MessageTextMapSetter.java b/instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/telemetry/MessageTextMapSetter.java similarity index 79% rename from instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/MessageTextMapSetter.java rename to instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/telemetry/MessageTextMapSetter.java index 8b892f02fbef..cc5d336fe4b4 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/MessageTextMapSetter.java +++ b/instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/telemetry/MessageTextMapSetter.java @@ -3,13 +3,14 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry; +package io.opentelemetry.javaagent.instrumentation.pulsar.common.telemetry; import io.opentelemetry.context.propagation.TextMapSetter; -import javax.annotation.Nullable; import org.apache.pulsar.client.impl.MessageImpl; -enum MessageTextMapSetter implements TextMapSetter { +import javax.annotation.Nullable; + +public enum MessageTextMapSetter implements TextMapSetter { INSTANCE; @Override diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarNetClientAttributesGetter.java b/instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/telemetry/PulsarNetClientAttributesGetter.java similarity index 89% rename from instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarNetClientAttributesGetter.java rename to instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/telemetry/PulsarNetClientAttributesGetter.java index 91391d088cac..e5cb68397ca0 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarNetClientAttributesGetter.java +++ b/instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/telemetry/PulsarNetClientAttributesGetter.java @@ -3,9 +3,10 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry; +package io.opentelemetry.javaagent.instrumentation.pulsar.common.telemetry; import io.opentelemetry.instrumentation.api.semconv.network.ServerAttributesGetter; + import javax.annotation.Nullable; public final class PulsarNetClientAttributesGetter diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarRequest.java b/instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/telemetry/PulsarRequest.java similarity index 63% rename from instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarRequest.java rename to instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/telemetry/PulsarRequest.java index efb9d34f3f53..cbc4d7b3229f 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarRequest.java +++ b/instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/telemetry/PulsarRequest.java @@ -3,18 +3,19 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry; +package io.opentelemetry.javaagent.instrumentation.pulsar.common.telemetry; -import static io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.UrlParser.parseUrl; - -import io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.ProducerData; -import io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.UrlParser.UrlData; +import io.opentelemetry.javaagent.instrumentation.pulsar.common.ProducerData; +import io.opentelemetry.javaagent.instrumentation.pulsar.common.UrlParser; import org.apache.pulsar.client.api.Message; +import static io.opentelemetry.javaagent.instrumentation.pulsar.common.UrlParser.parseUrl; + public final class PulsarRequest extends BasePulsarRequest { private final Message message; + private int produceNumMessages; - private PulsarRequest(Message message, String destination, UrlData urlData) { + private PulsarRequest(Message message, String destination, UrlParser.UrlData urlData) { super(destination, urlData); this.message = message; } @@ -27,7 +28,7 @@ public static PulsarRequest create(Message message, String url) { return new PulsarRequest(message, message.getTopicName(), parseUrl(url)); } - public static PulsarRequest create(Message message, UrlData urlData) { + public static PulsarRequest create(Message message, UrlParser.UrlData urlData) { return new PulsarRequest(message, message.getTopicName(), urlData); } @@ -38,4 +39,13 @@ public static PulsarRequest create(Message message, ProducerData producerData public Message getMessage() { return message; } + + + public int getProduceNumMessages() { + return produceNumMessages; + } + + public void setProduceNumMessages(int produceNumMessages) { + this.produceNumMessages = produceNumMessages; + } } diff --git a/settings.gradle.kts b/settings.gradle.kts index 0bdfc02339ff..4c730d9cb47c 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -457,6 +457,8 @@ include(":instrumentation:play:play-ws:play-ws-common:testing") include(":instrumentation:powerjob-4.0:javaagent") include(":instrumentation:pulsar:pulsar-2.8:javaagent") include(":instrumentation:pulsar:pulsar-2.8:javaagent-unit-tests") +include(":instrumentation:pulsar:pulsar-4.0:javaagent") +include(":instrumentation:pulsar:pulsar-common:javaagent") include(":instrumentation:quarkus-resteasy-reactive:common-testing") include(":instrumentation:quarkus-resteasy-reactive:javaagent") include(":instrumentation:quarkus-resteasy-reactive:quarkus2-testing") From e25cde23bd286e49efef0978da079a2392773cb3 Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Thu, 27 Mar 2025 16:53:18 +0800 Subject: [PATCH 13/20] fix spotlessApply --- .../api/incubator/semconv/messaging/MessagingMetricsAdvice.java | 1 + 1 file changed, 1 insertion(+) diff --git a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingMetricsAdvice.java b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingMetricsAdvice.java index df4866f5f1ca..f2e1528f5b05 100644 --- a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingMetricsAdvice.java +++ b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingMetricsAdvice.java @@ -65,6 +65,7 @@ static void applyReceiveMessagesAdvice(LongCounterBuilder builder) { } ((ExtendedLongCounterBuilder) builder).setAttributesAdvice(MESSAGING_ATTRIBUTES); } + static void applyProduceMessagesAdvice(LongCounterBuilder builder) { if (!(builder instanceof ExtendedLongCounterBuilder)) { return; From 907d9669e40c0c498c324e55fe0d2a510bfb1d7f Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Thu, 27 Mar 2025 16:55:48 +0800 Subject: [PATCH 14/20] fix ci --- instrumentation/pulsar/pulsar-4.0/javaagent/build.gradle.kts | 1 - 1 file changed, 1 deletion(-) diff --git a/instrumentation/pulsar/pulsar-4.0/javaagent/build.gradle.kts b/instrumentation/pulsar/pulsar-4.0/javaagent/build.gradle.kts index 6a6e60cc59b0..ffa17b68ae40 100644 --- a/instrumentation/pulsar/pulsar-4.0/javaagent/build.gradle.kts +++ b/instrumentation/pulsar/pulsar-4.0/javaagent/build.gradle.kts @@ -13,7 +13,6 @@ muzzle { dependencies { library("org.apache.pulsar:pulsar-client:4.0.0") - implementation("com.google.code.findbugs:findbugs-annotations:3.0.1") implementation(project(":instrumentation:pulsar:pulsar-common:javaagent")) testImplementation("javax.annotation:javax.annotation-api:1.3.2") From a895286d5e41bd6423ff5969359e7a8749266862 Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Thu, 27 Mar 2025 16:59:58 +0800 Subject: [PATCH 15/20] fix ci --- .fossa.yml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/.fossa.yml b/.fossa.yml index ce66de0fded9..e6a2150ddaef 100644 --- a/.fossa.yml +++ b/.fossa.yml @@ -772,6 +772,12 @@ targets: - type: gradle path: ./ target: ':instrumentation:pulsar:pulsar-2.8:javaagent' + - type: gradle + path: ./ + target: ':instrumentation:pulsar:pulsar-4.0:javaagent' + - type: gradle + path: ./ + target: ':instrumentation:pulsar:pulsar-common:javaagent' - type: gradle path: ./ target: ':instrumentation:ratpack:ratpack-1.4:javaagent' From be4969c5072c24a1204a717ccc886d28f5d66ce4 Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Thu, 27 Mar 2025 17:03:50 +0800 Subject: [PATCH 16/20] fix ci --- .../telemetry/ExperimentalProducerAttributesExtractor.java | 3 ++- .../pulsar/common/telemetry/MessageTextMapSetter.java | 3 +-- .../common/telemetry/PulsarNetClientAttributesGetter.java | 1 - .../pulsar/common/telemetry/PulsarRequest.java | 5 ++--- 4 files changed, 5 insertions(+), 7 deletions(-) diff --git a/instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/telemetry/ExperimentalProducerAttributesExtractor.java b/instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/telemetry/ExperimentalProducerAttributesExtractor.java index 1874f5d8f05a..9ab1e287b342 100644 --- a/instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/telemetry/ExperimentalProducerAttributesExtractor.java +++ b/instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/telemetry/ExperimentalProducerAttributesExtractor.java @@ -15,7 +15,8 @@ import org.apache.pulsar.client.impl.MessageImpl; import org.apache.pulsar.common.api.proto.MessageMetadata; -public enum ExperimentalProducerAttributesExtractor implements AttributesExtractor { +public enum ExperimentalProducerAttributesExtractor + implements AttributesExtractor { INSTANCE; private static final AttributeKey MESSAGE_TYPE = diff --git a/instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/telemetry/MessageTextMapSetter.java b/instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/telemetry/MessageTextMapSetter.java index cc5d336fe4b4..b9b3d9af6bdc 100644 --- a/instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/telemetry/MessageTextMapSetter.java +++ b/instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/telemetry/MessageTextMapSetter.java @@ -6,9 +6,8 @@ package io.opentelemetry.javaagent.instrumentation.pulsar.common.telemetry; import io.opentelemetry.context.propagation.TextMapSetter; -import org.apache.pulsar.client.impl.MessageImpl; - import javax.annotation.Nullable; +import org.apache.pulsar.client.impl.MessageImpl; public enum MessageTextMapSetter implements TextMapSetter { INSTANCE; diff --git a/instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/telemetry/PulsarNetClientAttributesGetter.java b/instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/telemetry/PulsarNetClientAttributesGetter.java index e5cb68397ca0..7b00c7972a9b 100644 --- a/instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/telemetry/PulsarNetClientAttributesGetter.java +++ b/instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/telemetry/PulsarNetClientAttributesGetter.java @@ -6,7 +6,6 @@ package io.opentelemetry.javaagent.instrumentation.pulsar.common.telemetry; import io.opentelemetry.instrumentation.api.semconv.network.ServerAttributesGetter; - import javax.annotation.Nullable; public final class PulsarNetClientAttributesGetter diff --git a/instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/telemetry/PulsarRequest.java b/instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/telemetry/PulsarRequest.java index cbc4d7b3229f..e7d5850d4b24 100644 --- a/instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/telemetry/PulsarRequest.java +++ b/instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/telemetry/PulsarRequest.java @@ -5,12 +5,12 @@ package io.opentelemetry.javaagent.instrumentation.pulsar.common.telemetry; +import static io.opentelemetry.javaagent.instrumentation.pulsar.common.UrlParser.parseUrl; + import io.opentelemetry.javaagent.instrumentation.pulsar.common.ProducerData; import io.opentelemetry.javaagent.instrumentation.pulsar.common.UrlParser; import org.apache.pulsar.client.api.Message; -import static io.opentelemetry.javaagent.instrumentation.pulsar.common.UrlParser.parseUrl; - public final class PulsarRequest extends BasePulsarRequest { private final Message message; private int produceNumMessages; @@ -40,7 +40,6 @@ public Message getMessage() { return message; } - public int getProduceNumMessages() { return produceNumMessages; } From 69680a6e2feefde3e55ef6f4afff9f5850621015 Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Thu, 27 Mar 2025 17:19:03 +0800 Subject: [PATCH 17/20] fix ci --- .../spring/spring-pulsar-1.0/javaagent/build.gradle.kts | 2 +- .../DefaultPulsarMessageListenerContainerInstrumentation.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/instrumentation/spring/spring-pulsar-1.0/javaagent/build.gradle.kts b/instrumentation/spring/spring-pulsar-1.0/javaagent/build.gradle.kts index 9364d6f73d9e..e7267a746cab 100644 --- a/instrumentation/spring/spring-pulsar-1.0/javaagent/build.gradle.kts +++ b/instrumentation/spring/spring-pulsar-1.0/javaagent/build.gradle.kts @@ -14,7 +14,7 @@ muzzle { dependencies { library("org.springframework.pulsar:spring-pulsar:1.0.0") - implementation(project(":instrumentation:pulsar:pulsar-2.8:javaagent")) + implementation(project(":instrumentation:pulsar:pulsar-common:javaagent")) testInstrumentation(project(":instrumentation:pulsar:pulsar-2.8:javaagent")) diff --git a/instrumentation/spring/spring-pulsar-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/pulsar/v1_0/DefaultPulsarMessageListenerContainerInstrumentation.java b/instrumentation/spring/spring-pulsar-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/pulsar/v1_0/DefaultPulsarMessageListenerContainerInstrumentation.java index 5952cefaf5f0..e0b4d8ecbeaf 100644 --- a/instrumentation/spring/spring-pulsar-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/pulsar/v1_0/DefaultPulsarMessageListenerContainerInstrumentation.java +++ b/instrumentation/spring/spring-pulsar-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/pulsar/v1_0/DefaultPulsarMessageListenerContainerInstrumentation.java @@ -14,7 +14,7 @@ import io.opentelemetry.context.Scope; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; -import io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.VirtualFieldStore; +import io.opentelemetry.javaagent.instrumentation.pulsar.common.VirtualFieldStore; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; From b7f3b9382f7765b4636d9da84e134040e0a8bf0a Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Thu, 27 Mar 2025 17:36:01 +0800 Subject: [PATCH 18/20] fix ci --- .../pulsar/pulsar-2.8/javaagent-unit-tests/build.gradle.kts | 1 + .../javaagent/instrumentation/pulsar/v2_8/UrlParserTest.java | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent-unit-tests/build.gradle.kts b/instrumentation/pulsar/pulsar-2.8/javaagent-unit-tests/build.gradle.kts index 4a143d7886c7..53082fb85cdc 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent-unit-tests/build.gradle.kts +++ b/instrumentation/pulsar/pulsar-2.8/javaagent-unit-tests/build.gradle.kts @@ -4,4 +4,5 @@ plugins { dependencies { testImplementation(project(":instrumentation:pulsar:pulsar-2.8:javaagent")) + testImplementation(project(":instrumentation:pulsar:pulsar-common:javaagent")) } diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent-unit-tests/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/UrlParserTest.java b/instrumentation/pulsar/pulsar-2.8/javaagent-unit-tests/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/UrlParserTest.java index bb109a99c6f6..19f98c595eee 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent-unit-tests/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/UrlParserTest.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent-unit-tests/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/UrlParserTest.java @@ -7,7 +7,8 @@ import static org.assertj.core.api.Assertions.assertThat; -import io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.UrlParser.UrlData; +import io.opentelemetry.javaagent.instrumentation.pulsar.common.UrlParser; +import io.opentelemetry.javaagent.instrumentation.pulsar.common.UrlParser.UrlData; import org.junit.jupiter.api.Test; public class UrlParserTest { From 0ca2c678b130eff20691b51b155efa4a2844d42c Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Thu, 27 Mar 2025 23:22:06 +0800 Subject: [PATCH 19/20] add jersey-client --- instrumentation/pulsar/pulsar-4.0/javaagent/build.gradle.kts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/instrumentation/pulsar/pulsar-4.0/javaagent/build.gradle.kts b/instrumentation/pulsar/pulsar-4.0/javaagent/build.gradle.kts index ffa17b68ae40..8e6d59a9961e 100644 --- a/instrumentation/pulsar/pulsar-4.0/javaagent/build.gradle.kts +++ b/instrumentation/pulsar/pulsar-4.0/javaagent/build.gradle.kts @@ -14,8 +14,8 @@ muzzle { dependencies { library("org.apache.pulsar:pulsar-client:4.0.0") implementation(project(":instrumentation:pulsar:pulsar-common:javaagent")) + implementation("org.glassfish.jersey.core:jersey-client:2.42") - testImplementation("javax.annotation:javax.annotation-api:1.3.2") testImplementation("org.testcontainers:pulsar") testImplementation("org.apache.pulsar:pulsar-client-admin:4.0.0") } From e5baeb38f90bb8d6de33a98320b9ec84dc7e047c Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Thu, 27 Mar 2025 23:51:16 +0800 Subject: [PATCH 20/20] remove jersey-client --- instrumentation/pulsar/pulsar-4.0/javaagent/build.gradle.kts | 1 - 1 file changed, 1 deletion(-) diff --git a/instrumentation/pulsar/pulsar-4.0/javaagent/build.gradle.kts b/instrumentation/pulsar/pulsar-4.0/javaagent/build.gradle.kts index 8e6d59a9961e..0e5b63b28996 100644 --- a/instrumentation/pulsar/pulsar-4.0/javaagent/build.gradle.kts +++ b/instrumentation/pulsar/pulsar-4.0/javaagent/build.gradle.kts @@ -14,7 +14,6 @@ muzzle { dependencies { library("org.apache.pulsar:pulsar-client:4.0.0") implementation(project(":instrumentation:pulsar:pulsar-common:javaagent")) - implementation("org.glassfish.jersey.core:jersey-client:2.42") testImplementation("org.testcontainers:pulsar") testImplementation("org.apache.pulsar:pulsar-client-admin:4.0.0")