Skip to content

Commit

Permalink
Fix rxjava2 NoSuchFieldError (#2836)
Browse files Browse the repository at this point in the history
  • Loading branch information
laurit committed Apr 21, 2021
1 parent f956a58 commit 3cb210a
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -178,16 +178,18 @@ private static void enableFlowable() {

@SuppressWarnings({"rawtypes", "unchecked"})
private static void enableObservable() {
oldOnObservableSubscribe = RxJavaPlugins.getOnObservableSubscribe();
RxJavaPlugins.setOnObservableSubscribe(
biCompose(
oldOnObservableSubscribe,
(observable, observer) -> {
final Context context = Context.current();
try (Scope ignored = context.makeCurrent()) {
return new TracingObserver(observer, context);
}
}));
if (TracingObserver.canEnable()) {
oldOnObservableSubscribe = RxJavaPlugins.getOnObservableSubscribe();
RxJavaPlugins.setOnObservableSubscribe(
biCompose(
oldOnObservableSubscribe,
(observable, observer) -> {
final Context context = Context.current();
try (Scope ignored = context.makeCurrent()) {
return new TracingObserver(observer, context);
}
}));
}
}

@SuppressWarnings({"rawtypes", "unchecked"})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,11 @@
import io.reactivex.Observer;
import io.reactivex.internal.fuseable.QueueDisposable;
import io.reactivex.internal.observers.BasicFuseableObserver;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;

class TracingObserver<T> extends BasicFuseableObserver<T, T> {
private static final MethodHandle queueDisposableGetter = getQueueDisposableGetter();

// BasicFuseableObserver#actual has been renamed to downstream in newer versions, we can't use it
// in this class
Expand Down Expand Up @@ -64,7 +67,7 @@ public void onComplete() {

@Override
public int requestFusion(int mode) {
final QueueDisposable<T> qd = this.qs;
final QueueDisposable<T> qd = getQueueDisposable();
if (qd != null) {
final int m = qd.requestFusion(mode);
sourceMode = m;
Expand All @@ -75,6 +78,36 @@ public int requestFusion(int mode) {

@Override
public T poll() throws Exception {
return qs.poll();
return getQueueDisposable().poll();
}

private QueueDisposable<T> getQueueDisposable() {
try {
return (QueueDisposable<T>) queueDisposableGetter.invoke(this);
} catch (Throwable throwable) {
throw new IllegalStateException(throwable);
}
}

private static MethodHandle getGetterHandle(String fieldName) {
try {
return MethodHandles.lookup()
.findGetter(BasicFuseableObserver.class, fieldName, QueueDisposable.class);
} catch (NoSuchFieldException | IllegalAccessException ignored) {
}
return null;
}

private static MethodHandle getQueueDisposableGetter() {
MethodHandle getter = getGetterHandle("qd");
if (getter == null) {
// in versions before 2.2.1 field was named "qs"
getter = getGetterHandle("qs");
}
return getter;
}

public static boolean canEnable() {
return queueDisposableGetter != null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,20 @@
package io.opentelemetry.instrumentation.rxjava2

import static io.opentelemetry.instrumentation.test.utils.TraceUtils.basicSpan
import static io.opentelemetry.instrumentation.test.utils.TraceUtils.runInternalSpan
import static io.opentelemetry.instrumentation.test.utils.TraceUtils.runUnderTrace

import io.opentelemetry.api.GlobalOpenTelemetry
import io.opentelemetry.instrumentation.test.InstrumentationSpecification
import io.reactivex.Observable
import io.reactivex.Single
import io.reactivex.functions.Consumer

import java.util.concurrent.CountDownLatch

abstract class AbstractRxJava2SubscriptionTest extends InstrumentationSpecification {

def "subscription test"() {
def "subscribe single test"() {
when:
CountDownLatch latch = new CountDownLatch(1)
runUnderTrace("parent") {
Expand All @@ -43,6 +45,32 @@ abstract class AbstractRxJava2SubscriptionTest extends InstrumentationSpecificat
}
}

def "test observable fusion"() {
when:
CountDownLatch latch = new CountDownLatch(1)
runUnderTrace("parent") {
Observable<Integer> integerObservable = Observable.just(1, 2, 3, 4)
integerObservable.concatMap({
return Observable.just(it)
}).count().subscribe(new Consumer<Long>() {
@Override
void accept(Long count) {
runInternalSpan("child")
latch.countDown()
}
})
}
latch.await()

then:
assertTraces(1) {
trace(0, 2) {
basicSpan(it, 0, "parent")
basicSpan(it, 1, "child", span(0))
}
}
}

static class Connection {
static int query() {
def span = GlobalOpenTelemetry.getTracer("test").spanBuilder("Connection.query").startSpan()
Expand Down

0 comments on commit 3cb210a

Please sign in to comment.