Skip to content

Commit

Permalink
Adding RxJava Support
Browse files Browse the repository at this point in the history
1) Because RxJavaPlugins is protected does not expose `reset()`, a wrapper had to be introduced. More details can be found here: ReactiveX/RxJava#2297
2) The implementation (and testing) strategy was followed per HystrixConcurrencyStrategy implementation.
  • Loading branch information
Shivang Shah committed Apr 1, 2016
1 parent 9aba475 commit 9493c55
Show file tree
Hide file tree
Showing 6 changed files with 230 additions and 2 deletions.
4 changes: 3 additions & 1 deletion .gitignore
Expand Up @@ -15,4 +15,6 @@ _site/
*.logtjmeter
.checkstyle
.DS_Store
*.log
*.log
/spring-cloud-sleuth-core/nb-configuration.xml
/spring-cloud-sleuth-core/nbactions.xml
@@ -0,0 +1,29 @@
package org.springframework.cloud.sleuth.instrument.rxjava;

import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cloud.sleuth.TraceKeys;
import org.springframework.cloud.sleuth.Tracer;
import org.springframework.cloud.sleuth.autoconfig.TraceAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import rx.plugins.RxJavaSchedulersHook;

/**
*
* @author Shivang Shah
*/
@Configuration
@AutoConfigureAfter(TraceAutoConfiguration.class)
@ConditionalOnBean(Tracer.class)
@ConditionalOnClass(RxJavaSchedulersHook.class)
@ConditionalOnProperty(value = "spring.sleuth.rxjava.schedulers.hook.enabled", matchIfMissing = true)
public class RxJavaAutoConfiguration {

@Bean
SleuthRxJavaSchedulersHook sleuthRxJavaSchedulersHook(Tracer tracer, TraceKeys traceKeys) {
return new SleuthRxJavaSchedulersHook(tracer, traceKeys);
}
}
@@ -0,0 +1,105 @@
package org.springframework.cloud.sleuth.instrument.rxjava;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.cloud.sleuth.Span;
import org.springframework.cloud.sleuth.TraceKeys;
import org.springframework.cloud.sleuth.Tracer;
import rx.functions.Action0;
import rx.plugins.RxJavaErrorHandler;
import rx.plugins.RxJavaObservableExecutionHook;
import rx.plugins.RxJavaSchedulersHook;
import rx.plugins.SleuthRxJavaPlugins;

class SleuthRxJavaSchedulersHook extends RxJavaSchedulersHook {

private static final Log log = LogFactory.getLog(SleuthRxJavaSchedulersHook.class);

private static final String RXJAVA_COMPONENT = "rxjava";
private final Tracer tracer;
private final TraceKeys traceKeys;
private RxJavaSchedulersHook delegate;

SleuthRxJavaSchedulersHook(Tracer tracer, TraceKeys traceKeys) {
this.tracer = tracer;
this.traceKeys = traceKeys;
try {
this.delegate = SleuthRxJavaPlugins.getInstance().getSchedulersHook();
if (this.delegate instanceof SleuthRxJavaSchedulersHook) {
return;
}
RxJavaErrorHandler errorHandler = SleuthRxJavaPlugins.getInstance().getErrorHandler();
RxJavaObservableExecutionHook observableExecutionHook
= SleuthRxJavaPlugins.getInstance().getObservableExecutionHook();
logCurrentStateOfRxJavaPlugins(errorHandler, observableExecutionHook);
SleuthRxJavaPlugins.resetPlugins();
SleuthRxJavaPlugins.getInstance().registerSchedulersHook(this);
SleuthRxJavaPlugins.getInstance().registerErrorHandler(errorHandler);
SleuthRxJavaPlugins.getInstance().registerObservableExecutionHook(observableExecutionHook);
} catch (Exception e) {
log.error("Failed to register Sleuth RxJava SchedulersHook", e);
}
}

private void logCurrentStateOfRxJavaPlugins(RxJavaErrorHandler errorHandler,
RxJavaObservableExecutionHook observableExecutionHook) {
log.debug("Current RxJava plugins configuration is ["
+ "schedulersHook [" + this.delegate + "],"
+ "errorHandler [" + errorHandler + "],"
+ "observableExecutionHook [" + observableExecutionHook + "],"
+ "]");
log.debug("Registering Sleuth RxJava Schedulers Hook.");
}

@Override
public Action0 onSchedule(Action0 action) {
if (action instanceof TraceAction) {
return action;
}
Action0 wrappedAction = this.delegate != null
? this.delegate.onSchedule(action) : action;
if (wrappedAction instanceof TraceAction) {
return action;
}
return super.onSchedule(new TraceAction(this.tracer, this.traceKeys, wrappedAction));
}

static class TraceAction implements Action0 {

private final Action0 actual;
private Tracer tracer;
private TraceKeys traceKeys;
private Span parent;

public TraceAction(Tracer tracer, TraceKeys traceKeys, Action0 actual) {
this.tracer = tracer;
this.traceKeys = traceKeys;
this.parent = tracer.getCurrentSpan();
this.actual = actual;
}

@Override
public void call() {
Span span = this.parent;
boolean created = false;
if (span != null) {
span = this.tracer.continueSpan(span);
} else {
span = this.tracer.createSpan(RXJAVA_COMPONENT);
this.tracer.addTag(Span.SPAN_LOCAL_COMPONENT_TAG_NAME, RXJAVA_COMPONENT);
this.tracer.addTag(this.traceKeys.getAsync().getPrefix()
+ this.traceKeys.getAsync().getThreadNameKey(), Thread.currentThread().getName());
created = true;
}
try {
this.actual.call();
} finally {
if (created) {
this.tracer.close(span);
} else {
this.tracer.detach(span);
}
}
}
}
}
@@ -0,0 +1,16 @@
package rx.plugins;

/**
*
* @author Shivang Shah
*/
public class SleuthRxJavaPlugins extends RxJavaPlugins {

SleuthRxJavaPlugins() {
super();
}

public static void resetPlugins() {
getInstance().reset();
}
}
Expand Up @@ -14,7 +14,8 @@ org.springframework.cloud.sleuth.instrument.web.TraceWebAutoConfiguration,\
org.springframework.cloud.sleuth.instrument.web.client.TraceWebClientAutoConfiguration,\
org.springframework.cloud.sleuth.instrument.web.client.TraceWebAsyncClientAutoConfiguration,\
org.springframework.cloud.sleuth.instrument.web.client.feign.TraceFeignClientAutoConfiguration,\
org.springframework.cloud.sleuth.instrument.zuul.TraceZuulAutoConfiguration
org.springframework.cloud.sleuth.instrument.zuul.TraceZuulAutoConfiguration,\
org.springframework.cloud.sleuth.instrument.rxjava.RxJavaAutoConfiguration

# Environment Post Processor
org.springframework.boot.env.EnvironmentPostProcessor=\
Expand Down
@@ -0,0 +1,75 @@
package org.springframework.cloud.sleuth.instrument.rxjava;

import static org.assertj.core.api.BDDAssertions.then;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import org.springframework.cloud.sleuth.TraceKeys;
import org.springframework.cloud.sleuth.Tracer;
import rx.functions.Action0;
import rx.plugins.RxJavaErrorHandler;
import rx.plugins.RxJavaObservableExecutionHook;
import rx.plugins.RxJavaSchedulersHook;
import rx.plugins.SleuthRxJavaPlugins;

/**
*
* @author Shivang Shah
*/
@RunWith(MockitoJUnitRunner.class)
public class SleuthRxJavaSchedulersHookTest {

@Mock
Tracer tracer;
TraceKeys traceKeys = new TraceKeys();

private static StringBuilder caller;

@Before
@After
public void setup() {
SleuthRxJavaPlugins.resetPlugins();
caller = new StringBuilder();
}

@Test
public void should_not_override_existing_custom_hooks() {
SleuthRxJavaPlugins.getInstance().registerErrorHandler(new MyRxJavaErrorHandler());
SleuthRxJavaPlugins.getInstance().registerObservableExecutionHook(new MyRxJavaObservableExecutionHook());
new SleuthRxJavaSchedulersHook(tracer, traceKeys);
then(SleuthRxJavaPlugins.getInstance().getErrorHandler()).isExactlyInstanceOf(MyRxJavaErrorHandler.class);
then(SleuthRxJavaPlugins.getInstance().getObservableExecutionHook()).isExactlyInstanceOf(MyRxJavaObservableExecutionHook.class);
}

@Test
public void should_wrap_delegates_action_in_wrapped_action_when_delegate_is_present_on_schedule() {
SleuthRxJavaPlugins.getInstance().registerSchedulersHook(new MyRxJavaSchedulersHook());
SleuthRxJavaSchedulersHook schedulersHook = new SleuthRxJavaSchedulersHook(
this.tracer, this.traceKeys);
Action0 action = schedulersHook.onSchedule(() -> {
caller = new StringBuilder("hello");
});
action.call();
then(action).isInstanceOf(SleuthRxJavaSchedulersHook.TraceAction.class);
then(caller.toString()).isEqualTo("called_from_schedulers_hook");
}

static class MyRxJavaObservableExecutionHook extends RxJavaObservableExecutionHook {
}

static class MyRxJavaSchedulersHook extends RxJavaSchedulersHook {

@Override
public Action0 onSchedule(Action0 action) {
return () -> {
caller = new StringBuilder("called_from_schedulers_hook");
};
}
}

static class MyRxJavaErrorHandler extends RxJavaErrorHandler {
}
}

0 comments on commit 9493c55

Please sign in to comment.