From 1a8aa9c1242fdb0ddeb18d6ea838b841e6b344ac Mon Sep 17 00:00:00 2001 From: "hash.jang" Date: Mon, 6 Sep 2021 12:08:43 +0000 Subject: [PATCH 1/4] fix #2004 --- .../sleuth/instrument/web/TraceWebFilter.java | 71 ++++++++++++++++++- 1 file changed, 69 insertions(+), 2 deletions(-) diff --git a/spring-cloud-sleuth-instrumentation/src/main/java/org/springframework/cloud/sleuth/instrument/web/TraceWebFilter.java b/spring-cloud-sleuth-instrumentation/src/main/java/org/springframework/cloud/sleuth/instrument/web/TraceWebFilter.java index 8237da9f26..6fdff20b72 100644 --- a/spring-cloud-sleuth-instrumentation/src/main/java/org/springframework/cloud/sleuth/instrument/web/TraceWebFilter.java +++ b/spring-cloud-sleuth-instrumentation/src/main/java/org/springframework/cloud/sleuth/instrument/web/TraceWebFilter.java @@ -21,8 +21,12 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.http.server.reactive.ServerHttpRequestDecorator; import reactor.core.CoreSubscriber; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.MonoOperator; import reactor.util.annotation.Nullable; @@ -96,12 +100,18 @@ public TraceWebFilter(Tracer tracer, HttpServerHandler handler, CurrentTraceCont @Override public Mono filter(ServerWebExchange exchange, WebFilterChain chain) { String uri = exchange.getRequest().getPath().pathWithinApplication().value(); - Mono source = chain.filter(exchange); + ServerWebExchange finalExchange = exchange.mutate().request(new ServerHttpRequestDecorator(exchange.getRequest()) { + @Override + public Flux getBody() { + return new TracedFlux<>(super.getBody(), exchange.getAttribute(TRACE_REQUEST_ATTR), currentTraceContext()); + } + }).build(); + Mono source = chain.filter(finalExchange); boolean tracePresent = isTracePresent(); if (log.isDebugEnabled()) { log.debug("Received a request to uri [" + uri + "]"); } - return new MonoWebFilterTrace(source, exchange, tracePresent, this, spanFromContextRetriever()); + return new MonoWebFilterTrace(source, finalExchange, tracePresent, this, spanFromContextRetriever()); } private boolean isTracePresent() { @@ -143,6 +153,63 @@ private SpanFromContextRetriever spanFromContextRetriever() { return this.spanFromContextRetriever; } + private static class TracedFlux extends Flux { + private final Flux delegate; + private final Span span; + private final CurrentTraceContext currentTraceContext; + + public TracedFlux(Flux delegate, Span span, CurrentTraceContext currentTraceContext) { + this.delegate = delegate; + this.span = span; + this.currentTraceContext = currentTraceContext; + } + + @Override + public void subscribe(CoreSubscriber actual) { + delegate.subscribe(new TracedCoreSubscriber(actual, span, currentTraceContext)); + } + } + + private static class TracedCoreSubscriber implements Subscriber { + private final Subscriber delegate; + private final Span span; + private final CurrentTraceContext currentTraceContext; + + TracedCoreSubscriber(Subscriber delegate, Span span, CurrentTraceContext currentTraceContext) { + this.delegate = delegate; + this.span = span; + this.currentTraceContext = currentTraceContext; + } + + @Override + public void onSubscribe(Subscription s) { + try (CurrentTraceContext.Scope scope = this.currentTraceContext.maybeScope(span.context())) { + delegate.onSubscribe(s); + } + } + + @Override + public void onError(Throwable t) { + try (CurrentTraceContext.Scope scope = this.currentTraceContext.maybeScope(span.context())) { + delegate.onError(t); + } + } + + @Override + public void onComplete() { + try (CurrentTraceContext.Scope scope = this.currentTraceContext.maybeScope(span.context())) { + delegate.onComplete(); + } + } + + @Override + public void onNext(T o) { + try (CurrentTraceContext.Scope scope = this.currentTraceContext.maybeScope(span.context())) { + delegate.onNext(o); + } + } + } + private static class MonoWebFilterTrace extends MonoOperator implements TraceContextPropagator { final ServerWebExchange exchange; From db85fa56a4db91eb2c09b73d5c635e0b25958543 Mon Sep 17 00:00:00 2001 From: "hash.jang" Date: Thu, 9 Sep 2021 09:45:18 +0000 Subject: [PATCH 2/4] minor reformat for fixing #2004 --- .../sleuth/instrument/web/TraceWebFilter.java | 24 +++++++++++++------ 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/spring-cloud-sleuth-instrumentation/src/main/java/org/springframework/cloud/sleuth/instrument/web/TraceWebFilter.java b/spring-cloud-sleuth-instrumentation/src/main/java/org/springframework/cloud/sleuth/instrument/web/TraceWebFilter.java index 6fdff20b72..2c9e636b92 100644 --- a/spring-cloud-sleuth-instrumentation/src/main/java/org/springframework/cloud/sleuth/instrument/web/TraceWebFilter.java +++ b/spring-cloud-sleuth-instrumentation/src/main/java/org/springframework/cloud/sleuth/instrument/web/TraceWebFilter.java @@ -183,29 +183,39 @@ private static class TracedCoreSubscriber implements Subscriber { @Override public void onSubscribe(Subscription s) { - try (CurrentTraceContext.Scope scope = this.currentTraceContext.maybeScope(span.context())) { + executeWithinScope(() -> { delegate.onSubscribe(s); - } + }); } @Override public void onError(Throwable t) { - try (CurrentTraceContext.Scope scope = this.currentTraceContext.maybeScope(span.context())) { + executeWithinScope(() -> { delegate.onError(t); - } + }); } @Override public void onComplete() { - try (CurrentTraceContext.Scope scope = this.currentTraceContext.maybeScope(span.context())) { + executeWithinScope(() -> { delegate.onComplete(); - } + }); } @Override public void onNext(T o) { - try (CurrentTraceContext.Scope scope = this.currentTraceContext.maybeScope(span.context())) { + executeWithinScope(() -> { delegate.onNext(o); + }); + } + + private void executeWithinScope(Runnable runnable) { + if (currentTraceContext.context() == null) { + try (CurrentTraceContext.Scope scope = this.currentTraceContext.maybeScope(this.span.context())) { + runnable.run(); + } + } else { + runnable.run(); } } } From 5a5d36215366fd30e77a4dc509704e3ebd3d4d66 Mon Sep 17 00:00:00 2001 From: "hash.jang" Date: Fri, 10 Sep 2021 02:07:58 +0000 Subject: [PATCH 3/4] fix format --- .../cloud/sleuth/instrument/web/TraceWebFilter.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/spring-cloud-sleuth-instrumentation/src/main/java/org/springframework/cloud/sleuth/instrument/web/TraceWebFilter.java b/spring-cloud-sleuth-instrumentation/src/main/java/org/springframework/cloud/sleuth/instrument/web/TraceWebFilter.java index 2c9e636b92..95cf14f49c 100644 --- a/spring-cloud-sleuth-instrumentation/src/main/java/org/springframework/cloud/sleuth/instrument/web/TraceWebFilter.java +++ b/spring-cloud-sleuth-instrumentation/src/main/java/org/springframework/cloud/sleuth/instrument/web/TraceWebFilter.java @@ -23,8 +23,6 @@ import org.apache.commons.logging.LogFactory; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; -import org.springframework.core.io.buffer.DataBuffer; -import org.springframework.http.server.reactive.ServerHttpRequestDecorator; import reactor.core.CoreSubscriber; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -45,9 +43,11 @@ import org.springframework.cloud.sleuth.instrument.reactor.TraceContextPropagator; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; +import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.Ordered; import org.springframework.http.HttpStatus; import org.springframework.http.server.reactive.ServerHttpRequest; +import org.springframework.http.server.reactive.ServerHttpRequestDecorator; import org.springframework.http.server.reactive.ServerHttpResponse; import org.springframework.web.method.HandlerMethod; import org.springframework.web.reactive.HandlerMapping; @@ -158,7 +158,7 @@ private static class TracedFlux extends Flux { private final Span span; private final CurrentTraceContext currentTraceContext; - public TracedFlux(Flux delegate, Span span, CurrentTraceContext currentTraceContext) { + TracedFlux(Flux delegate, Span span, CurrentTraceContext currentTraceContext) { this.delegate = delegate; this.span = span; this.currentTraceContext = currentTraceContext; From e86347e77ee24aa2fe8ee15835cacc0a4e89897a Mon Sep 17 00:00:00 2001 From: "hash.jang" Date: Sun, 10 Oct 2021 12:31:12 +0000 Subject: [PATCH 4/4] modify import --- .../cloud/sleuth/instrument/web/TraceWebFilter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spring-cloud-sleuth-instrumentation/src/main/java/org/springframework/cloud/sleuth/instrument/web/TraceWebFilter.java b/spring-cloud-sleuth-instrumentation/src/main/java/org/springframework/cloud/sleuth/instrument/web/TraceWebFilter.java index 95cf14f49c..00f09d17fe 100644 --- a/spring-cloud-sleuth-instrumentation/src/main/java/org/springframework/cloud/sleuth/instrument/web/TraceWebFilter.java +++ b/spring-cloud-sleuth-instrumentation/src/main/java/org/springframework/cloud/sleuth/instrument/web/TraceWebFilter.java @@ -43,8 +43,8 @@ import org.springframework.cloud.sleuth.instrument.reactor.TraceContextPropagator; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; -import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.Ordered; +import org.springframework.core.io.buffer.DataBuffer; import org.springframework.http.HttpStatus; import org.springframework.http.server.reactive.ServerHttpRequest; import org.springframework.http.server.reactive.ServerHttpRequestDecorator;