Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoOperator;
import reactor.util.annotation.Nullable;
Expand All @@ -42,8 +44,10 @@
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.core.Ordered;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.HttpStatus;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.web.method.HandlerMethod;
import org.springframework.web.reactive.HandlerMapping;
Expand Down Expand Up @@ -96,12 +100,18 @@ public TraceWebFilter(Tracer tracer, HttpServerHandler handler, CurrentTraceCont
@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
String uri = exchange.getRequest().getPath().pathWithinApplication().value();
Mono<Void> source = chain.filter(exchange);
ServerWebExchange finalExchange = exchange.mutate().request(new ServerHttpRequestDecorator(exchange.getRequest()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain what exactly you're trying to achieve here?

@Override
public Flux<DataBuffer> getBody() {
return new TracedFlux<>(super.getBody(), exchange.getAttribute(TRACE_REQUEST_ATTR), currentTraceContext());
}
}).build();
Mono<Void> source = chain.filter(finalExchange);
boolean tracePresent = isTracePresent();
if (log.isDebugEnabled()) {
log.debug("Received a request to uri [" + uri + "]");
}
return new MonoWebFilterTrace(source, exchange, tracePresent, this, spanFromContextRetriever());
return new MonoWebFilterTrace(source, finalExchange, tracePresent, this, spanFromContextRetriever());
}

private boolean isTracePresent() {
Expand Down Expand Up @@ -143,6 +153,73 @@ private SpanFromContextRetriever spanFromContextRetriever() {
return this.spanFromContextRetriever;
}

private static class TracedFlux<T> extends Flux<T> {
private final Flux<T> delegate;
private final Span span;
private final CurrentTraceContext currentTraceContext;

TracedFlux(Flux<T> delegate, Span span, CurrentTraceContext currentTraceContext) {
this.delegate = delegate;
this.span = span;
this.currentTraceContext = currentTraceContext;
}

@Override
public void subscribe(CoreSubscriber<? super T> actual) {
delegate.subscribe(new TracedCoreSubscriber(actual, span, currentTraceContext));
}
}

private static class TracedCoreSubscriber<T> implements Subscriber<T> {
private final Subscriber<T> delegate;
private final Span span;
private final CurrentTraceContext currentTraceContext;

TracedCoreSubscriber(Subscriber<T> delegate, Span span, CurrentTraceContext currentTraceContext) {
this.delegate = delegate;
this.span = span;
this.currentTraceContext = currentTraceContext;
}

@Override
public void onSubscribe(Subscription s) {
executeWithinScope(() -> {
delegate.onSubscribe(s);
});
}

@Override
public void onError(Throwable t) {
executeWithinScope(() -> {
delegate.onError(t);
});
}

@Override
public void onComplete() {
executeWithinScope(() -> {
delegate.onComplete();
});
}

@Override
public void onNext(T o) {
executeWithinScope(() -> {
delegate.onNext(o);
});
}

private void executeWithinScope(Runnable runnable) {
if (currentTraceContext.context() == null) {
try (CurrentTraceContext.Scope scope = this.currentTraceContext.maybeScope(this.span.context())) {
runnable.run();
}
} else {
runnable.run();
}
}
}

private static class MonoWebFilterTrace extends MonoOperator<Void, Void> implements TraceContextPropagator {

final ServerWebExchange exchange;
Expand Down