-
Notifications
You must be signed in to change notification settings - Fork 38.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Mechanism to access request bound objects in WebClient filter in servlet env #25710
Comments
@ttddyy, I'm wondering if you've seen the option to add attributes which a filter can then check via client.get().uri("https://example.org/")
.attribute("myAttribute", "...")
.retrieve() This can be set up at the WebClient builder level to automate the extraction of ThreadLocal context: WebClient client = this.builder
.defaultRequest(spec -> spec.attribute("foo", FooContext.get()))
.filter(filter)
.build(); Spring Security uses this and its filter exposes a Consumer that can be used to configure a WebClient builder to extract and pass the information the filter needs. I believe the mechanism is there for applications to extract and pass exactly what is needed. The mechanics are straight forward and I don't see much value and adding further conventions around that. We can certainly do better to address this need more specifically in the documentation. There is a mention of the attributes but nothing to address the use case and to tie all of the above together. |
@rstoyanchev Oh, I wasn't aware of I have simulated couple of our usecases trying with However, I found one case hard to do with Sample code: ExchangeFilterFunction filter = (request, next) -> {
log.info("filter map=" + request.attributes());
return next.exchange(request);
};
WebClient webClient = WebClient.builder().filter(filter).defaultRequest(spec -> {
spec.attribute("bar", "BAR");
spec.attribute("tid", Thread.currentThread().getId());
spec.attribute("tname", Thread.currentThread().getName());
}).build();
webClient.get().uri("http://service1.com") // first call
.attribute("foo", "FOO")
.retrieve()
.bodyToMono(String.class)
.flatMap(str -> {
return webClient.get().uri("http://service2.com") // second call
.retrieve()
.bodyToMono(String.class);
})
.subscribeOn(Schedulers.boundedElastic()) // on different thread...
.block(); When making the first call, attributes In the real environment, these two calls are service-to-service calls and when I call another service, I need to propagate some values stored in caller's thread local. (very much similar to Workaround can be, when constructing the second call, explicitly add Currently, I use subscriber context approach as mentioned in the description. (reusing Also, interesting thing is that the So, |
If FooContext.set("main-thread"); // setting to threadlocal
// a filter uses "foo" from subscriber context
ExchangeFilterFunction filter = (request, next) ->
Mono.subscriberContext()
.map(context -> (String) context.get("foo"))
.map(foo -> ClientRequest.from(request).header("foo", foo).build())
.flatMap(next::exchange);
WebClient webClient = WebClient.builder()
.filter(filter)
// populate subscriber context
.defaultContext(context -> context.hasKey("foo") ? context : context.put("foo", FooContext.get()))
.build();
webClient.get()
.uri("http://foo.com")
.retrieve()
.bodyToMono(String.class)
.flatMap(body ->
webClient.get()
.uri("http://bar.com")
.retrieve()
.bodyToMono(String.class)
)
.subscribeOn(Schedulers.boundedElastic()) // on different thread...
....
;
// both calls will have header "foo=main-thread" When nested Not sure it is even possible, but an idea. |
Thanks for the feedback. I've scheduled this since we'll update the docs in the very least. Indeed the use of attributes does not propagate to nested requests. We could explore a |
This sounds a nice solution. It may also be good to auto attach servlet request, etc to the reactor context if in servlet environment; So that, |
We can aim to make it as easy as possible but we can't make the assumption that it is always needed. |
Ok, thanks. |
On further thought a general solution for inheriting attributes from outer requests is challenging without control over how to merge outer and inner attributes. That means attributes remain scoped to a specific request only. For anything more global it would have to be the Reactor context. This is already be possible and I don't see anything further we can do in the framework. For |
I ended adding a Thanks for creating this request! |
it seem there is a problem with current approach |
@robotmrv you're right, the context can only propagate from downstream to upstream which means the new context method can only apply to the current request but that's no different from what you can already do with request attributes, except perhaps for any extra operations initiated within the filter chain that can benefit from the context. This brings it back to where it's up to the (Spring MVC) application to do something like below to ensure the context is available throughout the entire request handling chain for a given request: @GetMapping(...)
public String handle(...) {
return client.get().uri("https://example.org/")
.retrieve()
.bodyToMono(String.class)
.flatMap(body -> {
// perform nested request...
})
.contextWrite(context -> ...);
} I'm re-opening to consider further steps, possibly deprecating the new method, and documenting the above as the approach for Spring MVC applications to follow. |
@rstoyanchev Are there any plans to provide context propagation functionality in |
Hi, Can the Something like: static class ContextPropagatingSubscriber<T> implements CoreSubscriber<T> {
private final CoreSubscriber<T> delegate;
private final Context context;
public ContextPropagatingSubscriber(CoreSubscriber<T> delegate) {
this.delegate = delegate;
this.context = this.delegate.currentContext();
}
@Override
public Context currentContext() {
return this.context;
}
@Override
public void onSubscribe(Subscription s) {
this.delegate.onSubscribe(s);
}
@Override
public void onNext(T t) {
this.delegate.onNext(t);
}
@Override
public void onError(Throwable t) {
this.delegate.onError(t);
}
@Override
public void onComplete() {
this.delegate.onComplete();
}
} Function<? super Mono<String>, ? extends Publisher<Object>> transformer =
Operators.liftPublisher((pub, subscriber) ->
new ContextPropagatingSubscriber(subscriber));
webClient.get().uri("http://example.com")
.retrieve()
.bodyToMono(String.class)
.publishOn(Schedulers.boundedElastic())
.flatMap(str -> {
return Mono.deferContextual(view -> {
return webClient.get().uri("http://example.org")
.retrieve()
.bodyToMono(String.class);
});
})
.transform(transformer) // <=== so, "context" method may do this
.subscribeOn(Schedulers.boundedElastic())
.block(); Sorry, I wrote this in hurry, so didn't verify much, but this is something similar to what I do for MDC propagation. So, something like custom subscriber may work to propagate context. |
@ttddyy webClient.get().uri("http://example.com")
.retrieve()
.bodyToMono(String.class)
.publishOn(Schedulers.boundedElastic())
.flatMap(str -> {
return Mono.deferContextual(view -> {
return webClient.get().uri("http://example.org")
.retrieve()
.bodyToMono(String.class);
});
})
.transform(propagateToMono())
.subscribeOn(Schedulers.boundedElastic())
.block()
...
public static <T> Function<? super Mono<T>, ? extends Mono<T>> propagateToMono() {
return it -> it.contextWrite(enrichCtx());
}
public static <T> Function<? super Flux<T>, ? extends Flux<T>> propagateToFlux() {
return it -> it.contextWrite(enrichCtx());
}
public static Function<Context, Context> enrichCtx() {
return ctx -> ctx.put("attr1", "value1");
} |
Ah ok, I misunderstood. The usage of It is very common, in servlet environment, devs convert using In my case, I found nested webClient calls and a filter was referencing |
@robotmrv as far as I can see Microprofile Context Propagation is about passing context across a declarative chain of For the transfer of data on a blocking callstack from To automate such such a transfer, there needs to be some a place in framework code that handles the result of a reactive chain on a blocking stack. Spring MVC controller methods are one such example. We could expose an extra config option, e.g. under To take the example from my previous comment, it would become slightly simpler: @GetMapping(...)
public Mono<String> handle(...) {
return client.get().uri("https://example.org/")
.retrieve()
.bodyToMono(String.class)
.flatMap(body -> {
// perform nested request...
})
// Drop the below and declare in global config (to be applied on the return value)
// .contextWrite(context -> ...);
} |
@rstoyanchev Here is the case @GetMapping()
public Mono<String> getData() {
return webClient.get()
.uri("https://example.org/")
.retrieve()
.toBodilessEntity()
.publishOn(Schedulers.boundedElastic())
.map(data -> processDataByBlockingLibrary(data));//(1) uses ThreadLocal context,
} So there is a need of something that can transfer context data into reactive context and back to the |
I made a concrete suggestion. Please, consider it.
This part doesn't make sense in a Spring MVC controller method. When you return a |
It seems there is some misunderstanding about second part. I mean that //...
.flatMap(data -> Mono.deferContextual(contextView -> {
fillThreadLocalContext(contextView);
try {
String result = processDataByBlockingLibrary(data);//uses ThreadLocal context
return Mono.just(result);
} finally {
clearThreadLocalContext();
}
}))
.contextWrite(captureContext());//as far as I understand this is going to be moved to some global config
update:
yes |
Okay I see now, although this is a digression from the current issue which is about populating a Reactor As for blocking calls within a reactive chain, as a framework we don't know where those are and we can't incur the overhead of switching ThreadLocal values in and out at every stage just because there may or may not be a blocking call. It has to be a little more explicit and more targeted. My earlier suggestion for what we can do in Spring MVC could be extended with some convention for a special Reactor In the mean time an application can set up something like below: private static <T> Mono<T> invokeBlocking(Callable<T> callable) {
return Mono.deferContextual(view -> {
Object someAttribute = view.get(SOME_ATTRIBUTE_NAME);
// set ThreadLocal
try {
return Mono.just(callable.call());
}
catch (Exception ex) {
return Mono.error(ex);
}
finally {
// clear ThreadLocal
}
});
} and apply it manually: //...
.flatMap(data -> invokeBlocking(() -> processDataByBlockingLibrary(data))))
.contextWrite(captureContext()); This doesn't seem too different from Microprofile's Context Propagation which also enables this sort of wrapping of Callable, Runnable, and the like. |
The In the mean time the conversations here have given me ideas about improving the support in Spring MVC for transferring data from ThreadLocal's to Reactor Context and likewise to then re-establish ThreadLocal context within the reactive chain where imperative code is invoked that relies on it. That goes beyond the current issue and will be explored separately. |
Thanks, looking forward to seeing how it will look like. |
Very late to the party here @rstoyanchev. What is the status for accessing RequestContext and other thread bound data in a WebClient used in a vanilla web-mvc/non-reactive controller? |
Do we've any solution for it, i'm also having same problem. ServletRequest not able to inject in thread scope Webclient Request |
The solution is taking shape as a small, independent Context Propagation library. It happens to be under the Micrometer org because it's more generally useful, but does not depend on it. Support is being built into Reactor for version 3.5 and likewise for Spring Framework in version 6 but generally, there isn't much that we need to do in the Spring Framework, i.e. the library itself provides all the functionality and it can be used directly. |
There is now also #29056. |
In servlet environment, when migrating
RestTemplate
toWebClient
, one of the challenges people face is to access http request bound objects withinWebClient
filters(ExchangeFilterFunction
).I think this is an area currently lacking a support.
For example,
SecurityContext
,Locale/LocaleContext
(LocaleContextHolder
),HttpServletRequest
(RequestContextHolder
) and any values associated to the request(ThreadLocal
) cannot easily be retrieved inExchangeFilterFunction
.I think common solution for this is to use
Hooks
to populate the values to subscriber's context.For example, in Spring Security, here defines and registers a hook that populates
SecurityContext
.Since this mechanism also populates
HttpServletRequest
andHttpServletResponse
, I leverage it in my exchange filter functions to retrieve request bound values.I also have similar logic for MDC and
LocaleContext
.I think this mechanism should be supported in Spring Framework itself; so that, all downstream libraries and application can leverage it.
For implementation perspective, for example, define a class
ReactorContextAttribute
which is a map kept in thread local. Then, framework guarantees to populate this map in subscriber's context. So that, users or downstream libraries can simply populate this map in request thread, then retrieves values from the map in subscriber context.In addition,
FrameworkServlet
/DispatcherServlet
resolved value/context, such asLocaleContext
, can be placed in this map to make them accessible in exchange filter functions.If such mechanism exists, for example, Spring Security can simply add a logic to populate
SecurityContext
.I think this is a big missing piece to use
WebClient
in servlet environment along with supporting MDC.The text was updated successfully, but these errors were encountered: