From 5819c44a020d27c42df766b01f75117b4b7e332d Mon Sep 17 00:00:00 2001 From: Ross Anderson Date: Sat, 14 Dec 2019 14:27:42 +0000 Subject: [PATCH 1/2] Add TracingSubscriber for RxJava2 Flowable usecases --- ...ractTracingObserver.java => RxTracer.java} | 22 +-- .../opentracing/rxjava2/TracingConsumer.java | 13 +- .../opentracing/rxjava2/TracingObserver.java | 12 +- .../rxjava2/TracingSubscriber.java | 136 +++++++++++++ .../io/opentracing/rxjava2/TestUtils.java | 19 +- .../rxjava2/TracingSubscriberTest.java | 179 ++++++++++++++++++ 6 files changed, 350 insertions(+), 31 deletions(-) rename opentracing-rxjava-2/src/main/java/io/opentracing/rxjava2/{AbstractTracingObserver.java => RxTracer.java} (81%) create mode 100644 opentracing-rxjava-2/src/main/java/io/opentracing/rxjava2/TracingSubscriber.java create mode 100644 opentracing-rxjava-2/src/test/java/io/opentracing/rxjava2/TracingSubscriberTest.java diff --git a/opentracing-rxjava-2/src/main/java/io/opentracing/rxjava2/AbstractTracingObserver.java b/opentracing-rxjava-2/src/main/java/io/opentracing/rxjava2/RxTracer.java similarity index 81% rename from opentracing-rxjava-2/src/main/java/io/opentracing/rxjava2/AbstractTracingObserver.java rename to opentracing-rxjava-2/src/main/java/io/opentracing/rxjava2/RxTracer.java index 8b78cb1..1ff8517 100644 --- a/opentracing-rxjava-2/src/main/java/io/opentracing/rxjava2/AbstractTracingObserver.java +++ b/opentracing-rxjava-2/src/main/java/io/opentracing/rxjava2/RxTracer.java @@ -17,14 +17,13 @@ import io.opentracing.Span; import io.opentracing.Tracer; import io.opentracing.tag.Tags; -import io.reactivex.Observer; -import io.reactivex.disposables.Disposable; + import java.io.PrintWriter; import java.io.StringWriter; import java.util.HashMap; import java.util.Map; -class AbstractTracingObserver implements Observer { +final class RxTracer { static final String COMPONENT_NAME = "rxjava-2"; @@ -32,32 +31,25 @@ class AbstractTracingObserver implements Observer { private final Tracer tracer; private volatile Span span; - AbstractTracingObserver(String operationName, Tracer tracer) { + RxTracer(String operationName, Tracer tracer) { this.operationName = operationName; this.tracer = tracer; } - @Override - public void onSubscribe(Disposable d) { + void onSubscribe() { span = tracer.buildSpan(operationName) - .withTag(Tags.COMPONENT.getKey(), COMPONENT_NAME).start(); + .withTag(Tags.COMPONENT.getKey(), COMPONENT_NAME).start(); Scope scope = tracer.activateSpan(span); SpanHolder.set(scope, span); } - @Override - public void onNext(T t) { - } - - @Override - public void onError(Throwable t) { + void onError(Throwable t) { onError(t, span); span.finish(); SpanHolder.clear(); } - @Override - public void onComplete() { + void onComplete() { span.finish(); SpanHolder.clear(); } diff --git a/opentracing-rxjava-2/src/main/java/io/opentracing/rxjava2/TracingConsumer.java b/opentracing-rxjava-2/src/main/java/io/opentracing/rxjava2/TracingConsumer.java index 6e31f6b..d8366a5 100644 --- a/opentracing-rxjava-2/src/main/java/io/opentracing/rxjava2/TracingConsumer.java +++ b/opentracing-rxjava-2/src/main/java/io/opentracing/rxjava2/TracingConsumer.java @@ -14,6 +14,7 @@ package io.opentracing.rxjava2; import io.opentracing.Tracer; +import io.reactivex.Observer; import io.reactivex.disposables.Disposable; import io.reactivex.functions.Action; import io.reactivex.functions.Consumer; @@ -23,8 +24,9 @@ /** * Tracing decorator for RxJava {@link Consumer} */ -public class TracingConsumer extends AbstractTracingObserver implements Disposable { +public class TracingConsumer implements Observer, Disposable { + private final RxTracer rxTracer; private final LambdaObserver lambdaObserver; public TracingConsumer(String operationName, Tracer tracer) { @@ -50,7 +52,7 @@ public TracingConsumer(Consumer onNext, Consumer o Action onComplete, Consumer onSubscribe, String operationName, Tracer tracer) { - super(operationName, tracer); + rxTracer = new RxTracer(operationName, tracer); requireNonNull(onNext, "onNext can not be null"); requireNonNull(onError, "onError can not be null"); @@ -66,7 +68,7 @@ public void onSubscribe(Disposable d) { try { lambdaObserver.onSubscribe(d); } finally { - super.onSubscribe(d); + rxTracer.onSubscribe(); } } @@ -80,9 +82,8 @@ public void onError(Throwable t) { try { lambdaObserver.onError(t); } finally { - super.onError(t); + rxTracer.onError(t); } - } @Override @@ -90,7 +91,7 @@ public void onComplete() { try { lambdaObserver.onComplete(); } finally { - super.onComplete(); + rxTracer.onComplete(); } } diff --git a/opentracing-rxjava-2/src/main/java/io/opentracing/rxjava2/TracingObserver.java b/opentracing-rxjava-2/src/main/java/io/opentracing/rxjava2/TracingObserver.java index 395526a..087157a 100644 --- a/opentracing-rxjava-2/src/main/java/io/opentracing/rxjava2/TracingObserver.java +++ b/opentracing-rxjava-2/src/main/java/io/opentracing/rxjava2/TracingObserver.java @@ -20,14 +20,14 @@ /** * Tracing decorator for RxJava {@link Observer} */ -public class TracingObserver extends AbstractTracingObserver implements Disposable { +public class TracingObserver implements Observer, Disposable { private Disposable upstream; + private final RxTracer rxTracer; private final Observer observer; - public TracingObserver(Observer observer, String operationName, Tracer tracer) { - super(operationName, tracer); + rxTracer = new RxTracer(operationName, tracer); this.observer = observer; } @@ -47,7 +47,7 @@ public void onSubscribe(Disposable d) { try { observer.onSubscribe(this); } finally { - super.onSubscribe(d); + rxTracer.onSubscribe(); } } @@ -61,7 +61,7 @@ public void onError(Throwable t) { try { observer.onError(t); } finally { - super.onError(t); + rxTracer.onError(t); } } @@ -70,7 +70,7 @@ public void onComplete() { try { observer.onComplete(); } finally { - super.onComplete(); + rxTracer.onComplete(); } } } diff --git a/opentracing-rxjava-2/src/main/java/io/opentracing/rxjava2/TracingSubscriber.java b/opentracing-rxjava-2/src/main/java/io/opentracing/rxjava2/TracingSubscriber.java new file mode 100644 index 0000000..4d6da69 --- /dev/null +++ b/opentracing-rxjava-2/src/main/java/io/opentracing/rxjava2/TracingSubscriber.java @@ -0,0 +1,136 @@ +/* + * Copyright 2017-2019 The OpenTracing Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package io.opentracing.rxjava2; + +import io.opentracing.Tracer; +import io.reactivex.FlowableSubscriber; +import io.reactivex.functions.Action; +import io.reactivex.functions.Consumer; +import io.reactivex.internal.functions.Functions; +import io.reactivex.internal.functions.ObjectHelper; +import io.reactivex.internal.operators.flowable.FlowableInternalHelper; +import io.reactivex.internal.subscribers.LambdaSubscriber; +import org.reactivestreams.Subscription; + +public class TracingSubscriber implements FlowableSubscriber, Subscription { + + private Subscription upstream; + private final RxTracer rxTracer; + private final FlowableSubscriber subscriber; + + private TracingSubscriber(FlowableSubscriber subscriber, String operationName, Tracer tracer) { + rxTracer = new RxTracer(operationName, tracer); + this.subscriber = subscriber; + } + + @Override + public void request(long l) { + upstream.request(l); + } + + @Override + public void cancel() { + upstream.cancel(); + } + + @Override + public void onSubscribe(Subscription s) { + upstream = s; + try { + subscriber.onSubscribe(this); + } finally { + rxTracer.onSubscribe(); + } + } + + @Override + public void onNext(T o) { + subscriber.onNext(o); + } + + @Override + public void onError(Throwable t) { + try { + subscriber.onError(t); + } finally { + rxTracer.onError(t); + } + } + + @Override + public void onComplete() { + try { + subscriber.onComplete(); + } finally { + rxTracer.onComplete(); + } + } + + public static FlowableSubscriber create( + String operationName, + Tracer tracer) { + return create(Functions.emptyConsumer(), Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, + FlowableInternalHelper.RequestMax.INSTANCE, operationName, tracer); + } + + public static FlowableSubscriber create( + Consumer onNext, + String operationName, + Tracer tracer) { + return create(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, + FlowableInternalHelper.RequestMax.INSTANCE, operationName, tracer); + } + + public static FlowableSubscriber create( + Consumer onNext, + Consumer onError, + String operationName, + Tracer tracer) { + return create(onNext, onError, Functions.EMPTY_ACTION, FlowableInternalHelper.RequestMax.INSTANCE, + operationName, tracer); + } + + public static FlowableSubscriber create( + Consumer onNext, + Consumer onError, + Action onComplete, + String operationName, + Tracer tracer) { + return create(onNext, onError, onComplete, FlowableInternalHelper.RequestMax.INSTANCE, operationName, tracer); + } + + public static FlowableSubscriber create( + Consumer onNext, + Consumer onError, + Action onComplete, + Consumer onSubscribe, + String operationName, + Tracer tracer) { + ObjectHelper.requireNonNull(onNext, "onNext is null"); + ObjectHelper.requireNonNull(onError, "onError is null"); + ObjectHelper.requireNonNull(onComplete, "onComplete is null"); + ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null"); + ObjectHelper.requireNonNull(tracer, "tracer can not be null"); + + return create(new LambdaSubscriber<>(onNext, onError, onComplete, onSubscribe), operationName, tracer); + } + + public static FlowableSubscriber create( + FlowableSubscriber subscriber, + String operationName, + Tracer tracer) { + + return new TracingSubscriber<>(subscriber, operationName, tracer); + } +} diff --git a/opentracing-rxjava-2/src/test/java/io/opentracing/rxjava2/TestUtils.java b/opentracing-rxjava-2/src/test/java/io/opentracing/rxjava2/TestUtils.java index 106a87b..eaa840a 100644 --- a/opentracing-rxjava-2/src/test/java/io/opentracing/rxjava2/TestUtils.java +++ b/opentracing-rxjava-2/src/test/java/io/opentracing/rxjava2/TestUtils.java @@ -13,21 +13,24 @@ */ package io.opentracing.rxjava2; -import static io.opentracing.rxjava2.AbstractTracingObserver.COMPONENT_NAME; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; - import io.opentracing.mock.MockSpan; import io.opentracing.mock.MockTracer; import io.opentracing.tag.Tags; +import io.reactivex.BackpressureStrategy; +import io.reactivex.Flowable; import io.reactivex.Observable; import io.reactivex.functions.Function; import io.reactivex.functions.Predicate; import io.reactivex.schedulers.Schedulers; + import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; +import static io.opentracing.rxjava2.RxTracer.COMPONENT_NAME; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + class TestUtils { static Observable createSequentialObservable(final MockTracer mockTracer) { @@ -70,6 +73,14 @@ public boolean test(Integer integer) { }); } + static Flowable createSequentialFlowable(final MockTracer mockTracer) { + return createSequentialObservable(mockTracer).toFlowable(BackpressureStrategy.ERROR); + } + + static Flowable createParallelFlowable(final MockTracer mockTracer) { + return createParallelObservable(mockTracer).toFlowable(BackpressureStrategy.ERROR); + } + private static void sleep() { try { TimeUnit.MILLISECONDS.sleep(200L); diff --git a/opentracing-rxjava-2/src/test/java/io/opentracing/rxjava2/TracingSubscriberTest.java b/opentracing-rxjava-2/src/test/java/io/opentracing/rxjava2/TracingSubscriberTest.java new file mode 100644 index 0000000..943626f --- /dev/null +++ b/opentracing-rxjava-2/src/test/java/io/opentracing/rxjava2/TracingSubscriberTest.java @@ -0,0 +1,179 @@ +/* + * Copyright 2017-2019 The OpenTracing Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package io.opentracing.rxjava2; + +import io.opentracing.Scope; +import io.opentracing.mock.MockSpan; +import io.opentracing.mock.MockTracer; +import io.reactivex.Flowable; +import io.reactivex.subscribers.TestSubscriber; +import org.junit.Before; +import org.junit.Test; + +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static io.opentracing.rxjava2.TestUtils.*; +import static org.awaitility.Awaitility.await; +import static org.hamcrest.core.IsEqual.equalTo; +import static org.junit.Assert.*; + +public class TracingSubscriberTest { + + private static final MockTracer mockTracer = new MockTracer(); + + @Before + public void before() { + mockTracer.reset(); + TracingRxJava2Utils.enableTracing(mockTracer); + } + + @Test + public void sequential() { + TestSubscriber testSubscriber = executeSequentialFlowable(); + + assertEquals(5, testSubscriber.valueCount()); + + List spans = mockTracer.finishedSpans(); + assertEquals(1, spans.size()); + checkSpans(spans); + + assertNull(mockTracer.scopeManager().activeSpan()); + } + + @Test + public void two_sequential() { + TestSubscriber testSubscriber1 = executeSequentialFlowable(); + TestSubscriber testSubscriber2 = executeSequentialFlowable(); + + assertEquals(5, testSubscriber1.valueCount()); + assertEquals(5, testSubscriber2.valueCount()); + + List spans = mockTracer.finishedSpans(); + assertEquals(2, spans.size()); + + assertNotEquals(spans.get(0).context().traceId(), spans.get(1).context().traceId()); + + assertNull(mockTracer.scopeManager().activeSpan()); + } + + @Test + public void sequential_with_parent() { + final MockSpan parent = mockTracer.buildSpan("parent").start(); + try (Scope ignored = mockTracer.activateSpan(parent)) { + TestSubscriber testSubscriber1 = executeSequentialFlowable(); + TestSubscriber testSubscriber2 = executeSequentialFlowable(); + + assertEquals(5, testSubscriber1.valueCount()); + assertEquals(5, testSubscriber2.valueCount()); + } + parent.finish(); + + List spans = mockTracer.finishedSpans(); + assertEquals(3, spans.size()); + + assertNotNull(parent); + + for (MockSpan span : spans) { + assertEquals(parent.context().traceId(), span.context().traceId()); + } + + assertNull(mockTracer.scopeManager().activeSpan()); + } + + @Test + public void parallel() { + TestSubscriber testSubscriber = executeParallelFlowable(); + + await().atMost(15, TimeUnit.SECONDS).until(reportedSpansSize(mockTracer), equalTo(1)); + + assertEquals(5, testSubscriber.valueCount()); + + List spans = mockTracer.finishedSpans(); + assertEquals(1, spans.size()); + checkSpans(spans); + + assertNull(mockTracer.scopeManager().activeSpan()); + } + + @Test + public void two_parallel() { + TestSubscriber testSubscriber1 = executeParallelFlowable(); + TestSubscriber testSubscriber2 = executeParallelFlowable(); + + testSubscriber1.awaitTerminalEvent(); + testSubscriber2.awaitTerminalEvent(); + + await().atMost(15, TimeUnit.SECONDS).until(reportedSpansSize(mockTracer), equalTo(2)); + + assertEquals(5, testSubscriber1.valueCount()); + assertEquals(5, testSubscriber2.valueCount()); + + List spans = mockTracer.finishedSpans(); + assertEquals(2, spans.size()); + + assertNotEquals(spans.get(0).context().traceId(), spans.get(1).context().traceId()); + + assertNull(mockTracer.scopeManager().activeSpan()); + } + + @Test + public void parallel_with_parent() { + final MockSpan parent = mockTracer.buildSpan("parallel_parent").start(); + try (Scope ignored = mockTracer.activateSpan(parent)) { + TestSubscriber testSubscriber1 = executeParallelFlowable(); + TestSubscriber testSubscriber2 = executeParallelFlowable(); + + testSubscriber1.awaitTerminalEvent(); + testSubscriber2.awaitTerminalEvent(); + + assertEquals(5, testSubscriber1.valueCount()); + assertEquals(5, testSubscriber2.valueCount()); + } + parent.finish(); + + await().atMost(15, TimeUnit.SECONDS).until(reportedSpansSize(mockTracer), equalTo(3)); + + List spans = mockTracer.finishedSpans(); + assertEquals(3, spans.size()); + + assertNotNull(parent); + + for (MockSpan span : spans) { + assertEquals(parent.context().traceId(), span.context().traceId()); + } + + assertNull(mockTracer.scopeManager().activeSpan()); + } + + private TestSubscriber executeSequentialFlowable() { + Flowable Flowable = createSequentialFlowable(mockTracer); + + TestSubscriber subscriber = new TestSubscriber<>(); + + Flowable.subscribe(TracingSubscriber.create(subscriber, "sequential", mockTracer)); + + return subscriber; + } + + private TestSubscriber executeParallelFlowable() { + Flowable flowable = createParallelFlowable(mockTracer); + + TestSubscriber subscriber = new TestSubscriber<>(); + + flowable.subscribe(TracingSubscriber.create(subscriber, "parallel", mockTracer)); + + return subscriber; + } +} From d23d9d448fb3f8217ed277f5eaf7d098b6400a35 Mon Sep 17 00:00:00 2001 From: Ross Anderson Date: Sat, 14 Dec 2019 14:38:47 +0000 Subject: [PATCH 2/2] Add matching documentation --- .../main/java/io/opentracing/rxjava2/TracingSubscriber.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/opentracing-rxjava-2/src/main/java/io/opentracing/rxjava2/TracingSubscriber.java b/opentracing-rxjava-2/src/main/java/io/opentracing/rxjava2/TracingSubscriber.java index 4d6da69..e06bc06 100644 --- a/opentracing-rxjava-2/src/main/java/io/opentracing/rxjava2/TracingSubscriber.java +++ b/opentracing-rxjava-2/src/main/java/io/opentracing/rxjava2/TracingSubscriber.java @@ -23,6 +23,9 @@ import io.reactivex.internal.subscribers.LambdaSubscriber; import org.reactivestreams.Subscription; +/** + * Tracing decorator for RxJava {@link FlowableSubscriber} + */ public class TracingSubscriber implements FlowableSubscriber, Subscription { private Subscription upstream;