Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

Can make ParalleFlux.subscribe(CoreSubscriber<? super T>[]) public? #2328

Closed
minwoox opened this issue Aug 14, 2020 · 3 comments
Closed

Can make ParalleFlux.subscribe(CoreSubscriber<? super T>[]) public? #2328

minwoox opened this issue Aug 14, 2020 · 3 comments
Labels
type/enhancement A general enhancement
Milestone

Comments

@minwoox
Copy link

minwoox commented Aug 14, 2020

Motivation

Hello! 馃槃
I'm making hooks that propagate thread-local contexts in our framework during the Reactor operations.
We already have a one for RxJava, but not for Reactor.
I have almost implemented the hooks for Flux and Mono, but I couldn't do it for ParallelFlux.
It's because I couldn't just delegate this method to the original ParallelFlux.

protected abstract void subscribe(CoreSubscriber<? super T>[] subscribers);

https://github.com/reactor/reactor-core/blob/v3.4.0-M2/reactor-core/src/main/java/reactor/core/publisher/ParallelFlux.java#L954
Is there any chance that we can make this method as public, please?

Desired solution

Make ParalleFlux.subscribe(CoreSubscriber<? super T>[]) public.

@reactorbot reactorbot added the 鉂搉eed-triage This issue needs triage, hasn't been looked at by a team member yet label Aug 14, 2020
@simonbasle
Copy link
Member

It is protected though. Couldn't you create a wrapper by inheriting from ParallelFlux?

@minwoox
Copy link
Author

minwoox commented Aug 18, 2020

Thanks for the reply!
Yeah, I'm trying to make a wrapper:

class ParallelFluxWrapper extends ParallelFlux<Object> {

    private final ParallelFlux<Object> source;

    ContextAwareParallelFlux(ParallelFlux<Object> source, ...) {
        this.source = source;
        ...
    }

    ...

    @Override
    protected void subscribe(CoreSubscriber<? super Object>[] subscribers) {
        // Can't just do this because the method is not public.
        source.subscribe(subscribers);
    }
}

However, I can't just delegate subscribe(...) call to the original ParallelFlux<Object> because it's protected.
Also, it's hard for me to just implement the method since there are several ParallelFlux implementations.
So I thought changing the modifier of the method was the best option. 馃槃

minwoox added a commit to minwoox/armeria that referenced this issue Aug 18, 2020
Motivation:
We have hooks for `RxJava` which automatically propagate `RequestContext`.
But we don't have one for Reactor.

Modifications:
- Add `RequestContextHooks.enable()` to enable propagating `RequestContext` automatically.

Result:
- You can now enable automatic `RequestContext` propatation during Reactor operations.

To-do:
- `ParallelFlux` is not supported at the moment. See reactor/reactor-core#2328
minwoox added a commit to minwoox/armeria that referenced this issue Aug 18, 2020
Motivation:
We have hooks for `RxJava` which automatically propagate `RequestContext`.
But we don't have one for Reactor.

Modifications:
- Add `RequestContextHooks.enable()` to enable propagating `RequestContext` automatically.

Result:
- You can now enable automatic `RequestContext` propatation during Reactor operations.

To-do:
- `ParallelFlux` is not supported at the moment. See reactor/reactor-core#2328
trustin pushed a commit to line/armeria that referenced this issue Aug 19, 2020
Motivation:
We have hooks for `RxJava` which automatically propagates `RequestContext`.
But we don't have one for Reactor.

Modifications:
- Add `RequestContextHooks.enable()` to enable propagating `RequestContext` automatically.
- Reactor 3.3.8.RELEASE -> 3.3.9.RELEASE

Result:
- You can now enable automatic `RequestContext` propagation during Reactor operations.

To-do:
- `ParallelFlux` is not supported at the moment. See reactor/reactor-core#2328
@simonbasle simonbasle added type/enhancement A general enhancement and removed 鉂搉eed-triage This issue needs triage, hasn't been looked at by a team member yet labels Sep 3, 2020
@simonbasle simonbasle added this to the 3.4.0-RC1 milestone Sep 3, 2020
simonbasle added a commit that referenced this issue Sep 3, 2020
This commit switches the `subscribe(CoreSubscriber[])` method from
protected to public, in order to allow overloading methods to delegate
to any existing `ParallelFlux`.
@minwoox
Copy link
Author

minwoox commented Sep 3, 2020

Thank you!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
type/enhancement A general enhancement
Projects
None yet
Development

No branches or pull requests

3 participants