From 18c991654d17d24fbe222445ee46a69c7028bd7f Mon Sep 17 00:00:00 2001 From: Stephane Maldini Date: Wed, 31 Oct 2018 16:21:35 -0700 Subject: [PATCH 1/4] WIP : Optimize Reactor instrumentation - Use onLastOperator instead of onEachOperator - Do not instrument scalar publishers - Revisit ReactorSleuthMethodInvocationProcessor to reduce ops overhead --- ...eactorSleuthMethodInvocationProcessor.java | 309 ++++++------------ .../reactor/SpanSubscriberTests.java | 32 +- 2 files changed, 124 insertions(+), 217 deletions(-) diff --git a/spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/annotation/ReactorSleuthMethodInvocationProcessor.java b/spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/annotation/ReactorSleuthMethodInvocationProcessor.java index db52db5585..960d0792cf 100644 --- a/spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/annotation/ReactorSleuthMethodInvocationProcessor.java +++ b/spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/annotation/ReactorSleuthMethodInvocationProcessor.java @@ -17,22 +17,26 @@ package org.springframework.cloud.sleuth.annotation; import java.lang.reflect.Method; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.function.Function; import brave.Span; import brave.Tracer; -import brave.Tracing; -import brave.propagation.CurrentTraceContext; import org.aopalliance.intercept.MethodInvocation; import org.reactivestreams.Publisher; -import org.reactivestreams.Subscription; + +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.BeanFactory; +import org.springframework.cloud.sleuth.instrument.reactor.ReactorSleuth; +import org.springframework.util.StringUtils; + import reactor.core.CoreSubscriber; import reactor.core.Scannable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.publisher.SignalType; import reactor.util.annotation.Nullable; -import reactor.util.context.Context; - -import org.springframework.util.StringUtils; /** * Method Invocation Processor for Reactor. @@ -45,13 +49,12 @@ class ReactorSleuthMethodInvocationProcessor private NonReactorSleuthMethodInvocationProcessor nonReactorSleuthMethodInvocationProcessor; - Tracing tracing; + private Function, ? extends Publisher> spanSubscriberTransformer; - Tracing tracing() { - if (this.tracing == null) { - this.tracing = this.beanFactory.getBean(Tracing.class); - } - return this.tracing; + @Override + public void setBeanFactory(BeanFactory beanFactory) throws BeansException { + super.setBeanFactory(beanFactory); + spanSubscriberTransformer = ReactorSleuth.scopePassingSpanOperator(beanFactory); } @Override @@ -73,90 +76,79 @@ private Object proceedUnderReactorSpan(MethodInvocation invocation, NewSpan newS Span spanPrevious = tracer().currentSpan(); // in case of @ContinueSpan and no span in tracer we start new span and should // close it on completion + boolean startNewSpan = newSpan != null || spanPrevious == null; Span span; - if (newSpan != null || spanPrevious == null) { - span = null; + if (startNewSpan) { + span = tracer().nextSpan(); + newSpanParser().parse(invocation, newSpan, span); } else { span = spanPrevious; } String log = log(continueSpan); - Publisher publisher = (Publisher) invocation.proceed(); - - if (publisher instanceof Mono) { - return new MonoSpan((Mono) publisher, - this, - newSpan, - span, - invocation, - log); - } - else if (publisher instanceof Flux) { - return new FluxSpan((Flux) publisher, - this, - newSpan, - span, - invocation, - log); - } - else { - throw new IllegalArgumentException("Unexpected type of publisher: " + publisher.getClass()); + try (Tracer.SpanInScope ws = tracer().withSpanInScope(span)) { + Publisher publisher = (Publisher) invocation.proceed(); + + if (publisher instanceof Mono) { + return new MonoTrace((Mono)publisher, startNewSpan, span, invocation, log) + .transform(spanSubscriberTransformer) + // put span in context so it can be used by + // ScopePassingSpanSubscriber + .subscriberContext(context -> context.put(Span.class, span)); + } + else if (publisher instanceof Flux) { + return new FluxTrace((Flux)publisher, startNewSpan, span, invocation, log) + .transform(spanSubscriberTransformer) + // put span in context so it can be used by + // ScopePassingSpanSubscriber + .subscriberContext(context -> context.put(Span.class, span)); + } + else { + throw new IllegalArgumentException( + "Unexpected type of publisher: " + publisher.getClass()); + } } } - private static final class FluxSpan extends Flux implements Scannable { - - final Flux source; - final Span span; - final MethodInvocation invocation; - final String log; - final boolean hasLog; - final ReactorSleuthMethodInvocationProcessor processor; - final NewSpan newSpan; + private final class FluxTrace extends Flux implements Scannable { + final Flux source; + final boolean startNewSpan; + final Span span; + final MethodInvocation invocation; + final String log; + final boolean hasLog; - FluxSpan(Flux source, - ReactorSleuthMethodInvocationProcessor processor, - NewSpan newSpan, - @Nullable Span span, + FluxTrace(Flux source, + boolean startNewSpan, + Span span, MethodInvocation invocation, String log) { this.source = source; + this.startNewSpan = startNewSpan; this.span = span; - this.newSpan = newSpan; this.invocation = invocation; this.log = log; this.hasLog = StringUtils.hasText(log); - this.processor = processor; } @Override - public void subscribe(CoreSubscriber actual) { - Span span; - Tracer tracer = this.processor.tracer(); - if (this.span == null) { - span = tracer.nextSpan(); - this.processor.newSpanParser().parse(invocation, newSpan, span); - span.start(); - } - else { - span = this.span; - } - try (Tracer.SpanInScope ws = tracer.withSpanInScope(span)) { - this.source.subscribe(new SpanSubscriber(actual, - this.processor, - this.invocation, - this.span == null, - span, - this.log, - this.hasLog)); + public void subscribe(CoreSubscriber subscriber) { + try (Tracer.SpanInScope ws = tracer().withSpanInScope(span)) { + if (startNewSpan) { + span.start(); + } + before(invocation, span, log, hasLog); + source.doOnError(onFailureReactor(log, hasLog, span)) + .doFinally(afterReactor(startNewSpan, log, hasLog, span)) + .subscribe(subscriber); } } @Nullable public Object scanUnsafe(Attr key) { if (key == Attr.PARENT) { - return this.source; + return Scannable.from(source); } else { return null; @@ -164,25 +156,21 @@ public Object scanUnsafe(Attr key) { } } - private static final class MonoSpan extends Mono implements Scannable { - - final Mono source; - final Span span; - final MethodInvocation invocation; - final String log; - final boolean hasLog; - final ReactorSleuthMethodInvocationProcessor processor; - final NewSpan newSpan; + private final class MonoTrace extends Mono implements Scannable { + final Mono source; + final boolean startNewSpan; + final Span span; + final MethodInvocation invocation; + final String log; + final boolean hasLog; - MonoSpan(Mono source, - ReactorSleuthMethodInvocationProcessor processor, - NewSpan newSpan, - @Nullable Span span, + MonoTrace(Mono source, + boolean startNewSpan, + Span span, MethodInvocation invocation, String log) { this.source = source; - this.processor = processor; - this.newSpan = newSpan; + this.startNewSpan = startNewSpan; this.span = span; this.invocation = invocation; this.log = log; @@ -190,32 +178,24 @@ private static final class MonoSpan extends Mono implements Scannable { } @Override - public void subscribe(CoreSubscriber actual) { - Span span; - Tracer tracer = this.processor.tracer(); - if (this.span == null) { - span = tracer.nextSpan(); - this.processor.newSpanParser().parse(invocation, newSpan, span); - span.start(); - } - else { - span = this.span; - } - try (Tracer.SpanInScope ws = tracer.withSpanInScope(span)) { - this.source.subscribe(new SpanSubscriber(actual, - this.processor, - this.invocation, - this.span == null, - span, - this.log, - this.hasLog)); + public void subscribe(CoreSubscriber subscriber) { + try (Tracer.SpanInScope ws = tracer().withSpanInScope(span)) { + if (startNewSpan) { + span.start(); + } + before(invocation, span, log, hasLog); + source.doAfterSuccessOrError(afterMonoReactor(startNewSpan, + log, + hasLog, + span)) + .subscribe(subscriber); } } @Nullable public Object scanUnsafe(Attr key) { if (key == Attr.PARENT) { - return this.source; + return Scannable.from(source); } else { return null; @@ -223,115 +203,30 @@ public Object scanUnsafe(Attr key) { } } - private static final class SpanSubscriber implements CoreSubscriber, - Subscription, - Scannable { - - final CoreSubscriber actual; - final boolean isNewSpan; - final Span span; - final String log; - final boolean hasLog; - final CurrentTraceContext currentTraceContext; - final ReactorSleuthMethodInvocationProcessor processor; - final Context context; - - Subscription parent; - - SpanSubscriber(CoreSubscriber actual, - ReactorSleuthMethodInvocationProcessor processor, - MethodInvocation invocation, - boolean isNewSpan, - Span span, - String log, - boolean hasLog) { - this.actual = actual; - this.isNewSpan = isNewSpan; - this.span = span; - this.log = log; - this.hasLog = hasLog; - this.processor = processor; - - this.currentTraceContext = processor.tracing().currentTraceContext(); - this.context = actual.currentContext().put(Span.class, span); - - processor.before(invocation, this.span, this.log, this.hasLog); - } + private Consumer afterReactor(boolean isNewSpan, String log, + boolean hasLog, Span span) { + return signalType -> { + after(span, isNewSpan, log, hasLog); - @Override - public void request(long n) { - try (CurrentTraceContext.Scope scope = this.currentTraceContext - .maybeScope(this.span.context())) { - this.parent.request(n); - } - } - - @Override - public void cancel() { - try (CurrentTraceContext.Scope scope = this.currentTraceContext - .maybeScope(this.span.context())) { - this.parent.cancel(); - } - finally { - this.processor.after(this.span, this.isNewSpan, this.log, this.hasLog); - } - } - - @Override - public Context currentContext() { - return context; - } + }; + } - @Override - public void onSubscribe(Subscription subscription) { - this.parent = subscription; - try (CurrentTraceContext.Scope scope = this.currentTraceContext - .maybeScope(this.span.context())) { - this.actual.onSubscribe(this); - } - } + private Consumer onFailureReactor(String log, boolean hasLog, Span span) { + return throwable -> { + onFailure(span, log, hasLog, throwable); - @Override - public void onNext(Object o) { - try (CurrentTraceContext.Scope scope = this.currentTraceContext - .maybeScope(this.span.context())) { - this.actual.onNext(o); - } - } - - @Override - public void onError(Throwable error) { - try (CurrentTraceContext.Scope scope = this.currentTraceContext - .maybeScope(this.span.context())) { - this.processor.onFailure(this.span, this.log, this.hasLog, error); - this.actual.onError(error); - } - finally { - this.processor.after(this.span, this.isNewSpan, this.log, this.hasLog); - } - } + }; + } - @Override - public void onComplete() { - try (CurrentTraceContext.Scope scope = this.currentTraceContext - .maybeScope(this.span.context())) { - this.actual.onComplete(); + private BiConsumer afterMonoReactor(boolean isNewSpan, String log, + boolean hasLog, Span span) { + return (data, error) -> { + if (error != null) { + onFailure(span, log, hasLog, error); } - finally { - this.processor.after(this.span, this.isNewSpan, this.log, this.hasLog); - } - } + after(span, isNewSpan, log, hasLog); - @Override - public Object scanUnsafe(Attr key) { - if (key == Attr.ACTUAL) { - return this.actual; - } - if (key == Attr.PARENT) { - return this.parent; - } - return null; - } + }; } private boolean isReactorReturnType(Class returnType) { diff --git a/spring-cloud-sleuth-core/src/test/java/org/springframework/cloud/sleuth/instrument/reactor/SpanSubscriberTests.java b/spring-cloud-sleuth-core/src/test/java/org/springframework/cloud/sleuth/instrument/reactor/SpanSubscriberTests.java index 205884669f..3a488672f0 100644 --- a/spring-cloud-sleuth-core/src/test/java/org/springframework/cloud/sleuth/instrument/reactor/SpanSubscriberTests.java +++ b/spring-cloud-sleuth-core/src/test/java/org/springframework/cloud/sleuth/instrument/reactor/SpanSubscriberTests.java @@ -17,25 +17,31 @@ package org.springframework.cloud.sleuth.instrument.reactor; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import java.util.function.Function; import brave.Span; import brave.Tracer; import brave.sampler.Sampler; +import org.reactivestreams.Subscriber; +import reactor.core.CoreSubscriber; +import reactor.core.Scannable; +import reactor.core.publisher.BaseSubscriber; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Hooks; +import reactor.core.publisher.Mono; +import reactor.core.publisher.Operators; +import reactor.core.scheduler.Schedulers; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.awaitility.Awaitility; +import org.junit.AfterClass; import org.junit.Test; import org.junit.runner.RunWith; import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; -import reactor.core.CoreSubscriber; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Hooks; -import reactor.core.publisher.Mono; -import reactor.core.scheduler.Schedulers; +import org.springframework.beans.factory.BeanFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.test.context.SpringBootTest; @@ -206,8 +212,8 @@ public void should_pass_tracing_info_when_using_reactor_async() { }).map(d -> d + 1).blockLast(); Awaitility.await().untilAsserted(() -> { - then(spanInOperation.get().context().traceId()) - .isEqualTo(span.context().traceId()); + then(spanInOperation.get().context().spanId()) + .isEqualTo(span.context().spanId()); }); then(this.tracer.currentSpan()).isEqualTo(span); } @@ -227,8 +233,8 @@ public void should_pass_tracing_info_when_using_reactor_async() { then(this.tracer.currentSpan()).isEqualTo(foo2); // parent cause there's an async span in the meantime - then(spanInOperation.get().context().traceId()) - .isEqualTo(foo2.context().traceId()); + then(spanInOperation.get().context().spanId()) + .isEqualTo(foo2.context().spanId()); } finally { foo2.finish(); @@ -303,6 +309,12 @@ public void checkTraceIdFromSubscriberContext() { then(spanInSubscriberContext).hasValue(initSpan.context().spanId()); // ok here } + @AfterClass + public static void cleanup() { + Hooks.resetOnLastOperator(); + Schedulers.resetFactory(); + } + @EnableAutoConfiguration @Configuration static class Config { From a9383f3305a85d68cd402aa00422c382a7b73804 Mon Sep 17 00:00:00 2001 From: Stephane Maldini Date: Fri, 2 Nov 2018 08:59:34 -0700 Subject: [PATCH 2/4] Optimize Reactor instrumentation - Use only one operator for the MethodInvocationProcessor - Warning Behavior Change: @NewSpan will defer Span creation/context --- ...eactorSleuthMethodInvocationProcessor.java | 309 ++++++++++++------ .../reactor/SpanSubscriberTests.java | 32 +- 2 files changed, 217 insertions(+), 124 deletions(-) diff --git a/spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/annotation/ReactorSleuthMethodInvocationProcessor.java b/spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/annotation/ReactorSleuthMethodInvocationProcessor.java index 960d0792cf..db52db5585 100644 --- a/spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/annotation/ReactorSleuthMethodInvocationProcessor.java +++ b/spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/annotation/ReactorSleuthMethodInvocationProcessor.java @@ -17,26 +17,22 @@ package org.springframework.cloud.sleuth.annotation; import java.lang.reflect.Method; -import java.util.function.BiConsumer; -import java.util.function.Consumer; -import java.util.function.Function; import brave.Span; import brave.Tracer; +import brave.Tracing; +import brave.propagation.CurrentTraceContext; import org.aopalliance.intercept.MethodInvocation; import org.reactivestreams.Publisher; - -import org.springframework.beans.BeansException; -import org.springframework.beans.factory.BeanFactory; -import org.springframework.cloud.sleuth.instrument.reactor.ReactorSleuth; -import org.springframework.util.StringUtils; - +import org.reactivestreams.Subscription; import reactor.core.CoreSubscriber; import reactor.core.Scannable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.publisher.SignalType; import reactor.util.annotation.Nullable; +import reactor.util.context.Context; + +import org.springframework.util.StringUtils; /** * Method Invocation Processor for Reactor. @@ -49,12 +45,13 @@ class ReactorSleuthMethodInvocationProcessor private NonReactorSleuthMethodInvocationProcessor nonReactorSleuthMethodInvocationProcessor; - private Function, ? extends Publisher> spanSubscriberTransformer; + Tracing tracing; - @Override - public void setBeanFactory(BeanFactory beanFactory) throws BeansException { - super.setBeanFactory(beanFactory); - spanSubscriberTransformer = ReactorSleuth.scopePassingSpanOperator(beanFactory); + Tracing tracing() { + if (this.tracing == null) { + this.tracing = this.beanFactory.getBean(Tracing.class); + } + return this.tracing; } @Override @@ -76,79 +73,90 @@ private Object proceedUnderReactorSpan(MethodInvocation invocation, NewSpan newS Span spanPrevious = tracer().currentSpan(); // in case of @ContinueSpan and no span in tracer we start new span and should // close it on completion - boolean startNewSpan = newSpan != null || spanPrevious == null; Span span; - if (startNewSpan) { - span = tracer().nextSpan(); - newSpanParser().parse(invocation, newSpan, span); + if (newSpan != null || spanPrevious == null) { + span = null; } else { span = spanPrevious; } String log = log(continueSpan); - try (Tracer.SpanInScope ws = tracer().withSpanInScope(span)) { - Publisher publisher = (Publisher) invocation.proceed(); - - if (publisher instanceof Mono) { - return new MonoTrace((Mono)publisher, startNewSpan, span, invocation, log) - .transform(spanSubscriberTransformer) - // put span in context so it can be used by - // ScopePassingSpanSubscriber - .subscriberContext(context -> context.put(Span.class, span)); - } - else if (publisher instanceof Flux) { - return new FluxTrace((Flux)publisher, startNewSpan, span, invocation, log) - .transform(spanSubscriberTransformer) - // put span in context so it can be used by - // ScopePassingSpanSubscriber - .subscriberContext(context -> context.put(Span.class, span)); - } - else { - throw new IllegalArgumentException( - "Unexpected type of publisher: " + publisher.getClass()); - } + Publisher publisher = (Publisher) invocation.proceed(); + + if (publisher instanceof Mono) { + return new MonoSpan((Mono) publisher, + this, + newSpan, + span, + invocation, + log); + } + else if (publisher instanceof Flux) { + return new FluxSpan((Flux) publisher, + this, + newSpan, + span, + invocation, + log); + } + else { + throw new IllegalArgumentException("Unexpected type of publisher: " + publisher.getClass()); } } - private final class FluxTrace extends Flux implements Scannable { - final Flux source; - final boolean startNewSpan; - final Span span; - final MethodInvocation invocation; - final String log; - final boolean hasLog; + private static final class FluxSpan extends Flux implements Scannable { - FluxTrace(Flux source, - boolean startNewSpan, - Span span, + final Flux source; + final Span span; + final MethodInvocation invocation; + final String log; + final boolean hasLog; + final ReactorSleuthMethodInvocationProcessor processor; + final NewSpan newSpan; + + FluxSpan(Flux source, + ReactorSleuthMethodInvocationProcessor processor, + NewSpan newSpan, + @Nullable Span span, MethodInvocation invocation, String log) { this.source = source; - this.startNewSpan = startNewSpan; this.span = span; + this.newSpan = newSpan; this.invocation = invocation; this.log = log; this.hasLog = StringUtils.hasText(log); + this.processor = processor; } @Override - public void subscribe(CoreSubscriber subscriber) { - try (Tracer.SpanInScope ws = tracer().withSpanInScope(span)) { - if (startNewSpan) { - span.start(); - } - before(invocation, span, log, hasLog); - source.doOnError(onFailureReactor(log, hasLog, span)) - .doFinally(afterReactor(startNewSpan, log, hasLog, span)) - .subscribe(subscriber); + public void subscribe(CoreSubscriber actual) { + Span span; + Tracer tracer = this.processor.tracer(); + if (this.span == null) { + span = tracer.nextSpan(); + this.processor.newSpanParser().parse(invocation, newSpan, span); + span.start(); + } + else { + span = this.span; + } + try (Tracer.SpanInScope ws = tracer.withSpanInScope(span)) { + this.source.subscribe(new SpanSubscriber(actual, + this.processor, + this.invocation, + this.span == null, + span, + this.log, + this.hasLog)); } } @Nullable public Object scanUnsafe(Attr key) { if (key == Attr.PARENT) { - return Scannable.from(source); + return this.source; } else { return null; @@ -156,21 +164,25 @@ public Object scanUnsafe(Attr key) { } } - private final class MonoTrace extends Mono implements Scannable { - final Mono source; - final boolean startNewSpan; - final Span span; - final MethodInvocation invocation; - final String log; - final boolean hasLog; + private static final class MonoSpan extends Mono implements Scannable { - MonoTrace(Mono source, - boolean startNewSpan, - Span span, + final Mono source; + final Span span; + final MethodInvocation invocation; + final String log; + final boolean hasLog; + final ReactorSleuthMethodInvocationProcessor processor; + final NewSpan newSpan; + + MonoSpan(Mono source, + ReactorSleuthMethodInvocationProcessor processor, + NewSpan newSpan, + @Nullable Span span, MethodInvocation invocation, String log) { this.source = source; - this.startNewSpan = startNewSpan; + this.processor = processor; + this.newSpan = newSpan; this.span = span; this.invocation = invocation; this.log = log; @@ -178,24 +190,32 @@ private final class MonoTrace extends Mono implements Scannable { } @Override - public void subscribe(CoreSubscriber subscriber) { - try (Tracer.SpanInScope ws = tracer().withSpanInScope(span)) { - if (startNewSpan) { - span.start(); - } - before(invocation, span, log, hasLog); - source.doAfterSuccessOrError(afterMonoReactor(startNewSpan, - log, - hasLog, - span)) - .subscribe(subscriber); + public void subscribe(CoreSubscriber actual) { + Span span; + Tracer tracer = this.processor.tracer(); + if (this.span == null) { + span = tracer.nextSpan(); + this.processor.newSpanParser().parse(invocation, newSpan, span); + span.start(); + } + else { + span = this.span; + } + try (Tracer.SpanInScope ws = tracer.withSpanInScope(span)) { + this.source.subscribe(new SpanSubscriber(actual, + this.processor, + this.invocation, + this.span == null, + span, + this.log, + this.hasLog)); } } @Nullable public Object scanUnsafe(Attr key) { if (key == Attr.PARENT) { - return Scannable.from(source); + return this.source; } else { return null; @@ -203,30 +223,115 @@ public Object scanUnsafe(Attr key) { } } - private Consumer afterReactor(boolean isNewSpan, String log, - boolean hasLog, Span span) { - return signalType -> { - after(span, isNewSpan, log, hasLog); + private static final class SpanSubscriber implements CoreSubscriber, + Subscription, + Scannable { - }; - } + final CoreSubscriber actual; + final boolean isNewSpan; + final Span span; + final String log; + final boolean hasLog; + final CurrentTraceContext currentTraceContext; + final ReactorSleuthMethodInvocationProcessor processor; + final Context context; - private Consumer onFailureReactor(String log, boolean hasLog, Span span) { - return throwable -> { - onFailure(span, log, hasLog, throwable); + Subscription parent; - }; - } + SpanSubscriber(CoreSubscriber actual, + ReactorSleuthMethodInvocationProcessor processor, + MethodInvocation invocation, + boolean isNewSpan, + Span span, + String log, + boolean hasLog) { + this.actual = actual; + this.isNewSpan = isNewSpan; + this.span = span; + this.log = log; + this.hasLog = hasLog; + this.processor = processor; - private BiConsumer afterMonoReactor(boolean isNewSpan, String log, - boolean hasLog, Span span) { - return (data, error) -> { - if (error != null) { - onFailure(span, log, hasLog, error); + this.currentTraceContext = processor.tracing().currentTraceContext(); + this.context = actual.currentContext().put(Span.class, span); + + processor.before(invocation, this.span, this.log, this.hasLog); + } + + @Override + public void request(long n) { + try (CurrentTraceContext.Scope scope = this.currentTraceContext + .maybeScope(this.span.context())) { + this.parent.request(n); } - after(span, isNewSpan, log, hasLog); + } + + @Override + public void cancel() { + try (CurrentTraceContext.Scope scope = this.currentTraceContext + .maybeScope(this.span.context())) { + this.parent.cancel(); + } + finally { + this.processor.after(this.span, this.isNewSpan, this.log, this.hasLog); + } + } + + @Override + public Context currentContext() { + return context; + } + + @Override + public void onSubscribe(Subscription subscription) { + this.parent = subscription; + try (CurrentTraceContext.Scope scope = this.currentTraceContext + .maybeScope(this.span.context())) { + this.actual.onSubscribe(this); + } + } + + @Override + public void onNext(Object o) { + try (CurrentTraceContext.Scope scope = this.currentTraceContext + .maybeScope(this.span.context())) { + this.actual.onNext(o); + } + } - }; + @Override + public void onError(Throwable error) { + try (CurrentTraceContext.Scope scope = this.currentTraceContext + .maybeScope(this.span.context())) { + this.processor.onFailure(this.span, this.log, this.hasLog, error); + this.actual.onError(error); + } + finally { + this.processor.after(this.span, this.isNewSpan, this.log, this.hasLog); + } + } + + @Override + public void onComplete() { + try (CurrentTraceContext.Scope scope = this.currentTraceContext + .maybeScope(this.span.context())) { + this.actual.onComplete(); + } + finally { + this.processor.after(this.span, this.isNewSpan, this.log, this.hasLog); + } + } + + @Override + public Object scanUnsafe(Attr key) { + if (key == Attr.ACTUAL) { + return this.actual; + } + if (key == Attr.PARENT) { + return this.parent; + } + return null; + } } private boolean isReactorReturnType(Class returnType) { diff --git a/spring-cloud-sleuth-core/src/test/java/org/springframework/cloud/sleuth/instrument/reactor/SpanSubscriberTests.java b/spring-cloud-sleuth-core/src/test/java/org/springframework/cloud/sleuth/instrument/reactor/SpanSubscriberTests.java index 3a488672f0..205884669f 100644 --- a/spring-cloud-sleuth-core/src/test/java/org/springframework/cloud/sleuth/instrument/reactor/SpanSubscriberTests.java +++ b/spring-cloud-sleuth-core/src/test/java/org/springframework/cloud/sleuth/instrument/reactor/SpanSubscriberTests.java @@ -17,31 +17,25 @@ package org.springframework.cloud.sleuth.instrument.reactor; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Consumer; import java.util.function.Function; import brave.Span; import brave.Tracer; import brave.sampler.Sampler; -import org.reactivestreams.Subscriber; -import reactor.core.CoreSubscriber; -import reactor.core.Scannable; -import reactor.core.publisher.BaseSubscriber; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Hooks; -import reactor.core.publisher.Mono; -import reactor.core.publisher.Operators; -import reactor.core.scheduler.Schedulers; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.awaitility.Awaitility; -import org.junit.AfterClass; import org.junit.Test; import org.junit.runner.RunWith; import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; +import reactor.core.CoreSubscriber; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Hooks; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; -import org.springframework.beans.factory.BeanFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.test.context.SpringBootTest; @@ -212,8 +206,8 @@ public void should_pass_tracing_info_when_using_reactor_async() { }).map(d -> d + 1).blockLast(); Awaitility.await().untilAsserted(() -> { - then(spanInOperation.get().context().spanId()) - .isEqualTo(span.context().spanId()); + then(spanInOperation.get().context().traceId()) + .isEqualTo(span.context().traceId()); }); then(this.tracer.currentSpan()).isEqualTo(span); } @@ -233,8 +227,8 @@ public void should_pass_tracing_info_when_using_reactor_async() { then(this.tracer.currentSpan()).isEqualTo(foo2); // parent cause there's an async span in the meantime - then(spanInOperation.get().context().spanId()) - .isEqualTo(foo2.context().spanId()); + then(spanInOperation.get().context().traceId()) + .isEqualTo(foo2.context().traceId()); } finally { foo2.finish(); @@ -309,12 +303,6 @@ public void checkTraceIdFromSubscriberContext() { then(spanInSubscriberContext).hasValue(initSpan.context().spanId()); // ok here } - @AfterClass - public static void cleanup() { - Hooks.resetOnLastOperator(); - Schedulers.resetFactory(); - } - @EnableAutoConfiguration @Configuration static class Config { From 07cc3bdf9e7418fc1349793a10913cea8b63cafe Mon Sep 17 00:00:00 2001 From: Stephane Maldini Date: Tue, 6 Nov 2018 13:24:50 -0800 Subject: [PATCH 3/4] Optimize Reactor Optimization - Reduce WebFilter instrumentation to 1 operator - Reduce WebClient instrumentation to 2 operators (1 resp, 1 body) --- ...eactorSleuthMethodInvocationProcessor.java | 139 ++++----- .../instrument/reactor/ReactorSleuth.java | 39 +-- .../reactor/ScopePassingSpanSubscriber.java | 4 +- .../sleuth/instrument/web/TraceWebFilter.java | 286 +++++++++++------- .../TraceWebClientBeanPostProcessor.java | 250 ++++++++++----- .../ScopePassingSpanSubscriberTests.java | 9 +- .../reactor/SpanSubscriberTests.java | 74 +++-- .../instrument/web/TraceWebFluxTests.java | 3 +- 8 files changed, 468 insertions(+), 336 deletions(-) diff --git a/spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/annotation/ReactorSleuthMethodInvocationProcessor.java b/spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/annotation/ReactorSleuthMethodInvocationProcessor.java index db52db5585..a5c66758da 100644 --- a/spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/annotation/ReactorSleuthMethodInvocationProcessor.java +++ b/spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/annotation/ReactorSleuthMethodInvocationProcessor.java @@ -28,7 +28,9 @@ import reactor.core.CoreSubscriber; import reactor.core.Scannable; import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxOperator; import reactor.core.publisher.Mono; +import reactor.core.publisher.MonoOperator; import reactor.util.annotation.Nullable; import reactor.util.context.Context; @@ -85,43 +87,37 @@ private Object proceedUnderReactorSpan(MethodInvocation invocation, NewSpan newS Publisher publisher = (Publisher) invocation.proceed(); if (publisher instanceof Mono) { - return new MonoSpan((Mono) publisher, - this, - newSpan, - span, - invocation, + return new MonoSpan((Mono) publisher, this, newSpan, span, invocation, log); } else if (publisher instanceof Flux) { - return new FluxSpan((Flux) publisher, - this, - newSpan, - span, - invocation, + return new FluxSpan((Flux) publisher, this, newSpan, span, invocation, log); } else { - throw new IllegalArgumentException("Unexpected type of publisher: " + publisher.getClass()); + throw new IllegalArgumentException( + "Unexpected type of publisher: " + publisher.getClass()); } } - private static final class FluxSpan extends Flux implements Scannable { + private static final class FluxSpan extends FluxOperator { + + final Span span; + + final MethodInvocation invocation; + + final String log; + + final boolean hasLog; - final Flux source; - final Span span; - final MethodInvocation invocation; - final String log; - final boolean hasLog; final ReactorSleuthMethodInvocationProcessor processor; - final NewSpan newSpan; - FluxSpan(Flux source, - ReactorSleuthMethodInvocationProcessor processor, - NewSpan newSpan, - @Nullable Span span, - MethodInvocation invocation, + final NewSpan newSpan; + + FluxSpan(Flux source, ReactorSleuthMethodInvocationProcessor processor, + NewSpan newSpan, @Nullable Span span, MethodInvocation invocation, String log) { - this.source = source; + super(source); this.span = span; this.newSpan = newSpan; this.invocation = invocation; @@ -143,44 +139,31 @@ public void subscribe(CoreSubscriber actual) { span = this.span; } try (Tracer.SpanInScope ws = tracer.withSpanInScope(span)) { - this.source.subscribe(new SpanSubscriber(actual, - this.processor, - this.invocation, - this.span == null, - span, - this.log, - this.hasLog)); + this.source.subscribe(new SpanSubscriber(actual, this.processor, + this.invocation, this.span == null, span, this.log, this.hasLog)); } } - @Nullable - public Object scanUnsafe(Attr key) { - if (key == Attr.PARENT) { - return this.source; - } - else { - return null; - } - } } - private static final class MonoSpan extends Mono implements Scannable { + private static final class MonoSpan extends MonoOperator { + + final Span span; + + final MethodInvocation invocation; + + final String log; + + final boolean hasLog; - final Mono source; - final Span span; - final MethodInvocation invocation; - final String log; - final boolean hasLog; final ReactorSleuthMethodInvocationProcessor processor; - final NewSpan newSpan; - MonoSpan(Mono source, - ReactorSleuthMethodInvocationProcessor processor, - NewSpan newSpan, - @Nullable Span span, - MethodInvocation invocation, + final NewSpan newSpan; + + MonoSpan(Mono source, ReactorSleuthMethodInvocationProcessor processor, + NewSpan newSpan, @Nullable Span span, MethodInvocation invocation, String log) { - this.source = source; + super(source); this.processor = processor; this.newSpan = newSpan; this.span = span; @@ -202,48 +185,37 @@ public void subscribe(CoreSubscriber actual) { span = this.span; } try (Tracer.SpanInScope ws = tracer.withSpanInScope(span)) { - this.source.subscribe(new SpanSubscriber(actual, - this.processor, - this.invocation, - this.span == null, - span, - this.log, - this.hasLog)); + this.source.subscribe(new SpanSubscriber(actual, this.processor, + this.invocation, this.span == null, span, this.log, this.hasLog)); } } - @Nullable - public Object scanUnsafe(Attr key) { - if (key == Attr.PARENT) { - return this.source; - } - else { - return null; - } - } } - private static final class SpanSubscriber implements CoreSubscriber, - Subscription, - Scannable { + private static final class SpanSubscriber + implements CoreSubscriber, Subscription, Scannable { + + final CoreSubscriber actual; + + final boolean isNewSpan; + + final Span span; + + final String log; + + final boolean hasLog; + + final CurrentTraceContext currentTraceContext; - final CoreSubscriber actual; - final boolean isNewSpan; - final Span span; - final String log; - final boolean hasLog; - final CurrentTraceContext currentTraceContext; final ReactorSleuthMethodInvocationProcessor processor; - final Context context; + + final Context context; Subscription parent; SpanSubscriber(CoreSubscriber actual, ReactorSleuthMethodInvocationProcessor processor, - MethodInvocation invocation, - boolean isNewSpan, - Span span, - String log, + MethodInvocation invocation, boolean isNewSpan, Span span, String log, boolean hasLog) { this.actual = actual; this.isNewSpan = isNewSpan; @@ -332,6 +304,7 @@ public Object scanUnsafe(Attr key) { } return null; } + } private boolean isReactorReturnType(Class returnType) { diff --git a/spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/instrument/reactor/ReactorSleuth.java b/spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/instrument/reactor/ReactorSleuth.java index 1be32c46e9..82b5840b34 100644 --- a/spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/instrument/reactor/ReactorSleuth.java +++ b/spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/instrument/reactor/ReactorSleuth.java @@ -64,14 +64,12 @@ private ReactorSleuth() { log.trace("Scope passing operator [" + beanFactory + "]"); } - //Adapt if lazy bean factory - BooleanSupplier isActive = - beanFactory instanceof ConfigurableApplicationContext ? - ((ConfigurableApplicationContext) beanFactory)::isActive : - () -> true; + // Adapt if lazy bean factory + BooleanSupplier isActive = beanFactory instanceof ConfigurableApplicationContext + ? ((ConfigurableApplicationContext) beanFactory)::isActive : () -> true; return Operators.liftPublisher((p, sub) -> { - //if Flux/Mono #just, #empty, #error + // if Flux/Mono #just, #empty, #error if (p instanceof Fuseable.ScalarCallable) { return sub; } @@ -81,40 +79,43 @@ private ReactorSleuth() { if (log.isTraceEnabled()) { log.trace("Spring Context [" + beanFactory + "] already refreshed. Creating a scope " - + "passing span subscriber with Reactor Context " - + "[" + sub.currentContext() + "] and name [" - + scannable.name() + "]"); + + "passing span subscriber with Reactor Context " + "[" + + sub.currentContext() + "] and name [" + scannable.name() + + "]"); } - return scopePassingSpanSubscription(beanFactory.getBean(Tracing.class), sub); + return scopePassingSpanSubscription(beanFactory.getBean(Tracing.class), + sub); } if (log.isTraceEnabled()) { log.trace("Spring Context [" + beanFactory + "] is not yet refreshed, falling back to lazy span subscriber. Reactor Context is [" - + sub.currentContext() + "] and name is [" - + scannable.name() + "]"); + + sub.currentContext() + "] and name is [" + scannable.name() + + "]"); } - return new LazySpanSubscriber<>(lazyScopePassingSpanSubscription(beanFactory, scannable, sub)); + return new LazySpanSubscriber<>( + lazyScopePassingSpanSubscription(beanFactory, scannable, sub)); }); } static SpanSubscriptionProvider lazyScopePassingSpanSubscription( BeanFactory beanFactory, Scannable scannable, CoreSubscriber sub) { - return new SpanSubscriptionProvider<>(beanFactory, sub, sub.currentContext(), scannable.name()); + return new SpanSubscriptionProvider<>(beanFactory, sub, sub.currentContext(), + scannable.name()); } - - static CoreSubscriber scopePassingSpanSubscription( - Tracing tracing, CoreSubscriber sub) { + static CoreSubscriber scopePassingSpanSubscription(Tracing tracing, + CoreSubscriber sub) { Context context = sub.currentContext(); - Span root = context.hasKey(Span.class) ? context.get(Span.class) : tracing.tracer().currentSpan(); + Span root = context.hasKey(Span.class) ? context.get(Span.class) + : tracing.tracer().currentSpan(); if (root != null) { return new ScopePassingSpanSubscriber<>(sub, context, tracing, root); } else { - return sub; //no need to trace + return sub; // no need to trace } } diff --git a/spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/instrument/reactor/ScopePassingSpanSubscriber.java b/spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/instrument/reactor/ScopePassingSpanSubscriber.java index 8925cecf14..fc9b02d67a 100644 --- a/spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/instrument/reactor/ScopePassingSpanSubscriber.java +++ b/spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/instrument/reactor/ScopePassingSpanSubscriber.java @@ -123,8 +123,10 @@ public Context currentContext() { public Object scanUnsafe(Attr key) { if (key == Attr.PARENT) { return this.s; - } else { + } + else { return key == Attr.ACTUAL ? this.subscriber : null; } } + } diff --git a/spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/instrument/web/TraceWebFilter.java b/spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/instrument/web/TraceWebFilter.java index a676278433..c547e1d4cd 100644 --- a/spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/instrument/web/TraceWebFilter.java +++ b/spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/instrument/web/TraceWebFilter.java @@ -25,6 +25,13 @@ import brave.propagation.TraceContextOrSamplingFlags; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.reactivestreams.Subscription; +import reactor.core.CoreSubscriber; +import reactor.core.publisher.Mono; +import reactor.core.publisher.MonoOperator; +import reactor.util.annotation.Nullable; +import reactor.util.context.Context; + import org.springframework.beans.factory.BeanFactory; import org.springframework.core.Ordered; import org.springframework.http.HttpHeaders; @@ -36,8 +43,6 @@ import org.springframework.web.server.ServerWebExchange; import org.springframework.web.server.WebFilter; import org.springframework.web.server.WebFilterChain; -import reactor.core.publisher.Mono; -import reactor.util.context.Context; /** * A {@link WebFilter} that creates / continues / closes and detaches spans for a reactive @@ -139,134 +144,181 @@ public Mono filter(ServerWebExchange exchange, WebFilterChain chain) { if (log.isDebugEnabled()) { log.debug("Received a request to uri [" + uri + "]"); } - Span spanFromAttribute = getSpanFromAttribute(exchange); - final String CONTEXT_ERROR = "sleuth.webfilter.context.error"; - return chain.filter(exchange) - .compose(f -> f.then(Mono.subscriberContext()).onErrorResume( - t -> Mono.subscriberContext().map(c -> c.put(CONTEXT_ERROR, t))) - .flatMap(c -> { - // reactivate span from context - Span span = spanFromContext(c); - Mono continuation; - Throwable t = null; - if (c.hasKey(CONTEXT_ERROR)) { - t = c.get(CONTEXT_ERROR); - continuation = Mono.error(t); - } - else { - continuation = Mono.empty(); - } - String httpRoute = null; - Object attribute = exchange.getAttribute( - HandlerMapping.BEST_MATCHING_HANDLER_ATTRIBUTE); - if (attribute instanceof HandlerMethod) { - HandlerMethod handlerMethod = (HandlerMethod) attribute; - addClassMethodTag(handlerMethod, span); - addClassNameTag(handlerMethod, span); - Object pattern = exchange.getAttribute( - HandlerMapping.BEST_MATCHING_PATTERN_ATTRIBUTE); - httpRoute = pattern != null ? pattern.toString() : ""; - } - addResponseTagsForSpanWithoutParent(exchange, - exchange.getResponse(), span); - DecoratedServerHttpResponse delegate = new DecoratedServerHttpResponse( - exchange.getResponse(), - exchange.getRequest().getMethodValue(), httpRoute); - handler().handleSend(delegate, t, span); - if (log.isDebugEnabled()) { - log.debug("Handled send of " + span); - } - return continuation; - }).subscriberContext(c -> { - Span span; - if (c.hasKey(Span.class)) { - Span parent = c.get(Span.class); - span = tracer().nextSpan(TraceContextOrSamplingFlags - .create(parent.context())).start(); - if (log.isDebugEnabled()) { - log.debug("Found span in reactor context" + span); - } - } - else { - if (spanFromAttribute != null) { - span = spanFromAttribute; - if (log.isDebugEnabled()) { - log.debug("Found span in attribute " + span); - } - } - else { - span = handler().handleReceive(extractor(), - exchange.getRequest().getHeaders(), - exchange.getRequest()); - if (log.isDebugEnabled()) { - log.debug("Handled receive of span " + span); - } - } - exchange.getAttributes().put(TRACE_REQUEST_ATTR, span); - } - return c.put(Span.class, span); - })); + return new MonoWebFilterTrace(chain.filter(exchange), exchange, this); } - private Span spanFromContext(Context c) { - if (c.hasKey(Span.class)) { - Span span = c.get(Span.class); - if (log.isDebugEnabled()) { - log.debug("Found span in context " + span); - } - return span; + private static class MonoWebFilterTrace extends MonoOperator { + + final ServerWebExchange exchange; + + final Tracer tracer; + + final Span attrSpan; + + final HttpServerHandler handler; + + final TraceContext.Extractor extractor; + + MonoWebFilterTrace(Mono source, ServerWebExchange exchange, + TraceWebFilter parent) { + super(source); + this.tracer = parent.tracer(); + this.extractor = parent.extractor(); + this.handler = parent.handler(); + this.exchange = exchange; + this.attrSpan = exchange.getAttribute(TRACE_REQUEST_ATTR); } - Span span = defaultSpan(); - if (log.isDebugEnabled()) { - log.debug("No span found in context. Creating a new one " + span); + + @Override + public void subscribe(CoreSubscriber subscriber) { + Context context = subscriber.currentContext(); + this.source.subscribe(new WebFilterTraceSubscriber(subscriber, context, + findOrCreateSpan(context), this)); } - return span; - } - private Span defaultSpan() { - return tracer().nextSpan().start(); - } + static final class WebFilterTraceSubscriber implements CoreSubscriber { - private void addResponseTagsForSpanWithoutParent(ServerWebExchange exchange, - ServerHttpResponse response, Span span) { - if (spanWithoutParent(exchange) && response.getStatusCode() != null - && span != null) { - span.tag(STATUS_CODE_KEY, String.valueOf(response.getStatusCode().value())); - } - } + final CoreSubscriber actual; - private Span getSpanFromAttribute(ServerWebExchange exchange) { - return exchange.getAttribute(TRACE_REQUEST_ATTR); - } + final Context context; - private boolean spanWithoutParent(ServerWebExchange exchange) { - return exchange.getAttribute(TRACE_SPAN_WITHOUT_PARENT) != null; - } + final Span span; + + final ServerWebExchange exchange; + + final HttpServerHandler handler; - private void addClassMethodTag(Object handler, Span span) { - if (handler instanceof HandlerMethod) { - String methodName = ((HandlerMethod) handler).getMethod().getName(); - span.tag(MVC_CONTROLLER_METHOD_KEY, methodName); - if (log.isDebugEnabled()) { - log.debug("Adding a method tag with value [" + methodName + "] to a span " - + span); + WebFilterTraceSubscriber(CoreSubscriber actual, Context context, + Span span, MonoWebFilterTrace parent) { + this.actual = actual; + this.span = span; + this.context = context.put(Span.class, span); + this.exchange = parent.exchange; + this.handler = parent.handler; + } + + @Override + public void onSubscribe(Subscription subscription) { + this.actual.onSubscribe(subscription); + } + + @Override + public void onNext(Void aVoid) { + // IGNORE + } + + @Override + public void onError(Throwable t) { + terminateSpan(t); + this.actual.onError(t); + } + + @Override + public void onComplete() { + terminateSpan(null); + this.actual.onComplete(); + } + + @Override + public Context currentContext() { + return this.context; + } + + private void terminateSpan(@Nullable Throwable t) { + String httpRoute = null; + Object attribute = this.exchange + .getAttribute(HandlerMapping.BEST_MATCHING_HANDLER_ATTRIBUTE); + if (attribute instanceof HandlerMethod) { + HandlerMethod handlerMethod = (HandlerMethod) attribute; + addClassMethodTag(handlerMethod, this.span); + addClassNameTag(handlerMethod, this.span); + Object pattern = this.exchange + .getAttribute(HandlerMapping.BEST_MATCHING_PATTERN_ATTRIBUTE); + httpRoute = pattern != null ? pattern.toString() : ""; + } + addResponseTagsForSpanWithoutParent(this.exchange, + this.exchange.getResponse(), this.span); + DecoratedServerHttpResponse delegate = new DecoratedServerHttpResponse( + this.exchange.getResponse(), + this.exchange.getRequest().getMethodValue(), httpRoute); + this.handler.handleSend(delegate, t, this.span); + if (log.isDebugEnabled()) { + log.debug("Handled send of " + this.span); + } + } + + private void addClassMethodTag(Object handler, Span span) { + if (handler instanceof HandlerMethod) { + String methodName = ((HandlerMethod) handler).getMethod().getName(); + span.tag(MVC_CONTROLLER_METHOD_KEY, methodName); + if (log.isDebugEnabled()) { + log.debug("Adding a method tag with value [" + methodName + + "] to a span " + span); + } + } + } + + private void addClassNameTag(Object handler, Span span) { + String className; + if (handler instanceof HandlerMethod) { + className = ((HandlerMethod) handler).getBeanType().getSimpleName(); + } + else { + className = handler.getClass().getSimpleName(); + } + if (log.isDebugEnabled()) { + log.debug("Adding a class tag with value [" + className + + "] to a span " + span); + } + span.tag(MVC_CONTROLLER_CLASS_KEY, className); + } + + private void addResponseTagsForSpanWithoutParent(ServerWebExchange exchange, + ServerHttpResponse response, Span span) { + if (spanWithoutParent(exchange) && response.getStatusCode() != null + && span != null) { + span.tag(STATUS_CODE_KEY, + String.valueOf(response.getStatusCode().value())); + } + } + + private boolean spanWithoutParent(ServerWebExchange exchange) { + return exchange.getAttribute(TRACE_SPAN_WITHOUT_PARENT) != null; } - } - } - private void addClassNameTag(Object handler, Span span) { - String className; - if (handler instanceof HandlerMethod) { - className = ((HandlerMethod) handler).getBeanType().getSimpleName(); - } - else { - className = handler.getClass().getSimpleName(); } - if (log.isDebugEnabled()) { - log.debug("Adding a class tag with value [" + className + "] to a span " - + span); + + private Span findOrCreateSpan(Context c) { + Span span; + if (c.hasKey(Span.class)) { + Span parent = c.get(Span.class); + span = tracer + .nextSpan(TraceContextOrSamplingFlags.create(parent.context())) + .start(); + if (log.isDebugEnabled()) { + log.debug("Found span in reactor context" + span); + } + } + else { + if (this.attrSpan != null) { + span = this.attrSpan; + if (log.isDebugEnabled()) { + log.debug("Found span in attribute " + span); + } + } + else { + span = this.handler.handleReceive(this.extractor, + this.exchange.getRequest().getHeaders(), + this.exchange.getRequest()); + if (log.isDebugEnabled()) { + log.debug("Handled receive of span " + span); + } + } + this.exchange.getAttributes().put(TRACE_REQUEST_ATTR, span); + } + return span; } - span.tag(MVC_CONTROLLER_CLASS_KEY, className); + } @Override diff --git a/spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/instrument/web/client/TraceWebClientBeanPostProcessor.java b/spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/instrument/web/client/TraceWebClientBeanPostProcessor.java index c117661ae4..a51daca574 100644 --- a/spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/instrument/web/client/TraceWebClientBeanPostProcessor.java +++ b/spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/instrument/web/client/TraceWebClientBeanPostProcessor.java @@ -19,20 +19,29 @@ import java.util.Collections; import java.util.List; import java.util.function.Consumer; +import java.util.function.Function; import brave.Span; import brave.Tracer; +import brave.Tracing; import brave.http.HttpClientHandler; import brave.http.HttpTracing; import brave.propagation.Propagation; import brave.propagation.TraceContext; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscription; +import reactor.core.CoreSubscriber; import reactor.core.publisher.Mono; +import reactor.util.annotation.Nullable; +import reactor.util.context.Context; import org.springframework.beans.BeansException; import org.springframework.beans.factory.BeanFactory; import org.springframework.beans.factory.config.BeanPostProcessor; +import org.springframework.cloud.sleuth.instrument.reactor.ReactorSleuth; +import org.springframework.core.io.buffer.DataBuffer; import org.springframework.web.client.RestClientException; import org.springframework.web.reactive.function.client.ClientRequest; import org.springframework.web.reactive.function.client.ClientResponse; @@ -47,7 +56,7 @@ * @author Marcin Grzejszczak * @since 2.0.0 */ -class TraceWebClientBeanPostProcessor implements BeanPostProcessor { +final class TraceWebClientBeanPostProcessor implements BeanPostProcessor { private final BeanFactory beanFactory; @@ -87,7 +96,7 @@ private Consumer> addTraceExchangeFilterFunctionIfN } -class TraceExchangeFilterFunction implements ExchangeFilterFunction { +final class TraceExchangeFilterFunction implements ExchangeFilterFunction { private static final Log log = LogFactory.getLog(TraceExchangeFilterFunction.class); @@ -117,6 +126,8 @@ public static ExchangeFilterFunction create(BeanFactory beanFactory) { final BeanFactory beanFactory; + final Function, ? extends Publisher> scopePassingTransformer; + Tracer tracer; HttpTracing httpTracing; @@ -127,86 +138,185 @@ public static ExchangeFilterFunction create(BeanFactory beanFactory) { TraceExchangeFilterFunction(BeanFactory beanFactory) { this.beanFactory = beanFactory; + this.scopePassingTransformer = ReactorSleuth + .scopePassingSpanOperator(beanFactory); } @Override public Mono filter(ClientRequest request, ExchangeFunction next) { - final ClientRequest.Builder builder = ClientRequest.from(request); - Mono exchange = Mono.defer(() -> next.exchange(builder.build())) - .cast(Object.class).onErrorResume(Mono::just) - .zipWith(Mono.subscriberContext()).flatMap(anyAndContext -> { + return new MonoWebClientTrace(next, request, this); + } + + private static final class MonoWebClientTrace extends Mono { + + final ExchangeFunction next; + + final ClientRequest request; + + final Tracer tracer; + + final HttpClientHandler handler; + + final TraceContext.Injector injector; + + final Tracing tracing; + + final Function, ? extends Publisher> scopePassingTransformer; + + MonoWebClientTrace(ExchangeFunction next, ClientRequest request, + TraceExchangeFilterFunction parent) { + this.next = next; + this.request = request; + this.tracer = parent.tracer(); + this.handler = parent.handler(); + this.injector = parent.injector(); + this.tracing = parent.httpTracing().tracing(); + this.scopePassingTransformer = parent.scopePassingTransformer; + } + + @Override + public void subscribe(CoreSubscriber subscriber) { + final ClientRequest.Builder builder = ClientRequest.from(this.request); + + Context context = subscriber.currentContext(); + + this.next.exchange(builder.build()).subscribe(new WebClientTracerSubscriber( + subscriber, context, findOrCreateSpan(builder), this)); + } + + private Span findOrCreateSpan(ClientRequest.Builder builder) { + if (log.isDebugEnabled()) { + log.debug("Instrumenting WebClient call"); + } + Span clientSpan = this.handler.handleSend(this.injector, builder, + this.request, this.tracer.nextSpan()); + if (log.isDebugEnabled()) { + log.debug("Handled send of " + clientSpan); + } + return clientSpan; + } + + static final class WebClientTracerSubscriber + implements CoreSubscriber { + + final CoreSubscriber actual; + + final Context context; + + final Span span; + + final Tracer.SpanInScope ws; + + final HttpClientHandler handler; + + final Function, ? extends Publisher> scopePassingTransformer; + + final Tracing tracing; + + boolean done; + + WebClientTracerSubscriber(CoreSubscriber actual, + Context context, Span span, MonoWebClientTrace parent) { + this.actual = actual; + this.span = span; + this.handler = parent.handler; + this.tracing = parent.tracing; + this.scopePassingTransformer = parent.scopePassingTransformer; + + if (!context.hasKey(Span.class)) { + context = context.put(Span.class, span); if (log.isDebugEnabled()) { - log.debug("Wrapping the context [" + anyAndContext + "]"); + log.debug("Reactor Context got injected with the client span " + + span); } - Object any = anyAndContext.getT1(); - Span clientSpan = anyAndContext.getT2().get(CLIENT_SPAN_KEY); - Mono continuation; - final Tracer.SpanInScope ws = tracer().withSpanInScope(clientSpan); - if (any instanceof Throwable) { - continuation = Mono.error((Throwable) any); - } - else { - continuation = Mono.just((ClientResponse) any); + } + + this.context = context.put(CLIENT_SPAN_KEY, span); + this.ws = parent.tracer.withSpanInScope(span); + + } + + @Override + public void onSubscribe(Subscription subscription) { + this.actual.onSubscribe(subscription); + } + + @Override + public void onNext(ClientResponse response) { + done = true; + try { + //decorate response body + this.actual.onNext(ClientResponse.from(response) + .body(response.bodyToFlux(DataBuffer.class) + .transform(scopePassingTransformer)) + .build()); + } + finally { + terminateSpan(response, null); + } + } + + @Override + public void onError(Throwable t) { + try { + this.actual.onError(t); + } + finally { + terminateSpan(null, t); + } + } + + @Override + public void onComplete() { + try { + this.actual.onComplete(); + } + finally { + if (!done) { + terminateSpan(null, null); } - return continuation - .doAfterSuccessOrError((clientResponse, throwable1) -> { - Throwable throwable = throwable1; - if (clientResponse == null - || clientResponse.statusCode() == null) { - if (log.isDebugEnabled()) { - log.debug( - "No response was returned. Will close the span [" - + clientSpan + "]"); - } - handleReceive(clientSpan, ws, clientResponse, - throwable); - return; - } - boolean error = clientResponse.statusCode() - .is4xxClientError() - || clientResponse.statusCode().is5xxServerError(); - if (error) { - if (log.isDebugEnabled()) { - log.debug( - "Non positive status code was returned from the call. Will close the span [" - + clientSpan + "]"); - } - throwable = new RestClientException( - "Status code of the response is [" - + clientResponse.statusCode().value() - + "] and the reason is [" - + clientResponse.statusCode() - .getReasonPhrase() - + "]"); - } - handleReceive(clientSpan, ws, clientResponse, throwable); - }); - }).subscriberContext(c -> { + } + } + + @Override + public Context currentContext() { + return this.context; + } + + void handleReceive(Span clientSpan, Tracer.SpanInScope ws, + ClientResponse clientResponse, Throwable throwable) { + this.handler.handleReceive(clientResponse, throwable, clientSpan); + ws.close(); + } + + void terminateSpan(@Nullable ClientResponse clientResponse, + @Nullable Throwable throwable) { + if (clientResponse == null || clientResponse.statusCode() == null) { if (log.isDebugEnabled()) { - log.debug("Instrumenting WebClient call"); + log.debug("No response was returned. Will close the span [" + span + + "]"); } - Span parent = c.getOrDefault(Span.class, null); - Span clientSpan = handler().handleSend(injector(), builder, request, - tracer().nextSpan()); + handleReceive(span, ws, clientResponse, throwable); + return; + } + boolean error = clientResponse.statusCode().is4xxClientError() + || clientResponse.statusCode().is5xxServerError(); + if (error) { if (log.isDebugEnabled()) { - log.debug("Handled send of " + clientSpan); - } - if (parent == null) { - c = c.put(Span.class, clientSpan); - if (log.isDebugEnabled()) { - log.debug("Reactor Context got injected with the client span " - + clientSpan); - } + log.debug( + "Non positive status code was returned from the call. Will close the span [" + + span + "]"); } - return c.put(CLIENT_SPAN_KEY, clientSpan); - }); - return exchange; - } + throwable = new RestClientException("Status code of the response is [" + + clientResponse.statusCode().value() + + "] and the reason is [" + + clientResponse.statusCode().getReasonPhrase() + "]"); + } + handleReceive(span, ws, clientResponse, throwable); + } + + } - private void handleReceive(Span clientSpan, Tracer.SpanInScope ws, - ClientResponse clientResponse, Throwable throwable) { - handler().handleReceive(clientResponse, throwable, clientSpan); - ws.close(); } @SuppressWarnings("unchecked") diff --git a/spring-cloud-sleuth-core/src/test/java/org/springframework/cloud/sleuth/instrument/reactor/ScopePassingSpanSubscriberTests.java b/spring-cloud-sleuth-core/src/test/java/org/springframework/cloud/sleuth/instrument/reactor/ScopePassingSpanSubscriberTests.java index 03c867e79f..361588f3ef 100644 --- a/spring-cloud-sleuth-core/src/test/java/org/springframework/cloud/sleuth/instrument/reactor/ScopePassingSpanSubscriberTests.java +++ b/spring-cloud-sleuth-core/src/test/java/org/springframework/cloud/sleuth/instrument/reactor/ScopePassingSpanSubscriberTests.java @@ -46,9 +46,8 @@ public void should_propagate_current_context() { @Test public void should_set_empty_context_when_context_is_null() { - ScopePassingSpanSubscriber subscriber = new ScopePassingSpanSubscriber<>(null - , null, - this.tracing, null); + ScopePassingSpanSubscriber subscriber = new ScopePassingSpanSubscriber<>(null, + null, this.tracing, null); then(subscriber.currentContext().isEmpty()).isTrue(); } @@ -58,8 +57,8 @@ public void should_put_current_span_to_context() { Span span = this.tracing.tracer().nextSpan(); try (Tracer.SpanInScope ws = this.tracing.tracer() .withSpanInScope(span.start())) { - CoreSubscriber subscriber = - ReactorSleuth.scopePassingSpanSubscription(tracing, new BaseSubscriber() { + CoreSubscriber subscriber = ReactorSleuth + .scopePassingSpanSubscription(tracing, new BaseSubscriber() { }); then(subscriber.currentContext().get(Span.class)).isEqualTo(span); diff --git a/spring-cloud-sleuth-core/src/test/java/org/springframework/cloud/sleuth/instrument/reactor/SpanSubscriberTests.java b/spring-cloud-sleuth-core/src/test/java/org/springframework/cloud/sleuth/instrument/reactor/SpanSubscriberTests.java index 205884669f..995d42694b 100644 --- a/spring-cloud-sleuth-core/src/test/java/org/springframework/cloud/sleuth/instrument/reactor/SpanSubscriberTests.java +++ b/spring-cloud-sleuth-core/src/test/java/org/springframework/cloud/sleuth/instrument/reactor/SpanSubscriberTests.java @@ -88,10 +88,10 @@ public void should_support_reactor_fusion_optimization() { try (Tracer.SpanInScope ws = this.tracer.withSpanInScope(span)) { Mono.just(1).flatMap(d -> Flux.just(d + 1).collectList().map(p -> p.get(0))) - .map(d -> d + 1).map((d) -> { - spanInOperation.set(this.tracer.currentSpan()); - return d + 1; - }).map(d -> d + 1).subscribe(System.out::println); + .map(d -> d + 1).map((d) -> { + spanInOperation.set(this.tracer.currentSpan()); + return d + 1; + }).map(d -> d + 1).subscribe(System.out::println); } finally { span.finish(); @@ -106,13 +106,13 @@ public void should_not_trace_scalar_flows() { Span span = this.tracer.nextSpan().name("foo").start(); log.info("Hello"); - //Disable global hooks for local hook testing + // Disable global hooks for local hook testing Hooks.resetOnLastOperator(); try (Tracer.SpanInScope ws = this.tracer.withSpanInScope(span)) { - Function, ? extends Publisher> transformer = - ReactorSleuth.scopePassingSpanOperator(factory); + Function, ? extends Publisher> transformer = ReactorSleuth + .scopePassingSpanOperator(factory); Subscriber assertNoSpanSubscriber = new CoreSubscriber() { @Override @@ -159,26 +159,20 @@ public void onComplete() { } }; - transformer.apply(Mono.just(1).hide()) - .subscribe(assertSpanSubscriber); + transformer.apply(Mono.just(1).hide()).subscribe(assertSpanSubscriber); - transformer.apply(Mono.just(1)) - .subscribe(assertNoSpanSubscriber); + transformer.apply(Mono.just(1)).subscribe(assertNoSpanSubscriber); transformer.apply(Mono.error(new Exception()).hide()) - .subscribe(assertSpanSubscriber); - + .subscribe(assertSpanSubscriber); transformer.apply(Mono.error(new Exception())) - .subscribe(assertNoSpanSubscriber); + .subscribe(assertNoSpanSubscriber); transformer.apply(Mono.empty().hide()) - .subscribe(assertSpanSubscriber); - - transformer.apply(Mono.empty()) - .subscribe(assertNoSpanSubscriber); - + .subscribe(assertSpanSubscriber); + transformer.apply(Mono.empty()).subscribe(assertNoSpanSubscriber); } finally { @@ -198,12 +192,12 @@ public void should_pass_tracing_info_when_using_reactor_async() { try (Tracer.SpanInScope ws = this.tracer.withSpanInScope(span)) { Flux.just(1, 2, 3).publishOn(Schedulers.single()).log("reactor.1") - .map(d -> d + 1).map(d -> d + 1) - .publishOn(Schedulers.newSingle("secondThread")).log("reactor.2") - .map((d) -> { - spanInOperation.set(this.tracer.currentSpan()); - return d + 1; - }).map(d -> d + 1).blockLast(); + .map(d -> d + 1).map(d -> d + 1) + .publishOn(Schedulers.newSingle("secondThread")).log("reactor.2") + .map((d) -> { + spanInOperation.set(this.tracer.currentSpan()); + return d + 1; + }).map(d -> d + 1).blockLast(); Awaitility.await().untilAsserted(() -> { then(spanInOperation.get().context().traceId()) @@ -220,10 +214,10 @@ public void should_pass_tracing_info_when_using_reactor_async() { try (Tracer.SpanInScope ws = this.tracer.withSpanInScope(foo2)) { Flux.just(1, 2, 3).publishOn(Schedulers.single()).log("reactor.") - .map(d -> d + 1).map(d -> d + 1).map((d) -> { - spanInOperation.set(this.tracer.currentSpan()); - return d + 1; - }).map(d -> d + 1).blockLast(); + .map(d -> d + 1).map(d -> d + 1).map((d) -> { + spanInOperation.set(this.tracer.currentSpan()); + return d + 1; + }).map(d -> d + 1).blockLast(); then(this.tracer.currentSpan()).isEqualTo(foo2); // parent cause there's an async span in the meantime @@ -243,11 +237,11 @@ public void checkSequenceOfOperations() { log.info("Hello"); try (Tracer.SpanInScope ws = this.tracer.withSpanInScope(parentSpan)) { final Long spanId = Mono.fromCallable(tracer::currentSpan) - .map(span -> span.context().spanId()).block(); + .map(span -> span.context().spanId()).block(); then(spanId).isNotNull(); final Long secondSpanId = Mono.fromCallable(tracer::currentSpan) - .map(span -> span.context().spanId()).block(); + .map(span -> span.context().spanId()).block(); then(secondSpanId).isEqualTo(spanId); // different trace ids here } } @@ -260,11 +254,11 @@ public void checkTraceIdDuringZipOperation() { try (Tracer.SpanInScope ws = this.tracer.withSpanInScope(initSpan)) { Mono.fromCallable(tracer::currentSpan).map(span -> span.context().spanId()) - .doOnNext(spanInOperation::set) - .zipWith(Mono.fromCallable(tracer::currentSpan) - .map(span -> span.context().spanId()) - .doOnNext(spanInZipOperation::set)) - .block(); + .doOnNext(spanInOperation::set) + .zipWith(Mono.fromCallable(tracer::currentSpan) + .map(span -> span.context().spanId()) + .doOnNext(spanInZipOperation::set)) + .block(); } then(spanInZipOperation).hasValue(initSpan.context().spanId()); // ok here @@ -283,8 +277,8 @@ public void should_work_for_mono_just_with_flat_map() { try (Tracer.SpanInScope ws = this.tracer.withSpanInScope(initSpan)) { Mono.just("value1") - .flatMap(request -> Mono.just("value2").then(Mono.just("foo"))) - .map(a -> "qwe").block(); + .flatMap(request -> Mono.just("value2").then(Mono.just("foo"))) + .map(a -> "qwe").block(); } } @@ -296,8 +290,8 @@ public void checkTraceIdFromSubscriberContext() { try (Tracer.SpanInScope ws = this.tracer.withSpanInScope(initSpan)) { Mono.subscriberContext() - .map(context -> tracer.currentSpan().context().spanId()) - .doOnNext(spanInSubscriberContext::set).block(); + .map(context -> tracer.currentSpan().context().spanId()) + .doOnNext(spanInSubscriberContext::set).block(); } then(spanInSubscriberContext).hasValue(initSpan.context().spanId()); // ok here diff --git a/spring-cloud-sleuth-core/src/test/java/org/springframework/cloud/sleuth/instrument/web/TraceWebFluxTests.java b/spring-cloud-sleuth-core/src/test/java/org/springframework/cloud/sleuth/instrument/web/TraceWebFluxTests.java index e1d556f59e..2d6ed06307 100644 --- a/spring-cloud-sleuth-core/src/test/java/org/springframework/cloud/sleuth/instrument/web/TraceWebFluxTests.java +++ b/spring-cloud-sleuth-core/src/test/java/org/springframework/cloud/sleuth/instrument/web/TraceWebFluxTests.java @@ -552,7 +552,8 @@ public Mono newSpanInSubscriberContext() { log.info("New Span in Subscriber Context"); return Mono.subscriberContext() .doOnSuccess(context -> log.info("New Span in deferred Trace Context")) - .flatMap(context -> Mono.defer(() -> Mono.just(tracer.currentSpan().context().spanId()))); + .flatMap(context -> Mono + .defer(() -> Mono.just(tracer.currentSpan().context().spanId()))); } } \ No newline at end of file From cf3394db570efc2067bb0f71330cc76ef07f81fc Mon Sep 17 00:00:00 2001 From: Stephane Maldini Date: Tue, 6 Nov 2018 13:47:34 -0800 Subject: [PATCH 4/4] Late Formatting commit - because not enough coffee --- .../instrument/reactor/SpanSubscriptionProvider.java | 6 ++++-- .../web/client/TraceWebClientBeanPostProcessor.java | 4 ++-- .../instrument/reactor/sample/FlatMapTests.java | 11 ++++++----- .../TraceWebAsyncClientAutoConfigurationTests.java | 8 ++++---- .../web/client/integration/WebClientTests.java | 3 ++- 5 files changed, 18 insertions(+), 14 deletions(-) diff --git a/spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/instrument/reactor/SpanSubscriptionProvider.java b/spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/instrument/reactor/SpanSubscriptionProvider.java index be85c8b590..ee3b3b56b4 100644 --- a/spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/instrument/reactor/SpanSubscriptionProvider.java +++ b/spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/instrument/reactor/SpanSubscriptionProvider.java @@ -64,8 +64,10 @@ public SpanSubscription get() { } SpanSubscription newCoreSubscriber(Tracing tracing) { - Span root = context.hasKey(Span.class) ? context.get(Span.class) : tracing.tracer().currentSpan(); - return new ScopePassingSpanSubscriber<>(this.subscriber, this.context, tracing, root); + Span root = context.hasKey(Span.class) ? context.get(Span.class) + : tracing.tracer().currentSpan(); + return new ScopePassingSpanSubscriber<>(this.subscriber, this.context, tracing, + root); } private Tracing tracing() { diff --git a/spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/instrument/web/client/TraceWebClientBeanPostProcessor.java b/spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/instrument/web/client/TraceWebClientBeanPostProcessor.java index a51daca574..c30d91e7c8 100644 --- a/spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/instrument/web/client/TraceWebClientBeanPostProcessor.java +++ b/spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/instrument/web/client/TraceWebClientBeanPostProcessor.java @@ -245,10 +245,10 @@ public void onSubscribe(Subscription subscription) { public void onNext(ClientResponse response) { done = true; try { - //decorate response body + // decorate response body this.actual.onNext(ClientResponse.from(response) .body(response.bodyToFlux(DataBuffer.class) - .transform(scopePassingTransformer)) + .transform(scopePassingTransformer)) .build()); } finally { diff --git a/spring-cloud-sleuth-core/src/test/java/org/springframework/cloud/sleuth/instrument/reactor/sample/FlatMapTests.java b/spring-cloud-sleuth-core/src/test/java/org/springframework/cloud/sleuth/instrument/reactor/sample/FlatMapTests.java index 2fce840304..52915cc41b 100644 --- a/spring-cloud-sleuth-core/src/test/java/org/springframework/cloud/sleuth/instrument/reactor/sample/FlatMapTests.java +++ b/spring-cloud-sleuth-core/src/test/java/org/springframework/cloud/sleuth/instrument/reactor/sample/FlatMapTests.java @@ -116,14 +116,15 @@ public void should_work_with_flat_maps() { thenSpanInFooHasSameTraceId(secondTraceId, config); LOGGER.info("Span in Foo has same trace id"); // and - List requestUri = Arrays - .stream(capture.toString().split("\n")) + List requestUri = Arrays.stream(capture.toString().split("\n")) .filter(s -> s.contains("Received a request to uri")) .map(s -> s.split(",")[1]).collect(Collectors.toList()); - LOGGER.info("TracingFilter should not have any trace when receiving a request " + requestUri); + LOGGER.info( + "TracingFilter should not have any trace when receiving a request " + + requestUri); then(requestUri).as( - "TracingFilter should not have any trace when receiving a request") - .containsOnly(""); + "TracingFilter should not have any trace when receiving a request") + .containsOnly(""); // and #866 then(factoryUser.wasSchedulerWrapped).isTrue(); LOGGER.info("Factory was wrapped"); diff --git a/spring-cloud-sleuth-core/src/test/java/org/springframework/cloud/sleuth/instrument/web/client/TraceWebAsyncClientAutoConfigurationTests.java b/spring-cloud-sleuth-core/src/test/java/org/springframework/cloud/sleuth/instrument/web/client/TraceWebAsyncClientAutoConfigurationTests.java index 1a06c5bb25..0022de023b 100644 --- a/spring-cloud-sleuth-core/src/test/java/org/springframework/cloud/sleuth/instrument/web/client/TraceWebAsyncClientAutoConfigurationTests.java +++ b/spring-cloud-sleuth-core/src/test/java/org/springframework/cloud/sleuth/instrument/web/client/TraceWebAsyncClientAutoConfigurationTests.java @@ -90,10 +90,10 @@ public void should_close_span_upon_success_callback() } Awaitility.await().untilAsserted(() -> { - then(this.accumulator - .getSpans().stream().filter(span -> Span.Kind.CLIENT == span.kind()) - .findFirst().get()).matches( - span -> span.duration() >= TimeUnit.MILLISECONDS.toMicros(100)); + then(this.accumulator.getSpans().stream() + .filter(span -> Span.Kind.CLIENT == span.kind()).findFirst().get()) + .matches(span -> span.duration() >= TimeUnit.MILLISECONDS + .toMicros(100)); then(this.tracer.tracer().currentSpan()).isNull(); }); } diff --git a/spring-cloud-sleuth-core/src/test/java/org/springframework/cloud/sleuth/instrument/web/client/integration/WebClientTests.java b/spring-cloud-sleuth-core/src/test/java/org/springframework/cloud/sleuth/instrument/web/client/integration/WebClientTests.java index 79b2ca4b09..4720cdd2d0 100644 --- a/spring-cloud-sleuth-core/src/test/java/org/springframework/cloud/sleuth/instrument/web/client/integration/WebClientTests.java +++ b/spring-cloud-sleuth-core/src/test/java/org/springframework/cloud/sleuth/instrument/web/client/integration/WebClientTests.java @@ -296,7 +296,8 @@ public void shouldAttachTraceIdWhenCallingAnotherServiceForNettyHttpClient() Awaitility.await().untilAsserted(() -> { then(this.tracer.currentSpan()).isNull(); System.out.println("Collected span " + this.reporter.getSpans()); - then(this.reporter.getSpans()).isNotEmpty().extracting("traceId", String.class) + then(this.reporter.getSpans()).isNotEmpty() + .extracting("traceId", String.class) .containsOnly(span.context().traceIdString()); then(this.reporter.getSpans()).extracting("kind.name").contains("CLIENT"); });