Skip to content

Commit 0efd3dc

Browse files
committed
Bootstrap NATS instrumentation with Connection.publish(Message) tracing
1 parent 419eb50 commit 0efd3dc

File tree

20 files changed

+1260
-0
lines changed

20 files changed

+1260
-0
lines changed

.fossa.yml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -691,6 +691,12 @@ targets:
691691
- type: gradle
692692
path: ./
693693
target: ':instrumentation:mongo:mongo-async-3.3:javaagent'
694+
- type: gradle
695+
path: ./
696+
target: ':instrumentation:nats:nats-2.21:javaagent'
697+
- type: gradle
698+
path: ./
699+
target: ':instrumentation:nats:nats-2.21:library'
694700
- type: gradle
695701
path: ./
696702
target: ':instrumentation:netty:netty-3.8:javaagent'

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ out/
4545
######################
4646
.vscode
4747
**/bin/
48+
.metals
4849

4950
# Others #
5051
##########
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
plugins {
2+
id("otel.javaagent-instrumentation")
3+
}
4+
5+
muzzle {
6+
pass {
7+
group.set("io.nats")
8+
module.set("jnats")
9+
versions.set("[2.21.0,)")
10+
assertInverse.set(true)
11+
}
12+
}
13+
14+
dependencies {
15+
library("io.nats:jnats:2.21.0")
16+
17+
implementation(project(":instrumentation:nats:nats-2.21:library"))
18+
}
19+
20+
tasks {
21+
test {
22+
usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service)
23+
}
24+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.nats.v2_21;
7+
8+
import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.implementsInterface;
9+
import static io.opentelemetry.javaagent.instrumentation.nats.v2_21.NatsSingletons.PRODUCER_INSTRUMENTER;
10+
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
11+
import static net.bytebuddy.matcher.ElementMatchers.named;
12+
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
13+
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
14+
15+
import io.nats.client.Connection;
16+
import io.nats.client.Message;
17+
import io.opentelemetry.context.Context;
18+
import io.opentelemetry.context.Scope;
19+
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
20+
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
21+
import net.bytebuddy.asm.Advice;
22+
import net.bytebuddy.description.type.TypeDescription;
23+
import net.bytebuddy.matcher.ElementMatcher;
24+
25+
public class ConnectionInstrumentation implements TypeInstrumentation {
26+
27+
@Override
28+
public ElementMatcher<TypeDescription> typeMatcher() {
29+
return implementsInterface(named("io.nats.client.Connection"));
30+
}
31+
32+
@Override
33+
public void transform(TypeTransformer transformer) {
34+
transformer.applyAdviceToMethod(
35+
isPublic()
36+
.and(named("publish"))
37+
.and(takesArguments(1))
38+
.and(takesArgument(0, named("io.nats.client.Message"))),
39+
ConnectionInstrumentation.class.getName() + "$PublishAdvice");
40+
}
41+
42+
@SuppressWarnings("unused")
43+
public static class PublishAdvice {
44+
45+
@Advice.OnMethodEnter(suppress = Throwable.class)
46+
public static void onEnter(
47+
@Advice.This Connection connection,
48+
@Advice.Argument(0) Message message,
49+
@Advice.Local("otelContext") Context otelContext,
50+
@Advice.Local("otelScope") Scope otelScope) {
51+
Context parentContext = Context.current();
52+
53+
if (!PRODUCER_INSTRUMENTER.shouldStart(parentContext, message)) {
54+
return;
55+
}
56+
57+
otelContext = PRODUCER_INSTRUMENTER.start(parentContext, message);
58+
otelScope = otelContext.makeCurrent();
59+
}
60+
61+
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
62+
public static void onExit(
63+
@Advice.Thrown Throwable throwable,
64+
@Advice.This Connection connection,
65+
@Advice.Argument(0) Message message,
66+
@Advice.Local("otelContext") Context otelContext,
67+
@Advice.Local("otelScope") Scope otelScope) {
68+
if (otelScope == null) {
69+
return;
70+
}
71+
72+
otelScope.close();
73+
PRODUCER_INSTRUMENTER.end(otelContext, message, null, throwable);
74+
}
75+
}
76+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.nats.v2_21;
7+
8+
import com.google.auto.service.AutoService;
9+
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
10+
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
11+
import java.util.Collections;
12+
import java.util.List;
13+
14+
@AutoService(InstrumentationModule.class)
15+
public class NatsInstrumentationModule extends InstrumentationModule {
16+
17+
public NatsInstrumentationModule() {
18+
super("nats", "nats-2.21");
19+
}
20+
21+
// TODO classLoaderMatcher
22+
23+
@Override
24+
public List<TypeInstrumentation> typeInstrumentations() {
25+
return Collections.singletonList(new ConnectionInstrumentation());
26+
}
27+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.nats.v2_21;
7+
8+
import static io.opentelemetry.instrumentation.nats.v2_21.internal.NatsInstrumenterFactory.createProducerInstrumenter;
9+
10+
import io.nats.client.Message;
11+
import io.opentelemetry.api.GlobalOpenTelemetry;
12+
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
13+
14+
public final class NatsSingletons {
15+
16+
public static final Instrumenter<Message, Void> PRODUCER_INSTRUMENTER =
17+
createProducerInstrumenter(GlobalOpenTelemetry.get());
18+
19+
private NatsSingletons() {}
20+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.nats.v2_21;
7+
8+
import io.nats.client.Connection;
9+
import io.nats.client.Nats;
10+
import io.nats.client.impl.NatsMessage;
11+
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
12+
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
13+
import java.io.IOException;
14+
import org.junit.jupiter.api.AfterAll;
15+
import org.junit.jupiter.api.BeforeAll;
16+
import org.junit.jupiter.api.Test;
17+
import org.junit.jupiter.api.extension.RegisterExtension;
18+
import org.testcontainers.containers.GenericContainer;
19+
import org.testcontainers.utility.DockerImageName;
20+
21+
class NatsInstrumentationTest {
22+
23+
@RegisterExtension
24+
static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
25+
26+
static final DockerImageName natsImage = DockerImageName.parse("nats:2.11.2-alpine3.21");
27+
28+
static final GenericContainer<?> natsContainer =
29+
new GenericContainer<>(natsImage).withExposedPorts(4222);
30+
31+
static String natsUrl;
32+
static Connection connection;
33+
34+
@BeforeAll
35+
static void beforeAll() throws IOException, InterruptedException {
36+
natsContainer.start();
37+
natsUrl = "nats://" + natsContainer.getHost() + ":" + natsContainer.getMappedPort(4222);
38+
connection = Nats.connect(natsUrl);
39+
}
40+
41+
@AfterAll
42+
static void afterAll() throws InterruptedException {
43+
connection.close();
44+
natsContainer.close();
45+
}
46+
47+
@Test
48+
void testConnection() {
49+
// given
50+
NatsMessage message = NatsMessage.builder().subject("sub").build();
51+
52+
// when
53+
testing.runWithSpan("testConnection", () -> connection.publish(message));
54+
55+
// then
56+
testing.waitAndAssertTraces(
57+
trace ->
58+
trace.hasSpansSatisfyingExactly(
59+
span -> span.hasName("testConnection"), span -> span.hasName("sub publish")));
60+
}
61+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
plugins {
2+
id("otel.library-instrumentation")
3+
}
4+
5+
dependencies {
6+
library("io.nats:jnats:2.21.0")
7+
8+
testImplementation(project(":instrumentation:nats:nats-2.21:testing"))
9+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.instrumentation.nats.v2_21;
7+
8+
import io.nats.client.Connection;
9+
import io.nats.client.Message;
10+
import io.opentelemetry.api.OpenTelemetry;
11+
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
12+
13+
public final class NatsTelemetry {
14+
15+
public static NatsTelemetry create(OpenTelemetry openTelemetry) {
16+
return new NatsTelemetryBuilder(openTelemetry).build();
17+
}
18+
19+
public static NatsTelemetryBuilder builder(OpenTelemetry openTelemetry) {
20+
return new NatsTelemetryBuilder(openTelemetry);
21+
}
22+
23+
private final Instrumenter<Message, Void> producerInstrumenter;
24+
25+
public NatsTelemetry(Instrumenter<Message, Void> producerInstrumenter) {
26+
this.producerInstrumenter = producerInstrumenter;
27+
}
28+
29+
public OpenTelemetryConnection wrap(Connection connection) {
30+
return new OpenTelemetryConnection(connection, this.producerInstrumenter);
31+
}
32+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.instrumentation.nats.v2_21;
7+
8+
import io.opentelemetry.api.OpenTelemetry;
9+
import io.opentelemetry.instrumentation.nats.v2_21.internal.NatsInstrumenterFactory;
10+
11+
public final class NatsTelemetryBuilder {
12+
13+
private final OpenTelemetry openTelemetry;
14+
15+
NatsTelemetryBuilder(OpenTelemetry openTelemetry) {
16+
this.openTelemetry = openTelemetry;
17+
}
18+
19+
public NatsTelemetry build() {
20+
return new NatsTelemetry(NatsInstrumenterFactory.createProducerInstrumenter(openTelemetry));
21+
}
22+
}

0 commit comments

Comments
 (0)