Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,47 +17,39 @@
import io.opentracing.Span;
import io.opentracing.Tracer;
import io.opentracing.tag.Tags;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;

import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.HashMap;
import java.util.Map;

class AbstractTracingObserver<T> implements Observer<T> {
final class RxTracer {

static final String COMPONENT_NAME = "rxjava-2";

private final String operationName;
private final Tracer tracer;
private volatile Span span;

AbstractTracingObserver(String operationName, Tracer tracer) {
RxTracer(String operationName, Tracer tracer) {
this.operationName = operationName;
this.tracer = tracer;
}

@Override
public void onSubscribe(Disposable d) {
void onSubscribe() {
span = tracer.buildSpan(operationName)
.withTag(Tags.COMPONENT.getKey(), COMPONENT_NAME).start();
.withTag(Tags.COMPONENT.getKey(), COMPONENT_NAME).start();
Scope scope = tracer.activateSpan(span);
SpanHolder.set(scope, span);
}

@Override
public void onNext(T t) {
}

@Override
public void onError(Throwable t) {
void onError(Throwable t) {
onError(t, span);
span.finish();
SpanHolder.clear();
}

@Override
public void onComplete() {
void onComplete() {
span.finish();
SpanHolder.clear();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.opentracing.rxjava2;

import io.opentracing.Tracer;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Action;
import io.reactivex.functions.Consumer;
Expand All @@ -23,8 +24,9 @@
/**
* Tracing decorator for RxJava {@link Consumer}
*/
public class TracingConsumer<T> extends AbstractTracingObserver<T> implements Disposable {
public class TracingConsumer<T> implements Observer<T>, Disposable {

private final RxTracer rxTracer;
private final LambdaObserver<T> lambdaObserver;

public TracingConsumer(String operationName, Tracer tracer) {
Expand All @@ -50,7 +52,7 @@ public TracingConsumer(Consumer<? super T> onNext, Consumer<? super Throwable> o
Action onComplete, Consumer<? super Disposable> onSubscribe, String operationName,
Tracer tracer) {

super(operationName, tracer);
rxTracer = new RxTracer(operationName, tracer);

requireNonNull(onNext, "onNext can not be null");
requireNonNull(onError, "onError can not be null");
Expand All @@ -66,7 +68,7 @@ public void onSubscribe(Disposable d) {
try {
lambdaObserver.onSubscribe(d);
} finally {
super.onSubscribe(d);
rxTracer.onSubscribe();
}
}

Expand All @@ -80,17 +82,16 @@ public void onError(Throwable t) {
try {
lambdaObserver.onError(t);
} finally {
super.onError(t);
rxTracer.onError(t);
}

}

@Override
public void onComplete() {
try {
lambdaObserver.onComplete();
} finally {
super.onComplete();
rxTracer.onComplete();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@
/**
* Tracing decorator for RxJava {@link Observer}
*/
public class TracingObserver<T> extends AbstractTracingObserver<T> implements Disposable {
public class TracingObserver<T> implements Observer<T>, Disposable {

private Disposable upstream;
private final RxTracer rxTracer;
private final Observer<T> observer;


public TracingObserver(Observer<T> observer, String operationName, Tracer tracer) {
super(operationName, tracer);
rxTracer = new RxTracer(operationName, tracer);
this.observer = observer;
}

Expand All @@ -47,7 +47,7 @@ public void onSubscribe(Disposable d) {
try {
observer.onSubscribe(this);
} finally {
super.onSubscribe(d);
rxTracer.onSubscribe();
}
}

Expand All @@ -61,7 +61,7 @@ public void onError(Throwable t) {
try {
observer.onError(t);
} finally {
super.onError(t);
rxTracer.onError(t);
}
}

Expand All @@ -70,7 +70,7 @@ public void onComplete() {
try {
observer.onComplete();
} finally {
super.onComplete();
rxTracer.onComplete();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
* Copyright 2017-2019 The OpenTracing Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.opentracing.rxjava2;

import io.opentracing.Tracer;
import io.reactivex.FlowableSubscriber;
import io.reactivex.functions.Action;
import io.reactivex.functions.Consumer;
import io.reactivex.internal.functions.Functions;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.operators.flowable.FlowableInternalHelper;
import io.reactivex.internal.subscribers.LambdaSubscriber;
import org.reactivestreams.Subscription;

/**
* Tracing decorator for RxJava {@link FlowableSubscriber}
*/
public class TracingSubscriber<T> implements FlowableSubscriber<T>, Subscription {

private Subscription upstream;
private final RxTracer rxTracer;
private final FlowableSubscriber<T> subscriber;

private TracingSubscriber(FlowableSubscriber<T> subscriber, String operationName, Tracer tracer) {
rxTracer = new RxTracer(operationName, tracer);
this.subscriber = subscriber;
}

@Override
public void request(long l) {
upstream.request(l);
}

@Override
public void cancel() {
upstream.cancel();
}

@Override
public void onSubscribe(Subscription s) {
upstream = s;
try {
subscriber.onSubscribe(this);
} finally {
rxTracer.onSubscribe();
}
}

@Override
public void onNext(T o) {
subscriber.onNext(o);
}

@Override
public void onError(Throwable t) {
try {
subscriber.onError(t);
} finally {
rxTracer.onError(t);
}
}

@Override
public void onComplete() {
try {
subscriber.onComplete();
} finally {
rxTracer.onComplete();
}
}

public static <T> FlowableSubscriber<T> create(
String operationName,
Tracer tracer) {
return create(Functions.emptyConsumer(), Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION,
FlowableInternalHelper.RequestMax.INSTANCE, operationName, tracer);
}

public static <T> FlowableSubscriber<T> create(
Consumer<? super T> onNext,
String operationName,
Tracer tracer) {
return create(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION,
FlowableInternalHelper.RequestMax.INSTANCE, operationName, tracer);
}

public static <T> FlowableSubscriber<T> create(
Consumer<? super T> onNext,
Consumer<? super Throwable> onError,
String operationName,
Tracer tracer) {
return create(onNext, onError, Functions.EMPTY_ACTION, FlowableInternalHelper.RequestMax.INSTANCE,
operationName, tracer);
}

public static <T> FlowableSubscriber<T> create(
Consumer<? super T> onNext,
Consumer<? super Throwable> onError,
Action onComplete,
String operationName,
Tracer tracer) {
return create(onNext, onError, onComplete, FlowableInternalHelper.RequestMax.INSTANCE, operationName, tracer);
}

public static <T> FlowableSubscriber<T> create(
Consumer<? super T> onNext,
Consumer<? super Throwable> onError,
Action onComplete,
Consumer<? super Subscription> onSubscribe,
String operationName,
Tracer tracer) {
ObjectHelper.requireNonNull(onNext, "onNext is null");
ObjectHelper.requireNonNull(onError, "onError is null");
ObjectHelper.requireNonNull(onComplete, "onComplete is null");
ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");
ObjectHelper.requireNonNull(tracer, "tracer can not be null");

return create(new LambdaSubscriber<>(onNext, onError, onComplete, onSubscribe), operationName, tracer);
}

public static <T> FlowableSubscriber<T> create(
FlowableSubscriber<T> subscriber,
String operationName,
Tracer tracer) {

return new TracingSubscriber<>(subscriber, operationName, tracer);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,24 @@
*/
package io.opentracing.rxjava2;

import static io.opentracing.rxjava2.AbstractTracingObserver.COMPONENT_NAME;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

import io.opentracing.mock.MockSpan;
import io.opentracing.mock.MockTracer;
import io.opentracing.tag.Tags;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.Observable;
import io.reactivex.functions.Function;
import io.reactivex.functions.Predicate;
import io.reactivex.schedulers.Schedulers;

import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

import static io.opentracing.rxjava2.RxTracer.COMPONENT_NAME;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

class TestUtils {

static Observable<Integer> createSequentialObservable(final MockTracer mockTracer) {
Expand Down Expand Up @@ -70,6 +73,14 @@ public boolean test(Integer integer) {
});
}

static Flowable<Integer> createSequentialFlowable(final MockTracer mockTracer) {
return createSequentialObservable(mockTracer).toFlowable(BackpressureStrategy.ERROR);
}

static Flowable<Integer> createParallelFlowable(final MockTracer mockTracer) {
return createParallelObservable(mockTracer).toFlowable(BackpressureStrategy.ERROR);
}

private static void sleep() {
try {
TimeUnit.MILLISECONDS.sleep(200L);
Expand Down
Loading