Skip to content

Bootstrap NATS instrumentation with Connection.publish(Message) tracing #13806

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .fossa.yml
Original file line number Diff line number Diff line change
@@ -691,6 +691,12 @@ targets:
- type: gradle
path: ./
target: ':instrumentation:mongo:mongo-async-3.3:javaagent'
- type: gradle
path: ./
target: ':instrumentation:nats:nats-2.21:javaagent'
- type: gradle
path: ./
target: ':instrumentation:nats:nats-2.21:library'
- type: gradle
path: ./
target: ':instrumentation:netty:netty-3.8:javaagent'
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -45,6 +45,7 @@ out/
######################
.vscode
**/bin/
.metals

# Others #
##########
23 changes: 23 additions & 0 deletions instrumentation/nats/nats-2.21/javaagent/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
plugins {
id("otel.javaagent-instrumentation")
}

muzzle {
pass {
group.set("io.nats")
module.set("jnats")
versions.set("[2.21.0,)")
}
}

dependencies {
library("io.nats:jnats:2.21.0")

implementation(project(":instrumentation:nats:nats-2.21:library"))
}

tasks {
test {
usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.nats.v2_21;

import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.implementsInterface;
import static io.opentelemetry.javaagent.instrumentation.nats.v2_21.NatsSingletons.PRODUCER_INSTRUMENTER;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import io.nats.client.Connection;
import io.nats.client.Message;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.nats.v2_21.internal.NatsRequest;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;

public class ConnectionInstrumentation implements TypeInstrumentation {

@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return implementsInterface(named("io.nats.client.Connection"));
}

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
isPublic()
.and(named("publish"))
.and(takesArguments(1))
.and(takesArgument(0, named("io.nats.client.Message"))),
ConnectionInstrumentation.class.getName() + "$PublishMessageAdvice");
}

@SuppressWarnings("unused")
public static class PublishMessageAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
@Advice.This Connection connection,
@Advice.Argument(0) Message message,
@Advice.Local("otelContext") Context otelContext,
@Advice.Local("otelScope") Scope otelScope,
@Advice.Local("natsRequest") NatsRequest natsRequest) {
Context parentContext = Context.current();
natsRequest = NatsRequest.create(connection, message);

if (!PRODUCER_INSTRUMENTER.shouldStart(parentContext, natsRequest)) {
return;
}

otelContext = PRODUCER_INSTRUMENTER.start(parentContext, natsRequest);
otelScope = otelContext.makeCurrent();
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void onExit(
@Advice.Thrown Throwable throwable,
@Advice.This Connection connection,
@Advice.Argument(0) Message message,
@Advice.Local("otelContext") Context otelContext,
@Advice.Local("otelScope") Scope otelScope,
@Advice.Local("natsRequest") NatsRequest natsRequest) {
if (otelScope == null) {
return;
}

otelScope.close();
PRODUCER_INSTRUMENTER.end(otelContext, natsRequest, null, throwable);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.nats.v2_21;

import com.google.auto.service.AutoService;
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import java.util.Collections;
import java.util.List;

@AutoService(InstrumentationModule.class)
public class NatsInstrumentationModule extends InstrumentationModule {

public NatsInstrumentationModule() {
super("nats", "nats-2.21");
}

// TODO classLoaderMatcher

@Override
public List<TypeInstrumentation> typeInstrumentations() {
return Collections.singletonList(new ConnectionInstrumentation());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.nats.v2_21;

import static io.opentelemetry.instrumentation.nats.v2_21.internal.NatsInstrumenterFactory.createProducerInstrumenter;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.nats.v2_21.internal.NatsRequest;

public final class NatsSingletons {

public static final Instrumenter<NatsRequest, Void> PRODUCER_INSTRUMENTER =
createProducerInstrumenter(GlobalOpenTelemetry.get());

private NatsSingletons() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.nats.v2_21;

import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME;
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE;
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_OPERATION;
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_SYSTEM;
import static org.assertj.core.api.Assertions.assertThat;

import io.nats.client.Connection;
import io.nats.client.Message;
import io.nats.client.Nats;
import io.nats.client.Subscription;
import io.nats.client.impl.Headers;
import io.nats.client.impl.NatsMessage;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import java.io.IOException;
import java.time.Duration;
import java.util.LinkedList;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.utility.DockerImageName;

@SuppressWarnings("deprecation") // using deprecated semconv
class NatsInstrumentationTest {

@RegisterExtension
static final InstrumentationExtension testing = AgentInstrumentationExtension.create();

static final DockerImageName natsImage = DockerImageName.parse("nats:2.11.2-alpine3.21");

static final GenericContainer<?> natsContainer =
new GenericContainer<>(natsImage).withExposedPorts(4222);

static String natsUrl;
static Connection connection;
static Subscription subscription;

static LinkedList<Message> publishedMessages = new LinkedList<>();

@BeforeAll
static void beforeAll() throws IOException, InterruptedException {
natsContainer.start();
natsUrl = "nats://" + natsContainer.getHost() + ":" + natsContainer.getMappedPort(4222);
connection = Nats.connect(natsUrl);
subscription = connection.subscribe("*");
}

@AfterAll
static void afterAll() throws InterruptedException {
subscription.drain(Duration.ofSeconds(10));
connection.close();
natsContainer.close();
}

@Test
void testPublishMessageNoHeaders() throws InterruptedException {
// given
int clientId = connection.getServerInfo().getClientId();
NatsMessage message = NatsMessage.builder().subject("sub").data("x").build();

// when
testing.runWithSpan("testPublishMessage", () -> connection.publish(message));

// then
testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName("testPublishMessage").hasNoParent(),
span ->
span.hasName("sub publish")
.hasKind(SpanKind.PRODUCER)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
equalTo(MESSAGING_OPERATION, "publish"),
equalTo(MESSAGING_SYSTEM, "nats"),
equalTo(MESSAGING_DESTINATION_NAME, "sub"),
equalTo(MESSAGING_MESSAGE_BODY_SIZE, 1),
equalTo(
AttributeKey.stringKey("messaging.client_id"),
String.valueOf(clientId)))));

// and
Message published = subscription.nextMessage(Duration.ofSeconds(1));
assertThat(published.getHeaders()).isNull();
}

@Test
void testPublishMessageWithHeaders() throws InterruptedException {
// given
int clientId = connection.getServerInfo().getClientId();
NatsMessage message =
NatsMessage.builder().subject("sub").data("x").headers(new Headers()).build();

// when
testing.runWithSpan("testPublishMessage", () -> connection.publish(message));

// then
testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName("testPublishMessage").hasNoParent(),
span ->
span.hasName("sub publish")
.hasKind(SpanKind.PRODUCER)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
equalTo(MESSAGING_OPERATION, "publish"),
equalTo(MESSAGING_SYSTEM, "nats"),
equalTo(MESSAGING_DESTINATION_NAME, "sub"),
equalTo(MESSAGING_MESSAGE_BODY_SIZE, 1),
equalTo(
AttributeKey.stringKey("messaging.client_id"),
String.valueOf(clientId)))));

// and
Message published = subscription.nextMessage(Duration.ofSeconds(1));
assertThat(published.getHeaders().get("traceparent")).isNotEmpty();
}
}
12 changes: 12 additions & 0 deletions instrumentation/nats/nats-2.21/library/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
plugins {
id("otel.library-instrumentation")
}

dependencies {
library("io.nats:jnats:2.21.0")

compileOnly("com.google.auto.value:auto-value-annotations")
annotationProcessor("com.google.auto.value:auto-value")

testImplementation(project(":instrumentation:nats:nats-2.21:testing"))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.nats.v2_21;

import io.nats.client.Connection;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.nats.v2_21.internal.NatsRequest;

public final class NatsTelemetry {

public static NatsTelemetry create(OpenTelemetry openTelemetry) {
return new NatsTelemetryBuilder(openTelemetry).build();
}

public static NatsTelemetryBuilder builder(OpenTelemetry openTelemetry) {
return new NatsTelemetryBuilder(openTelemetry);
}

private final Instrumenter<NatsRequest, Void> producerInstrumenter;

public NatsTelemetry(Instrumenter<NatsRequest, Void> producerInstrumenter) {
this.producerInstrumenter = producerInstrumenter;
}

public OpenTelemetryConnection wrap(Connection connection) {
return new OpenTelemetryConnection(connection, this.producerInstrumenter);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.nats.v2_21;

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.instrumentation.nats.v2_21.internal.NatsInstrumenterFactory;

public final class NatsTelemetryBuilder {

private final OpenTelemetry openTelemetry;

NatsTelemetryBuilder(OpenTelemetry openTelemetry) {
this.openTelemetry = openTelemetry;
}

public NatsTelemetry build() {
return new NatsTelemetry(NatsInstrumenterFactory.createProducerInstrumenter(openTelemetry));
}
}
Loading
Oops, something went wrong.
Loading
Oops, something went wrong.