Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

End span on cancellation of subscription to reactive publishers #3153

Merged
merged 12 commits into from
Jun 3, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,12 @@ import io.lettuce.core.RedisClient
import io.lettuce.core.resource.ClientResources
import io.opentelemetry.instrumentation.reactor.TracingOperator
import io.opentelemetry.instrumentation.test.LibraryTestTrait
import spock.lang.Shared

class LettuceReactiveClientTest extends AbstractLettuceReactiveClientTest implements LibraryTestTrait {
@Shared
TracingOperator tracingOperator = TracingOperator.create()

@Override
RedisClient createClient(String uri) {
return RedisClient.create(
Expand All @@ -21,10 +25,10 @@ class LettuceReactiveClientTest extends AbstractLettuceReactiveClientTest implem
}

def setupSpec() {
TracingOperator.registerOnEachOperator()
tracingOperator.registerOnEachOperator()
}

def cleanupSpec() {
TracingOperator.resetOnEachOperator()
tracingOperator.resetOnEachOperator()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ muzzle {
}
}

tasks.withType(Test).configureEach {
// TODO run tests both with and without experimental span attributes
jvmArgs "-Dotel.instrumentation.reactor.experimental-span-attributes=true"
}

dependencies {
implementation project(':instrumentation:reactor-3.1:library')

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import static net.bytebuddy.matcher.ElementMatchers.isTypeInitializer;
import static net.bytebuddy.matcher.ElementMatchers.named;

import io.opentelemetry.instrumentation.api.config.Config;
import io.opentelemetry.instrumentation.reactor.TracingOperator;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
Expand All @@ -31,7 +32,13 @@ public void transform(TypeTransformer transformer) {
public static class ResetOnEachOperatorAdvice {
@Advice.OnMethodExit(suppress = Throwable.class)
public static void postStaticInitializer() {
TracingOperator.registerOnEachOperator();
TracingOperator.newBuilder()
.setCaptureExperimentalSpanAttributes(
Config.get()
.getBooleanProperty(
"otel.instrumentation.reactor.experimental-span-attributes", false))
.build()
.registerOnEachOperator();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,35 @@ class ReactorWithSpanInstrumentationTest extends AgentInstrumentationSpecificati
}
}

def "should capture span for canceled Mono"() {
setup:
def source = UnicastProcessor.<String>create()
def mono = source.singleOrEmpty()
def result = new TracedWithSpan()
.mono(mono)
def verifier = StepVerifier.create(result)
.expectSubscription()

expect:
Thread.sleep(500) // sleep a bit just to make sure no span is captured
assertTraces(0) {}

verifier.thenCancel().verify()

assertTraces(1) {
trace(0, 1) {
span(0) {
name "TracedWithSpan.mono"
kind SpanKind.INTERNAL
hasNoParent()
attributes {
"reactor.canceled" true
}
}
}
}
}

def "should capture span for already completed Flux"() {
setup:
def source = Flux.just("Value")
Expand Down Expand Up @@ -242,4 +271,35 @@ class ReactorWithSpanInstrumentationTest extends AgentInstrumentationSpecificati
}
}
}

def "should capture span for canceled Flux"() {
setup:
def error = new IllegalArgumentException("Boom")
def source = UnicastProcessor.<String>create()
def result = new TracedWithSpan()
.flux(source)
def verifier = StepVerifier.create(result)
.expectSubscription()

expect:
Thread.sleep(500) // sleep a bit just to make sure no span is captured
assertTraces(0) {}

source.onError(error)

verifier.thenCancel().verify()

assertTraces(1) {
trace(0, 1) {
span(0) {
name "TracedWithSpan.flux"
kind SpanKind.INTERNAL
hasNoParent()
attributes {
"reactor.canceled" true
}
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package io.opentelemetry.instrumentation.reactor;

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.tracer.BaseTracer;
import io.opentelemetry.instrumentation.api.tracer.async.AsyncSpanEndStrategy;
Expand All @@ -14,8 +16,23 @@
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public enum ReactorAsyncSpanEndStrategy implements AsyncSpanEndStrategy {
INSTANCE;
public final class ReactorAsyncSpanEndStrategy implements AsyncSpanEndStrategy {
private static final AttributeKey<Boolean> CANCELED_ATTRIBUTE_KEY =
AttributeKey.booleanKey("reactor.canceled");

public static ReactorAsyncSpanEndStrategy create() {
return newBuilder().build();
}

public static ReactorAsyncSpanEndStrategyBuilder newBuilder() {
return new ReactorAsyncSpanEndStrategyBuilder();
}
Comment on lines +27 to +29
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, do we need the builder for this class? Is it supposed to be possible to use it without TracingOperator?
If we can we can treat it as an internal part of the library instrumentation then we can make it package-private and remove the builder and use a simple ReactorAsyncSpanEndStrategy(boolean) constructor instead.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, I had not considered them as being internal to the instrumentation. Would it be problematic to allow them to be used directly? They were already public even though they were singletons.

As of right now I am using the ReactorAsyncSpanEndStrategy without the TracingOperator in my Spring WebFlux projects. I've been slowly migrating away from our own instrumentation but I've not yet considered replacing our own Reactor operators, partially because they do more than propagate the OTel Context. I've combined several operations into a single operator because every individual operator effectively doubles the stack trace as they are added for every reactive operation.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, I had not considered them as being internal to the instrumentation. Would it be problematic to allow them to be used directly? They were already public even though they were singletons.

Hmm, it would be a bit better to expose less API surface, and they seemed like they could be internal. I wouldn't say that it's a hard requirement for them to be internal; I feel like they should be, but not have to. If you're using them right now then it's probably fine to keep them public for a while.


private final boolean captureExperimentalSpanAttributes;

ReactorAsyncSpanEndStrategy(boolean captureExperimentalSpanAttributes) {
this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes;
}

@Override
public boolean supports(Class<?> returnType) {
Expand All @@ -29,10 +46,14 @@ public Object end(BaseTracer tracer, Context context, Object returnValue) {
new EndOnFirstNotificationConsumer(tracer, context);
if (returnValue instanceof Mono) {
Mono<?> mono = (Mono<?>) returnValue;
return mono.doOnError(notificationConsumer).doOnSuccess(notificationConsumer::onSuccess);
return mono.doOnError(notificationConsumer)
.doOnSuccess(notificationConsumer::onSuccess)
.doOnCancel(notificationConsumer::onCancel);
} else {
Flux<?> flux = Flux.from((Publisher<?>) returnValue);
return flux.doOnError(notificationConsumer).doOnComplete(notificationConsumer);
return flux.doOnError(notificationConsumer)
.doOnComplete(notificationConsumer)
.doOnCancel(notificationConsumer::onCancel);
}
}

Expand All @@ -41,7 +62,7 @@ public Object end(BaseTracer tracer, Context context, Object returnValue) {
* OnError notifications are received. Multiple notifications can happen anytime multiple
* subscribers subscribe to the same publisher.
*/
private static final class EndOnFirstNotificationConsumer extends AtomicBoolean
private final class EndOnFirstNotificationConsumer extends AtomicBoolean
implements Runnable, Consumer<Throwable> {

private final BaseTracer tracer;
Expand All @@ -57,6 +78,15 @@ public <T> void onSuccess(T ignored) {
accept(null);
}

public void onCancel() {
if (compareAndSet(false, true)) {
if (captureExperimentalSpanAttributes) {
Span.fromContext(context).setAttribute(CANCELED_ATTRIBUTE_KEY, true);
}
tracer.end(context);
}
}

@Override
public void run() {
accept(null);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.reactor;

public final class ReactorAsyncSpanEndStrategyBuilder {
private boolean captureExperimentalSpanAttributes;

ReactorAsyncSpanEndStrategyBuilder() {}

public ReactorAsyncSpanEndStrategyBuilder setCaptureExperimentalSpanAttributes(
boolean captureExperimentalSpanAttributes) {
this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes;
return this;
}

public ReactorAsyncSpanEndStrategy build() {
return new ReactorAsyncSpanEndStrategy(captureExperimentalSpanAttributes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,21 +36,39 @@
/** Based on Spring Sleuth's Reactor instrumentation. */
public final class TracingOperator {

public static TracingOperator create() {
return newBuilder().build();
}

public static TracingOperatorBuilder newBuilder() {
return new TracingOperatorBuilder();
}

private final boolean captureExperimentalSpanAttributes;

TracingOperator(boolean captureExperimentalSpanAttributes) {
this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes;
}

/**
* Registers a hook that applies to every operator, propagating {@link Context} to downstream
* callbacks to ensure spans in the {@link Context} are available throughout the lifetime of a
* reactive stream. This should generally be called in a static initializer block in your
* application.
*/
public static void registerOnEachOperator() {
public void registerOnEachOperator() {
Hooks.onEachOperator(TracingSubscriber.class.getName(), tracingLift());
AsyncSpanEndStrategies.getInstance().registerStrategy(ReactorAsyncSpanEndStrategy.INSTANCE);
AsyncSpanEndStrategies.getInstance()
.registerStrategy(
ReactorAsyncSpanEndStrategy.newBuilder()
.setCaptureExperimentalSpanAttributes(captureExperimentalSpanAttributes)
.build());
}

/** Unregisters the hook registered by {@link #registerOnEachOperator()}. */
public static void resetOnEachOperator() {
public void resetOnEachOperator() {
Hooks.resetOnEachOperator(TracingSubscriber.class.getName());
AsyncSpanEndStrategies.getInstance().unregisterStrategy(ReactorAsyncSpanEndStrategy.INSTANCE);
AsyncSpanEndStrategies.getInstance().unregisterStrategy(ReactorAsyncSpanEndStrategy.class);
}

private static <T> Function<? super Publisher<T>, ? extends Publisher<T>> tracingLift() {
Expand All @@ -69,6 +87,4 @@ public CoreSubscriber<? super T> apply(Scannable publisher, CoreSubscriber<? sup
return new TracingSubscriber<>(sub, sub.currentContext());
}
}

private TracingOperator() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.reactor;

public final class TracingOperatorBuilder {
private boolean captureExperimentalSpanAttributes;

TracingOperatorBuilder() {}

public TracingOperatorBuilder setCaptureExperimentalSpanAttributes(
boolean captureExperimentalSpanAttributes) {
this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes;
return this;
}

public TracingOperator build() {
return new TracingOperator(captureExperimentalSpanAttributes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ class HooksTest extends LibraryInstrumentationSpecification {

def "can reset out hooks"() {
setup:
def underTest = TracingOperator.create()
AtomicReference<CoreSubscriber> subscriber = new AtomicReference<>()

when: "no hook registered"
Expand All @@ -23,14 +24,14 @@ class HooksTest extends LibraryInstrumentationSpecification {
!(subscriber.get() instanceof TracingSubscriber)

when: "hook registered"
TracingOperator.registerOnEachOperator()
underTest.registerOnEachOperator()
new CapturingMono(subscriber).map { it + 1 }.subscribe()

then:
subscriber.get() instanceof TracingSubscriber

when: "hook reset"
TracingOperator.resetOnEachOperator()
underTest.resetOnEachOperator()
new CapturingMono(subscriber).map { it + 1 }.subscribe()

then:
Expand Down
Loading