Skip to content

Commit

Permalink
feat: add library instrumentation in reactor-netty
Browse files Browse the repository at this point in the history
  • Loading branch information
123liuziming committed Mar 4, 2024
1 parent ecc7c1a commit 0aad27b
Show file tree
Hide file tree
Showing 14 changed files with 262 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ dependencies {
implementation(project(":instrumentation:netty:netty-4-common:library"))
implementation(project(":instrumentation:netty:netty-common:library"))
implementation(project(":instrumentation:reactor:reactor-3.1:library"))
implementation(project(":instrumentation:reactor:reactor-netty:reactor-netty-1.0:library"))

library("io.projectreactor.netty:reactor-netty-http:1.0.0")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.opentelemetry.context.Scope;
import java.util.function.BiConsumer;
import javax.annotation.Nullable;
import io.opentelemetry.instrumentation.reactornetty.v1_0.InstrumentationContexts;
import reactor.netty.Connection;
import reactor.netty.http.client.HttpClientInfos;
import reactor.util.context.ContextView;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@
package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0;

import static io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0.ReactorContextKeys.CONTEXTS_HOLDER_KEY;
import static io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0.ReactorNettySingletons.instrumenter;

import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.semconv.http.HttpClientRequestResendCount;
import io.opentelemetry.instrumentation.netty.v4_1.NettyClientTelemetry;
import java.util.function.BiConsumer;
import java.util.function.Function;
import javax.annotation.Nullable;
import io.opentelemetry.instrumentation.reactornetty.v1_0.InstrumentationContexts;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.http.client.HttpClient;
Expand All @@ -34,7 +36,7 @@ public static HttpClient.ResponseReceiver<?> instrument(HttpClient.ResponseRecei
HttpClient client = (HttpClient) receiver;
HttpClientConfig config = client.configuration();

InstrumentationContexts instrumentationContexts = new InstrumentationContexts();
InstrumentationContexts instrumentationContexts = new InstrumentationContexts(instrumenter());

HttpClient modified =
client
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import io.opentelemetry.instrumentation.netty.v4.common.internal.client.NettyClientInstrumenterFactory;
import io.opentelemetry.instrumentation.netty.v4.common.internal.client.NettyConnectionInstrumentationFlag;
import io.opentelemetry.instrumentation.netty.v4.common.internal.client.NettyConnectionInstrumenter;
import io.opentelemetry.instrumentation.reactornetty.v1_0.HttpClientRequestHeadersSetter;
import io.opentelemetry.instrumentation.reactornetty.v1_0.ReactorNettyHttpClientAttributesGetter;
import io.opentelemetry.javaagent.bootstrap.internal.CommonConfig;
import io.opentelemetry.javaagent.bootstrap.internal.InstrumentationConfig;
import reactor.netty.http.client.HttpClientRequest;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
plugins {
id("otel.library-instrumentation")
}

dependencies {
implementation("io.projectreactor.netty:reactor-netty-http:1.1.0")
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0;
package io.opentelemetry.instrumentation.reactornetty.v1_0;

import io.opentelemetry.context.propagation.TextMapSetter;
import javax.annotation.Nullable;

import io.opentelemetry.context.propagation.TextMapSetter;
import reactor.netty.http.client.HttpClientRequest;

enum HttpClientRequestHeadersSetter implements TextMapSetter<HttpClientRequest> {
public enum HttpClientRequestHeadersSetter implements TextMapSetter<HttpClientRequest> {
INSTANCE;

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0;

import static io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0.ReactorNettySingletons.instrumenter;
package io.opentelemetry.instrumentation.reactornetty.v1_0;

import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.internal.InstrumenterUtil;
import io.opentelemetry.instrumentation.api.internal.Timer;
import io.opentelemetry.instrumentation.api.semconv.http.HttpClientRequestResendCount;
Expand All @@ -19,14 +18,14 @@
import reactor.netty.http.client.HttpClientRequest;
import reactor.netty.http.client.HttpClientResponse;

final class InstrumentationContexts {
public final class InstrumentationContexts {
private static final VirtualField<HttpClientRequest, Context> requestContextVirtualField =
VirtualField.find(HttpClientRequest.class, Context.class);

private static final AtomicReferenceFieldUpdater<InstrumentationContexts, Context>
parentContextUpdater =
AtomicReferenceFieldUpdater.newUpdater(
InstrumentationContexts.class, Context.class, "parentContext");
AtomicReferenceFieldUpdater.newUpdater(
InstrumentationContexts.class, Context.class, "parentContext");

private volatile Context parentContext;
private volatile Timer timer;
Expand All @@ -35,37 +34,43 @@ final class InstrumentationContexts {
// coexisting HTTP client spans
private final Queue<RequestAndContext> clientContexts = new LinkedBlockingQueue<>();

void initialize(Context parentContext) {
private final Instrumenter<HttpClientRequest, HttpClientResponse> instrumenter;

public InstrumentationContexts(Instrumenter<HttpClientRequest, HttpClientResponse> instrumenter) {
this.instrumenter = instrumenter;
}

public void initialize(Context parentContext) {
Context parentContextWithResends = HttpClientRequestResendCount.initialize(parentContext);
// make sure initialization happens only once
if (parentContextUpdater.compareAndSet(this, null, parentContextWithResends)) {
timer = Timer.start();
}
}

Context getParentContext() {
public Context getParentContext() {
return parentContext;
}

@Nullable
Context getClientContext() {
public Context getClientContext() {
RequestAndContext requestAndContext = clientContexts.peek();
return requestAndContext == null ? null : requestAndContext.context;
}

@Nullable
Context startClientSpan(HttpClientRequest request) {
public Context startClientSpan(HttpClientRequest request) {
Context parentContext = this.parentContext;
Context context = null;
if (instrumenter().shouldStart(parentContext, request)) {
context = instrumenter().start(parentContext, request);
if (instrumenter.shouldStart(parentContext, request)) {
context = instrumenter.start(parentContext, request);
requestContextVirtualField.set(request, context);
clientContexts.offer(new RequestAndContext(request, context));
}
return context;
}

void endClientSpan(@Nullable HttpClientResponse response, @Nullable Throwable error) {
public void endClientSpan(@Nullable HttpClientResponse response, @Nullable Throwable error) {
HttpClientRequest request = null;
Context context = null;
RequestAndContext requestAndContext = clientContexts.poll();
Expand All @@ -79,16 +84,16 @@ void endClientSpan(@Nullable HttpClientResponse response, @Nullable Throwable er
}

if (request != null && context != null) {
instrumenter().end(context, request, response, error);
instrumenter.end(context, request, response, error);
}
}

void startAndEndConnectionErrorSpan(HttpClientRequest request, Throwable error) {
public void startAndEndConnectionErrorSpan(HttpClientRequest request, Throwable error) {
Context parentContext = this.parentContext;
if (instrumenter().shouldStart(parentContext, request)) {
if (instrumenter.shouldStart(parentContext, request)) {
Timer timer = this.timer;
InstrumenterUtil.startAndEnd(
instrumenter(), parentContext, request, null, error, timer.startTime(), timer.now());
instrumenter, parentContext, request, null, error, timer.startTime(), timer.now());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0;
package io.opentelemetry.instrumentation.reactornetty.v1_0;

import io.netty.handler.codec.http.HttpVersion;
import io.opentelemetry.instrumentation.api.semconv.http.HttpClientAttributesGetter;
Expand All @@ -15,9 +15,11 @@
import reactor.netty.http.client.HttpClientRequest;
import reactor.netty.http.client.HttpClientResponse;

final class ReactorNettyHttpClientAttributesGetter
public final class ReactorNettyHttpClientAttributesGetter
implements HttpClientAttributesGetter<HttpClientRequest, HttpClientResponse> {

public static final ReactorNettyHttpClientAttributesGetter INSTANCE = new ReactorNettyHttpClientAttributesGetter();

@Override
public String getUrlFull(HttpClientRequest request) {
return request.resourceUrl();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package io.opentelemetry.instrumentation.reactornetty.v1_0;

import io.opentelemetry.context.Context;
import io.opentelemetry.context.propagation.ContextPropagators;
import java.util.concurrent.atomic.AtomicReference;
import reactor.netty.http.client.HttpClient;

public class ReactorNettyTelemetry {

private final InstrumentationContexts contexts;

private final ContextPropagators propagators;

ReactorNettyTelemetry(InstrumentationContexts contexts, ContextPropagators propagators) {
this.contexts = contexts;
this.propagators = propagators;
}

public HttpClient tracingHttpClient(HttpClient httpClient) {
AtomicReference<Context> clientContext = new AtomicReference<>();
return httpClient.doOnRequest((request, connection) -> {
// create span
contexts.startClientSpan(request);
propagators.getTextMapPropagator().inject(clientContext.get(), request, HttpClientRequestHeadersSetter.INSTANCE);
}).doOnResponse((response, connection) -> {
contexts.endClientSpan(response, null);
}).doOnResponseError(contexts::endClientSpan);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.reactornetty.v1_0;

import static io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor.alwaysClient;

import com.google.errorprone.annotations.CanIgnoreReturnValue;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.instrumentation.api.incubator.semconv.http.HttpClientExperimentalMetrics;
import io.opentelemetry.instrumentation.api.incubator.semconv.http.HttpExperimentalAttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder;
import io.opentelemetry.instrumentation.api.semconv.http.HttpClientAttributesExtractor;
import io.opentelemetry.instrumentation.api.semconv.http.HttpClientAttributesExtractorBuilder;
import io.opentelemetry.instrumentation.api.semconv.http.HttpClientMetrics;
import io.opentelemetry.instrumentation.api.semconv.http.HttpSpanNameExtractor;
import io.opentelemetry.instrumentation.api.semconv.http.HttpSpanNameExtractorBuilder;
import io.opentelemetry.instrumentation.api.semconv.http.HttpSpanStatusExtractor;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;
import reactor.netty.http.client.HttpClientRequest;
import reactor.netty.http.client.HttpClientResponse;

/** A builder of {@link ReactorNettyTelemetry}. */
public final class ReactorNettyTelemetryBuilder {
private static final String INSTRUMENTATION_NAME = "io.opentelemetry.reactor-netty-1.0";

private final List<AttributesExtractor<HttpClientRequest, HttpClientResponse>>
clientAdditionalExtractors = new ArrayList<>();

private final OpenTelemetry openTelemetry;

private boolean emitExperimentalHttpClientTelemetry = false;

private Consumer<HttpClientAttributesExtractorBuilder<HttpClientRequest, HttpClientResponse>>
clientExtractorConfigurer = builder -> {};
private Consumer<HttpSpanNameExtractorBuilder<HttpClientRequest>> clientSpanNameExtractorConfigurer =
builder -> {};

private final HttpClientAttributesExtractorBuilder<HttpClientRequest, HttpClientResponse>
httpClientAttributesExtractorBuilder = HttpClientAttributesExtractor.builder(ReactorNettyHttpClientAttributesGetter.INSTANCE);

public ReactorNettyTelemetryBuilder(OpenTelemetry openTelemetry) {
this.openTelemetry = openTelemetry;
}

/**
* Adds an additional {@link AttributesExtractor} to invoke to set attributes to instrumented
* items for HttpClient.
*/
@CanIgnoreReturnValue
public ReactorNettyTelemetryBuilder addClientAttributesExtractor(
AttributesExtractor<HttpClientRequest, HttpClientResponse> attributesExtractor) {
clientAdditionalExtractors.add(attributesExtractor);
return this;
}

/**
* Configures the HTTP HttpClient request headers that will be captured as span attributes.
*
* @param requestHeaders A list of HTTP header names.
*/
@CanIgnoreReturnValue
public ReactorNettyTelemetryBuilder setCapturedClientRequestHeaders(
List<String> requestHeaders) {
clientExtractorConfigurer =
clientExtractorConfigurer.andThen(
builder -> builder.setCapturedRequestHeaders(requestHeaders));
return this;
}

/**
* Configures the HTTP HttpClient response headers that will be captured as span attributes.
*
* @param responseHeaders A list of HTTP header names.
*/
@CanIgnoreReturnValue
public ReactorNettyTelemetryBuilder setCapturedClientResponseHeaders(
List<String> responseHeaders) {
clientExtractorConfigurer =
clientExtractorConfigurer.andThen(
builder -> builder.setCapturedResponseHeaders(responseHeaders));
return this;
}


/**
* Configures the instrumentation to recognize an alternative set of HTTP request methods.
*
* <p>By default, this instrumentation defines "known" methods as the ones listed in <a
* href="https://www.rfc-editor.org/rfc/rfc9110.html#name-methods">RFC9110</a> and the PATCH
* method defined in <a href="https://www.rfc-editor.org/rfc/rfc5789.html">RFC5789</a>.
*
* <p>Note: calling this method <b>overrides</b> the default known method sets completely; it does
* not supplement it.
*
* @param knownMethods A set of recognized HTTP request methods.
* @see HttpClientAttributesExtractorBuilder#setKnownMethods(Set)
*/
@CanIgnoreReturnValue
public ReactorNettyTelemetryBuilder setKnownMethods(Set<String> knownMethods) {
clientExtractorConfigurer =
clientExtractorConfigurer.andThen(builder -> builder.setKnownMethods(knownMethods));
clientSpanNameExtractorConfigurer =
clientSpanNameExtractorConfigurer.andThen(builder -> builder.setKnownMethods(knownMethods));
return this;
}

/**
* Configures the instrumentation to emit experimental HTTP client metrics.
*
* @param emitExperimentalHttpClientTelemetry {@code true} if the experimental HTTP client metrics
* are to be emitted.
*/
@CanIgnoreReturnValue
public ReactorNettyTelemetryBuilder setEmitExperimentalHttpClientTelemetry(
boolean emitExperimentalHttpClientTelemetry) {
this.emitExperimentalHttpClientTelemetry = emitExperimentalHttpClientTelemetry;
return this;
}

/**
* Returns a new {@link ReactorNettyTelemetry} with the settings of this {@link
* ReactorNettyTelemetryBuilder}.
*/
public ReactorNettyTelemetry build() {
ReactorNettyHttpClientAttributesGetter httpAttributesGetter = ReactorNettyHttpClientAttributesGetter.INSTANCE;

HttpClientAttributesExtractorBuilder<HttpClientRequest, HttpClientResponse> extractorBuilder =
HttpClientAttributesExtractor.builder(httpAttributesGetter);

HttpSpanNameExtractorBuilder<HttpClientRequest> httpSpanNameExtractorBuilder =
HttpSpanNameExtractor.builder(httpAttributesGetter);

InstrumenterBuilder<HttpClientRequest, HttpClientResponse> clientBuilder =
Instrumenter.<HttpClientRequest, HttpClientResponse>builder(
openTelemetry, INSTRUMENTATION_NAME, httpSpanNameExtractorBuilder.build())
.setSpanStatusExtractor(HttpSpanStatusExtractor.create(httpAttributesGetter))
.addAttributesExtractor(extractorBuilder.build())
.addOperationMetrics(HttpClientMetrics.get());

if (emitExperimentalHttpClientTelemetry) {
clientBuilder
.addAttributesExtractor(HttpExperimentalAttributesExtractor.create(httpAttributesGetter))
.addOperationMetrics(HttpClientExperimentalMetrics.get());
}

// headers are injected elsewhere; ClientRequest is immutable
return new ReactorNettyTelemetry(new InstrumentationContexts(clientBuilder.buildInstrumenter(alwaysClient())), openTelemetry.getPropagators());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0;
package io.opentelemetry.instrumentation.reactornetty.v1_0;

import javax.annotation.Nullable;

Expand Down
Loading

0 comments on commit 0aad27b

Please sign in to comment.