Skip to content

Commit

Permalink
RxJava2 autoinstrumentation
Browse files Browse the repository at this point in the history
  • Loading branch information
piotr-sumo committed Jan 27, 2021
1 parent 59c1034 commit 77bc145
Show file tree
Hide file tree
Showing 17 changed files with 1,069 additions and 5 deletions.
20 changes: 20 additions & 0 deletions instrumentation/rxjava-2.0/javaagent/rxjava-2.0-javaagent.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
apply from: "$rootDir/gradle/instrumentation.gradle"

muzzle {
pass {
group = "io.reactivex.rxjava2"
module = "rxjava"
// Tomcat 10 is about servlet 5.0
// 7.0.4 added Request.isAsync, which is needed
versions = "[2.0.0,)"
}
}

dependencies {
implementation group: 'org.reactivestreams', name: 'reactive-streams', version: '1.0.0'
library group: 'io.reactivex.rxjava2', name: 'rxjava', version: '2.0.0'

testCompileOnly group: 'org.reactivestreams', name: 'reactive-streams', version: '1.0.0'
testCompile group: 'io.reactivex.rxjava2', name: 'rxjava', version: '2.0.5'
latestDepTestLibrary group: 'io.reactivex.rxjava2', name: 'rxjava', version: '+'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package io.opentelemetry.instrumentation.rxjava2;

import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.javaagent.instrumentation.api.InstrumentationContext;
import io.opentelemetry.javaagent.tooling.InstrumentationModule;
import io.opentelemetry.javaagent.tooling.TypeInstrumentation;
import io.reactivex.Completable;
import io.reactivex.CompletableObserver;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;

public class CompletableInstrumentation extends InstrumentationModule {

public CompletableInstrumentation() {
super("rxjava");
}

@Override
public List<TypeInstrumentation> typeInstrumentations() {
return Collections.singletonList(new Instrumentation());
}

public static class Instrumentation implements TypeInstrumentation {

@Override
public ElementMatcher<? super TypeDescription> typeMatcher() {
return named("io.reactivex.Completable");
}

@Override
public Map<? extends ElementMatcher<? super MethodDescription>, String> transformers() {
Map<ElementMatcher<? super MethodDescription>, String> transformers = new HashMap<>();
transformers.put(isConstructor(), CaptureParentSpanAdvice.class.getName());
transformers.put(
isMethod()
.and(named("subscribe"))
.and(takesArguments(1))
.and(takesArgument(0, named("io.reactivex.CompletableObserver"))),
PropagateParentSpanAdvice.class.getName());
return transformers;
}
}

public static class CaptureParentSpanAdvice {
@Advice.OnMethodExit(suppress = Throwable.class)
public static void onConstruct(@Advice.This final Completable completable) {
Context parentSpan = Context.current();
if (parentSpan != null) {
InstrumentationContext.get(Completable.class, Context.class).put(completable, parentSpan);
}
}
}

public static class PropagateParentSpanAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static Scope onSubscribe(
@Advice.This final Completable completable,
@Advice.Argument(value = 0, readOnly = false) CompletableObserver observer) {
if (observer != null) {
Context parentSpan =
InstrumentationContext.get(Completable.class, Context.class).get(completable);
if (parentSpan != null) {
// wrap the observer so spans from its events treat the captured span as their parent
observer = new TracingCompletableObserver(observer, parentSpan);
// activate the span here in case additional observers are created during subscribe
return parentSpan.makeCurrent();
}
}
return null;
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void closeScope(@Advice.Enter final Scope scope) {
if (scope != null) {
scope.close();
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package io.opentelemetry.instrumentation.rxjava2;

import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.javaagent.instrumentation.api.InstrumentationContext;
import io.opentelemetry.javaagent.tooling.InstrumentationModule;
import io.opentelemetry.javaagent.tooling.TypeInstrumentation;
import io.reactivex.Flowable;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.reactivestreams.Subscriber;

public class FlowableInstrumentation extends InstrumentationModule {

public FlowableInstrumentation() {
super("rxjava");
}

@Override
public List<TypeInstrumentation> typeInstrumentations() {
return Collections.singletonList(new Instrumentation());
}

public static class Instrumentation implements TypeInstrumentation {

@Override
public ElementMatcher<? super TypeDescription> typeMatcher() {
return named("io.reactivex.Flowable");
}

@Override
public Map<? extends ElementMatcher<? super MethodDescription>, String> transformers() {
Map<ElementMatcher<? super MethodDescription>, String> transformers = new HashMap<>();
transformers.put(isConstructor(), CaptureParentSpanAdvice.class.getName());
transformers.put(
isMethod()
.and(named("subscribe"))
.and(takesArguments(1))
.and(takesArgument(0, named("org.reactivestreams.Subscriber"))),
PropagateParentSpanAdvice.class.getName());
return transformers;
}
}

public static class CaptureParentSpanAdvice {
@Advice.OnMethodExit(suppress = Throwable.class)
public static void onConstruct(@Advice.This final Flowable<?> flowable) {
Context parentSpan = Context.current();
if (parentSpan != null) {
InstrumentationContext.get(Flowable.class, Context.class).put(flowable, parentSpan);
}
}
}

public static class PropagateParentSpanAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static Scope onSubscribe(
@Advice.This final Flowable<?> flowable,
@Advice.Argument(value = 0, readOnly = false) Subscriber<?> subscriber) {
if (subscriber != null) {
Context parentSpan =
InstrumentationContext.get(Flowable.class, Context.class).get(flowable);
if (parentSpan != null) {
// wrap the subscriber so spans from its events treat the captured span as their parent
subscriber = new TracingSubscriber<>(subscriber, parentSpan);
// activate the span here in case additional subscribers are created during subscribe
return parentSpan.makeCurrent();
}
}
return null;
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void closeScope(@Advice.Enter final Scope scope) {
if (scope != null) {
scope.close();
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package io.opentelemetry.instrumentation.rxjava2;

import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.javaagent.instrumentation.api.InstrumentationContext;
import io.opentelemetry.javaagent.tooling.InstrumentationModule;
import io.opentelemetry.javaagent.tooling.TypeInstrumentation;
import io.reactivex.Maybe;
import io.reactivex.MaybeObserver;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;

public class MaybeInstrumentation extends InstrumentationModule {

public MaybeInstrumentation() {
super("rxjava");
}

@Override
public List<TypeInstrumentation> typeInstrumentations() {
return Collections.singletonList(new Instrumentation());
}

public static class Instrumentation implements TypeInstrumentation {

@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("io.reactivex.Maybe");
}

@Override
public Map<? extends ElementMatcher<? super MethodDescription>, String> transformers() {
Map<ElementMatcher<? super MethodDescription>, String> transformers = new HashMap<>();
transformers.put(isConstructor(), CaptureParentSpanAdvice.class.getName());
transformers.put(
isMethod()
.and(named("subscribe"))
.and(takesArguments(1))
.and(takesArgument(0, named("io.reactivex.MaybeObserver"))),
PropagateParentSpanAdvice.class.getName());
return transformers;
}
}

public static class CaptureParentSpanAdvice {
@Advice.OnMethodExit(suppress = Throwable.class)
public static void onConstruct(@Advice.This final Maybe<?> maybe) {
Context parentSpan = Context.current();
if (parentSpan != null) {
InstrumentationContext.get(Maybe.class, Context.class).put(maybe, parentSpan);
}
}
}

public static class PropagateParentSpanAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static Scope onSubscribe(
@Advice.This final Maybe<?> maybe,
@Advice.Argument(value = 0, readOnly = false) MaybeObserver<?> observer) {
if (observer != null) {
Context parentSpan = InstrumentationContext.get(Maybe.class, Context.class).get(maybe);
if (parentSpan != null) {
// wrap the observer so spans from its events treat the captured span as their parent
observer = new TracingMaybeObserver<>(observer, parentSpan);
// activate the span here in case additional observers are created during subscribe
return parentSpan.makeCurrent();
}
}
return null;
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void closeScope(@Advice.Enter final Scope scope) {
if (scope != null) {
scope.close();
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package io.opentelemetry.instrumentation.rxjava2;

import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.javaagent.instrumentation.api.InstrumentationContext;
import io.opentelemetry.javaagent.tooling.InstrumentationModule;
import io.opentelemetry.javaagent.tooling.TypeInstrumentation;
import io.reactivex.Observable;
import io.reactivex.Observer;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;

public class ObservableInstrumentation extends InstrumentationModule {

public ObservableInstrumentation() {
super("rxjava");
}

@Override
public List<TypeInstrumentation> typeInstrumentations() {
return Collections.singletonList(new Instrumentation());
}

public static class Instrumentation implements TypeInstrumentation {

@Override
public ElementMatcher<? super TypeDescription> typeMatcher() {
return named("io.reactivex.Observable");
}

@Override
public Map<? extends ElementMatcher<? super MethodDescription>, String> transformers() {
Map<ElementMatcher<? super MethodDescription>, String> transformers = new HashMap<>();
transformers.put(isConstructor(), CaptureParentSpanAdvice.class.getName());
transformers.put(
isMethod()
.and(named("subscribe"))
.and(takesArguments(1))
.and(takesArgument(0, named("io.reactivex.Observer"))),
PropagateParentSpanAdvice.class.getName());
return transformers;
}
}

public static class CaptureParentSpanAdvice {
@Advice.OnMethodExit(suppress = Throwable.class)
public static void onConstruct(@Advice.This final Observable<?> observable) {
Context parentSpan = Context.current();
if (parentSpan != null) {
InstrumentationContext.get(Observable.class, Context.class).put(observable, parentSpan);
}
}
}

public static class PropagateParentSpanAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static Scope onSubscribe(
@Advice.This final Observable<?> observable,
@Advice.Argument(value = 0, readOnly = false) Observer<?> observer) {
if (observer != null) {
Context parentSpan =
InstrumentationContext.get(Observable.class, Context.class).get(observable);
if (parentSpan != null) {
// wrap the observer so spans from its events treat the captured span as their parent
observer = new TracingObserver<>(observer, parentSpan);
// activate the span here in case additional observers are created during subscribe
return parentSpan.makeCurrent();
}
}
return null;
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void closeScope(@Advice.Enter final Scope scope) {
if (scope != null) {
scope.close();
}
}
}
}

0 comments on commit 77bc145

Please sign in to comment.