Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix rxjava2 NoSuchFieldError #2836

Merged
merged 2 commits into from
Apr 21, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -178,16 +178,18 @@ private static void enableFlowable() {

@SuppressWarnings({"rawtypes", "unchecked"})
private static void enableObservable() {
oldOnObservableSubscribe = RxJavaPlugins.getOnObservableSubscribe();
RxJavaPlugins.setOnObservableSubscribe(
biCompose(
oldOnObservableSubscribe,
(observable, observer) -> {
final Context context = Context.current();
try (Scope ignored = context.makeCurrent()) {
return new TracingObserver(observer, context);
}
}));
if (TracingObserver.canEnable()) {
oldOnObservableSubscribe = RxJavaPlugins.getOnObservableSubscribe();
RxJavaPlugins.setOnObservableSubscribe(
biCompose(
oldOnObservableSubscribe,
(observable, observer) -> {
final Context context = Context.current();
try (Scope ignored = context.makeCurrent()) {
return new TracingObserver(observer, context);
}
}));
}
}

@SuppressWarnings({"rawtypes", "unchecked"})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,11 @@
import io.reactivex.Observer;
import io.reactivex.internal.fuseable.QueueDisposable;
import io.reactivex.internal.observers.BasicFuseableObserver;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;

class TracingObserver<T> extends BasicFuseableObserver<T, T> {
private static final MethodHandle queueDisposableGetter = getQueueDisposableGetter();

// BasicFuseableObserver#actual has been renamed to downstream in newer versions, we can't use it
// in this class
Expand Down Expand Up @@ -64,7 +67,7 @@ public void onComplete() {

@Override
public int requestFusion(int mode) {
final QueueDisposable<T> qd = this.qs;
final QueueDisposable<T> qd = getQueueDisposable();
if (qd != null) {
final int m = qd.requestFusion(mode);
sourceMode = m;
Expand All @@ -75,6 +78,36 @@ public int requestFusion(int mode) {

@Override
public T poll() throws Exception {
return qs.poll();
return getQueueDisposable().poll();
}

private QueueDisposable<T> getQueueDisposable() {
try {
return (QueueDisposable<T>) queueDisposableGetter.invoke(this);
} catch (Throwable throwable) {
throw new IllegalStateException(throwable);
}
}

private static MethodHandle getGetterHandle(String fieldName) {
try {
return MethodHandles.lookup()
.findGetter(BasicFuseableObserver.class, fieldName, QueueDisposable.class);
} catch (NoSuchFieldException | IllegalAccessException ignored) {
}
return null;
}

private static MethodHandle getQueueDisposableGetter() {
MethodHandle getter = getGetterHandle("qd");
if (getter == null) {
// in versions before 2.2.21 field was named "qs"
getter = getGetterHandle("qs");
}
return getter;
}

public static boolean canEnable() {
return queueDisposableGetter != null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,20 @@
package io.opentelemetry.instrumentation.rxjava2

import static io.opentelemetry.instrumentation.test.utils.TraceUtils.basicSpan
import static io.opentelemetry.instrumentation.test.utils.TraceUtils.runInternalSpan
import static io.opentelemetry.instrumentation.test.utils.TraceUtils.runUnderTrace

import io.opentelemetry.api.GlobalOpenTelemetry
import io.opentelemetry.instrumentation.test.InstrumentationSpecification
import io.reactivex.Observable
import io.reactivex.Single
import io.reactivex.functions.Consumer

import java.util.concurrent.CountDownLatch

abstract class AbstractRxJava2SubscriptionTest extends InstrumentationSpecification {

def "subscription test"() {
def "subscribe single test"() {
when:
CountDownLatch latch = new CountDownLatch(1)
runUnderTrace("parent") {
Expand All @@ -43,6 +45,32 @@ abstract class AbstractRxJava2SubscriptionTest extends InstrumentationSpecificat
}
}

def "test observable fusion"() {
when:
CountDownLatch latch = new CountDownLatch(1)
runUnderTrace("parent") {
Observable<Integer> integerObservable = Observable.just(1, 2, 3, 4)
integerObservable.concatMap({
return Observable.just(it)
}).count().subscribe(new Consumer<Long>() {
@Override
void accept(Long count) {
runInternalSpan("child")
latch.countDown()
}
})
}
latch.await()

then:
assertTraces(1) {
trace(0, 2) {
basicSpan(it, 0, "parent")
basicSpan(it, 1, "child", span(0))
}
}
}

static class Connection {
static int query() {
def span = GlobalOpenTelemetry.getTracer("test").spanBuilder("Connection.query").startSpan()
Expand Down