Mark FluxProcessor and MonoProcessor for deprecation, remove fromSink, document alternatives #2431

Closed
2 of 4 tasks
simonbasle opened this issue Oct 9, 2020 · 2 comments
Labels
type/documentation A documentation update type/enhancement A general enhancement warn/api-change Breaking change with compilation errors
@simonbasle
Member

simonbasle commented Oct 9, 2020

  • Remove FluxProcessor#fromSink and MonoProcessor#fromSink (and underlying adapters)
  • Mark FluxProcessor and MonoProcessor as deprecated themselves (see #2179, "Do we even need multiple Processor implementations? Discussing sinks again")
  • Document typical usage patterns of MonoProcessor and how to replace them in a wiki page
  • Reference the wiki page and provide guidance on how to raise an issue if a case is encountered where there is no way around using MonoProcessor


@simonbasle simonbasle added type/enhancement A general enhancement warn/api-change Breaking change with compilation errors type/documentation A documentation update labels Oct 9, 2020
@simonbasle simonbasle added this to the 3.4.0-RC2 milestone Oct 9, 2020
@simonbasle
Member Author

simonbasle commented Oct 9, 2020

If you come to this issue because you noticed a usage of FluxProcessor or MonoProcessor, which are deprecated, please ensure you try these in order:

  1. can you simplify the code so that a processor isn't needed anymore? (see the migration notes in the comment on #2431 below)
  2. if 100% sure you absolutely need a Processor, please raise an issue and in the meantime use the deprecated processor APIs directly

@simonbasle
Member Author

Temporary notes on migrating, pending a more streamlined documentation (tbd, probably a wiki page)

Here are quick notes on MonoProcessor usage patterns we've encountered and hints at how to replace them:

as a "flip switch" used by a third-party to (programmatically) mark a completion, consumed as a Mono<Void>

Here a MonoProcessor<Void> processor is instantiated and passed as a Mono<Void> to a third party, and the code later calls processor.onComplete().

Sinks.Empty was made for this:

  • instantiate a Sinks.Empty: Sinks.Empty<Void> sink = Sinks.empty();
  • flip the switch by calling tryEmitEmpty(). Think about the possible EmitResult failures there, but chances are that with a Sinks.Empty even concurrent attempts at completing are ok, in which case it may be safe to explicitly ignore the result
  • pass the switch to downstream component using sink.asMono()
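The three steps above can be sketched as follows. This is a minimal illustration rather than a prescribed pattern: the class name `FlipSwitchExample` and the `thirdParty` method are hypothetical stand-ins for whatever component receives the `Mono<Void>`.

```java
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

public class FlipSwitchExample {

    // Hypothetical third-party component: it only ever sees a Mono<Void>
    // and reacts once that Mono completes.
    static Mono<String> thirdParty(Mono<Void> completionSignal) {
        return completionSignal.then(Mono.just("done"));
    }

    static String run() {
        // 1. instantiate the switch
        Sinks.Empty<Void> sink = Sinks.empty();

        // 3. hand the read-only view to the downstream component
        StringBuilder observed = new StringBuilder();
        thirdParty(sink.asMono()).subscribe(observed::append);

        // 2. flip the switch; the result is inspected here only for illustration
        Sinks.EmitResult result = sink.tryEmitEmpty();

        return result + ":" + observed;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Keeping the `Sinks.Empty` private and exposing only `asMono()` means the third party cannot complete the switch itself, which is the main difference from handing out a `MonoProcessor`.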

subscribing to a source (Processor usage)

In that case, the goal is to drop all the bells and whistles (and limitations) of the FluxProcessor API and fall back to the pure definition of a Processor: something that is both a Publisher and a Subscriber. If you don't need to expose the Processor itself to clients of your library, it is fine to use a sink instead: expose its asFlux()/asMono() view to clients and feed it by bridging with the lambda-based subscribe, .subscribe(v -> {...}, e -> {...}, () -> {...}).

If the sink is only fed by a single subscription, then the tryEmitXxx API can be used in a simplified manner, since the signals are already serialized as guaranteed by the Reactive Streams (RS) specification.

More specifically, for Sinks.Many<T>:

upstream.subscribe(
  //you could use `sink::tryEmitNext` below, but it can return FAIL_CANCELLED
  //emitNext has the benefit of discarding the `v` in that case, but with `tryEmitNext`
  //you could check the result and explicitly dispose the value if that is relevant
  v -> sink.emitNext(v, Sinks.EmitFailureHandler.FAIL_FAST),
  sink::tryEmitError,
  //ok not to check return value below, only because we're inside `subscribe`.
  //consider making that fact explicit in your codebase with a comment
  sink::tryEmitComplete 
);

For Sinks.One<T>:

upstream.subscribe(
  //you could use `sink::tryEmitValue` below, but it can return FAIL_CANCELLED
  //emitValue has the benefit of discarding the `v` in that case, but with `tryEmitValue`
  //you could check the result and explicitly dispose the value if that is relevant
  v -> sink.emitValue(v, Sinks.EmitFailureHandler.FAIL_FAST),
  sink::tryEmitError,
  //ok not to check return value below, only because we're inside `subscribe`.
  //even if we triggered `emitValue` above (which implies an onComplete), this
  //would just return `EmitResult.FAIL_TERMINATED` which can also be ignored here.
  //consider making that fact explicit in your codebase with a comment though
  sink::tryEmitEmpty 
);

And for Sinks.Empty<Void>:

upstream.subscribe(
  //that one is tricky. `Sinks.Empty` cannot deal with a value, so be careful
  //not to subscribe to something that can produce `onNext`. Typically a `Publisher<Void>`.
  v -> {}, //consider discarding any unexpected `v` here if relevant 
  sink::tryEmitError,
  //ok not to check return value below, only because we're inside `subscribe`
  //and the source is a Publisher<Void>: no onNext, RS guarantees no competing signals
  //like onError, and even FAIL_CANCELLED can be ignored since we're not dealing with any value...
  //consider making that fact explicit in your codebase with a comment though
  sink::tryEmitEmpty 
);
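For reference, here is a self-contained sketch of the bridging approach for `Sinks.Many<T>`. The class name `SinkBridgeExample` and the `Flux.range` source are illustrative assumptions. A replay sink is used only so that the late subscriber in this synchronous demo still sees every value; pick the sink flavor that matches your real use case.

```java
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

public class SinkBridgeExample {

    static List<Integer> run() {
        // Illustrative upstream; in real code this is the source the
        // processor used to subscribe to.
        Flux<Integer> upstream = Flux.range(1, 3);

        // Replay sink chosen for the demo so that values emitted before
        // the client subscribes are not lost.
        Sinks.Many<Integer> sink = Sinks.many().replay().all();

        // Bridge: a single subscription feeds the sink, so the signals are
        // serialized and the tryEmit results are deliberately ignored here.
        upstream.subscribe(
            v -> sink.emitNext(v, Sinks.EmitFailureHandler.FAIL_FAST),
            sink::tryEmitError,
            sink::tryEmitComplete);

        // Clients only ever see the Flux view of the sink.
        return sink.asFlux().collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```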

as a way to circumvent block() restriction when you know the mono will resolve without latency

⚠️ This one should be only used when you absolutely know what you are doing! Don't use it as an excuse to liberally block.

The pattern is this: MonoProcessor is used to subscribe to a source, and then MonoProcessor#peek() is used to retrieve the value pushed by the source (that is expected to be instantaneously populated into the processor) or throw in case of an error.

The most straightforward way to emulate that is to convert the source to a CompletableFuture using toFuture(), and then call get() or getNow(). Beware, BlockHound might bark at that.
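A minimal sketch of that replacement, assuming the source resolves synchronously on subscription. The class `PeekReplacementExample` and its `peek` helper are hypothetical names, not part of Reactor. `getNow` is used rather than `get()` so that nothing blocks if the assumption turns out to be wrong.

```java
import java.util.concurrent.CompletableFuture;

import reactor.core.publisher.Mono;

public class PeekReplacementExample {

    // Returns the value if the source has already resolved, or null if it
    // has not. If the source resolved with an error, getNow throws a
    // CompletionException wrapping that error.
    static String peek(Mono<String> source) {
        // toFuture() subscribes to the source immediately.
        CompletableFuture<String> future = source.toFuture();
        return future.getNow(null);
    }

    public static void main(String[] args) {
        System.out.println(peek(Mono.just("cached")));
    }
}
```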
