diff --git a/.gitignore b/.gitignore index aaa1e66e45..5f1adab5bf 100644 --- a/.gitignore +++ b/.gitignore @@ -15,4 +15,6 @@ _site/ *.logtjmeter .checkstyle .DS_Store -*.log \ No newline at end of file +*.log +/spring-cloud-sleuth-core/nb-configuration.xml +/spring-cloud-sleuth-core/nbactions.xml \ No newline at end of file diff --git a/spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/instrument/rxjava/RxJavaAutoConfiguration.java b/spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/instrument/rxjava/RxJavaAutoConfiguration.java new file mode 100644 index 0000000000..e1d0eb8cd9 --- /dev/null +++ b/spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/instrument/rxjava/RxJavaAutoConfiguration.java @@ -0,0 +1,29 @@ +package org.springframework.cloud.sleuth.instrument.rxjava; + +import org.springframework.boot.autoconfigure.AutoConfigureAfter; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.cloud.sleuth.TraceKeys; +import org.springframework.cloud.sleuth.Tracer; +import org.springframework.cloud.sleuth.autoconfig.TraceAutoConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import rx.plugins.RxJavaSchedulersHook; + +/** + * + * @author Shivang Shah + */ +@Configuration +@AutoConfigureAfter(TraceAutoConfiguration.class) +@ConditionalOnBean(Tracer.class) +@ConditionalOnClass(RxJavaSchedulersHook.class) +@ConditionalOnProperty(value = "spring.sleuth.rxjava.schedulers.hook.enabled", matchIfMissing = true) +public class RxJavaAutoConfiguration { + + @Bean + SleuthRxJavaSchedulersHook sleuthRxJavaSchedulersHook(Tracer tracer, TraceKeys traceKeys) { + return new SleuthRxJavaSchedulersHook(tracer, traceKeys); + } +} diff --git a/spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/instrument/rxjava/SleuthRxJavaSchedulersHook.java b/spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/instrument/rxjava/SleuthRxJavaSchedulersHook.java new file mode 100644 index 0000000000..9eec081c94 --- /dev/null +++ b/spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/instrument/rxjava/SleuthRxJavaSchedulersHook.java @@ -0,0 +1,105 @@ +package org.springframework.cloud.sleuth.instrument.rxjava; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.springframework.cloud.sleuth.Span; +import org.springframework.cloud.sleuth.TraceKeys; +import org.springframework.cloud.sleuth.Tracer; +import rx.functions.Action0; +import rx.plugins.RxJavaErrorHandler; +import rx.plugins.RxJavaObservableExecutionHook; +import rx.plugins.RxJavaSchedulersHook; +import rx.plugins.SleuthRxJavaPlugins; + +class SleuthRxJavaSchedulersHook extends RxJavaSchedulersHook { + + private static final Log log = LogFactory.getLog(SleuthRxJavaSchedulersHook.class); + + private static final String RXJAVA_COMPONENT = "rxjava"; + private final Tracer tracer; + private final TraceKeys traceKeys; + private RxJavaSchedulersHook delegate; + + SleuthRxJavaSchedulersHook(Tracer tracer, TraceKeys traceKeys) { + this.tracer = tracer; + this.traceKeys = traceKeys; + try { + this.delegate = SleuthRxJavaPlugins.getInstance().getSchedulersHook(); + if (this.delegate instanceof SleuthRxJavaSchedulersHook) { + return; + } + RxJavaErrorHandler errorHandler = SleuthRxJavaPlugins.getInstance().getErrorHandler(); + RxJavaObservableExecutionHook observableExecutionHook + = SleuthRxJavaPlugins.getInstance().getObservableExecutionHook(); + logCurrentStateOfRxJavaPlugins(errorHandler, observableExecutionHook); + SleuthRxJavaPlugins.resetPlugins(); + SleuthRxJavaPlugins.getInstance().registerSchedulersHook(this); + SleuthRxJavaPlugins.getInstance().registerErrorHandler(errorHandler); + SleuthRxJavaPlugins.getInstance().registerObservableExecutionHook(observableExecutionHook); + } catch (Exception e) { + log.error("Failed to register Sleuth RxJava SchedulersHook", e); + } + } + + private void logCurrentStateOfRxJavaPlugins(RxJavaErrorHandler errorHandler, + RxJavaObservableExecutionHook observableExecutionHook) { + log.debug("Current RxJava plugins configuration is [" + + "schedulersHook [" + this.delegate + "]," + + "errorHandler [" + errorHandler + "]," + + "observableExecutionHook [" + observableExecutionHook + "]," + + "]"); + log.debug("Registering Sleuth RxJava Schedulers Hook."); + } + + @Override + public Action0 onSchedule(Action0 action) { + if (action instanceof TraceAction) { + return action; + } + Action0 wrappedAction = this.delegate != null + ? this.delegate.onSchedule(action) : action; + if (wrappedAction instanceof TraceAction) { + return action; + } + return super.onSchedule(new TraceAction(this.tracer, this.traceKeys, wrappedAction)); + } + + static class TraceAction implements Action0 { + + private final Action0 actual; + private Tracer tracer; + private TraceKeys traceKeys; + private Span parent; + + public TraceAction(Tracer tracer, TraceKeys traceKeys, Action0 actual) { + this.tracer = tracer; + this.traceKeys = traceKeys; + this.parent = tracer.getCurrentSpan(); + this.actual = actual; + } + + @Override + public void call() { + Span span = this.parent; + boolean created = false; + if (span != null) { + span = this.tracer.continueSpan(span); + } else { + span = this.tracer.createSpan(RXJAVA_COMPONENT); + this.tracer.addTag(Span.SPAN_LOCAL_COMPONENT_TAG_NAME, RXJAVA_COMPONENT); + this.tracer.addTag(this.traceKeys.getAsync().getPrefix() + + this.traceKeys.getAsync().getThreadNameKey(), Thread.currentThread().getName()); + created = true; + } + try { + this.actual.call(); + } finally { + if (created) { + this.tracer.close(span); + } else { + this.tracer.detach(span); + } + } + } + } +} diff --git a/spring-cloud-sleuth-core/src/main/java/rx/plugins/SleuthRxJavaPlugins.java b/spring-cloud-sleuth-core/src/main/java/rx/plugins/SleuthRxJavaPlugins.java new file mode 100644 index 0000000000..1669512575 --- /dev/null +++ b/spring-cloud-sleuth-core/src/main/java/rx/plugins/SleuthRxJavaPlugins.java @@ -0,0 +1,16 @@ +package rx.plugins; + +/** + * + * @author Shivang Shah + */ +public class SleuthRxJavaPlugins extends RxJavaPlugins { + + SleuthRxJavaPlugins() { + super(); + } + + public static void resetPlugins() { + getInstance().reset(); + } +} diff --git a/spring-cloud-sleuth-core/src/main/resources/META-INF/spring.factories b/spring-cloud-sleuth-core/src/main/resources/META-INF/spring.factories index f4b94cc8a1..54f0d68a84 100644 --- a/spring-cloud-sleuth-core/src/main/resources/META-INF/spring.factories +++ b/spring-cloud-sleuth-core/src/main/resources/META-INF/spring.factories @@ -14,7 +14,8 @@ org.springframework.cloud.sleuth.instrument.web.TraceWebAutoConfiguration,\ org.springframework.cloud.sleuth.instrument.web.client.TraceWebClientAutoConfiguration,\ org.springframework.cloud.sleuth.instrument.web.client.TraceWebAsyncClientAutoConfiguration,\ org.springframework.cloud.sleuth.instrument.web.client.feign.TraceFeignClientAutoConfiguration,\ -org.springframework.cloud.sleuth.instrument.zuul.TraceZuulAutoConfiguration +org.springframework.cloud.sleuth.instrument.zuul.TraceZuulAutoConfiguration,\ +org.springframework.cloud.sleuth.instrument.rxjava.RxJavaAutoConfiguration # Environment Post Processor org.springframework.boot.env.EnvironmentPostProcessor=\ diff --git a/spring-cloud-sleuth-core/src/test/java/org/springframework/cloud/sleuth/instrument/rxjava/SleuthRxJavaSchedulersHookTest.java b/spring-cloud-sleuth-core/src/test/java/org/springframework/cloud/sleuth/instrument/rxjava/SleuthRxJavaSchedulersHookTest.java new file mode 100644 index 0000000000..27e5c779cb --- /dev/null +++ b/spring-cloud-sleuth-core/src/test/java/org/springframework/cloud/sleuth/instrument/rxjava/SleuthRxJavaSchedulersHookTest.java @@ -0,0 +1,75 @@ +package org.springframework.cloud.sleuth.instrument.rxjava; + +import static org.assertj.core.api.BDDAssertions.then; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; +import org.springframework.cloud.sleuth.TraceKeys; +import org.springframework.cloud.sleuth.Tracer; +import rx.functions.Action0; +import rx.plugins.RxJavaErrorHandler; +import rx.plugins.RxJavaObservableExecutionHook; +import rx.plugins.RxJavaSchedulersHook; +import rx.plugins.SleuthRxJavaPlugins; + +/** + * + * @author Shivang Shah + */ +@RunWith(MockitoJUnitRunner.class) +public class SleuthRxJavaSchedulersHookTest { + + @Mock + Tracer tracer; + TraceKeys traceKeys = new TraceKeys(); + + private static StringBuilder caller; + + @Before + @After + public void setup() { + SleuthRxJavaPlugins.resetPlugins(); + caller = new StringBuilder(); + } + + @Test + public void should_not_override_existing_custom_hooks() { + SleuthRxJavaPlugins.getInstance().registerErrorHandler(new MyRxJavaErrorHandler()); + SleuthRxJavaPlugins.getInstance().registerObservableExecutionHook(new MyRxJavaObservableExecutionHook()); + new SleuthRxJavaSchedulersHook(tracer, traceKeys); + then(SleuthRxJavaPlugins.getInstance().getErrorHandler()).isExactlyInstanceOf(MyRxJavaErrorHandler.class); + then(SleuthRxJavaPlugins.getInstance().getObservableExecutionHook()).isExactlyInstanceOf(MyRxJavaObservableExecutionHook.class); + } + + @Test + public void should_wrap_delegates_action_in_wrapped_action_when_delegate_is_present_on_schedule() { + SleuthRxJavaPlugins.getInstance().registerSchedulersHook(new MyRxJavaSchedulersHook()); + SleuthRxJavaSchedulersHook schedulersHook = new SleuthRxJavaSchedulersHook( + this.tracer, this.traceKeys); + Action0 action = schedulersHook.onSchedule(() -> { + caller = new StringBuilder("hello"); + }); + action.call(); + then(action).isInstanceOf(SleuthRxJavaSchedulersHook.TraceAction.class); + then(caller.toString()).isEqualTo("called_from_schedulers_hook"); + } + + static class MyRxJavaObservableExecutionHook extends RxJavaObservableExecutionHook { + } + + static class MyRxJavaSchedulersHook extends RxJavaSchedulersHook { + + @Override + public Action0 onSchedule(Action0 action) { + return () -> { + caller = new StringBuilder("called_from_schedulers_hook"); + }; + } + } + + static class MyRxJavaErrorHandler extends RxJavaErrorHandler { + } +}