Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Need an error handler method on Flux when processing a stream of elements #49

Closed
victor-ferrer opened this issue Mar 30, 2016 · 7 comments
Labels
for/stackoverflow Questions are best asked on SO or Gitter
Milestone

Comments

@victor-ferrer
Copy link

HI, perhaps this is already available in the Flux API but I couldn´t find and I think that could be interesting. I´d like to have a way to handle discrete errors while processing a stream of elements.

For instance, see this code:

Flux<StockQuotation> processingFlux = Flux.fromIterable(tickers)
// Get each set of quotes in a separate thread
.flatMap(s -> Mono.fromCallable(() -> feeder.getCSVQuotes(s)))
// Convert each list of raw quotes string in a new Flux<String>
.flatMap(list -> Flux.fromIterable(list))
// Convert the string to POJOs
.flatMap(x -> {
        try {
            return Flux.just(converter.convertHistoricalCSVToStockQuotation(x));    
        }
        catch (IllegalArgumentException ex){
            System.out.println("Error decoding stock quotation: " + x);
            return Flux.empty();
        }
});

I need to write this bulky try {} catch{} block to handle single errors and continue processing the rest of the flux. The methods I have seen so far allow me to return a specific value or to restart processing the flux from the beginning.

See this Stackoverflow question:
http://stackoverflow.com/questions/36237230/how-to-handle-error-while-executing-flux-map

As "akarnokd" comments, errors are terminal signals but perhaps this new method would help people developing with Reactor.

Thanks!
Victor

@smaldini smaldini added the for/stackoverflow Questions are best asked on SO or Gitter label Apr 4, 2016
@smaldini
Copy link
Contributor

smaldini commented Apr 4, 2016

Hey Victor, I just read your blog post it was a very informative one, well done !

Let's speak about patterns. As you highlighted, error is a terminal event but there are a few ways to deal with that and it goes down to subscription lifecycle.

First , fromIterable is what we consider a source, there's no parent, and its content is actually replayable since mapped from memory (Iterable) so we call it "cold" source.

The Publisher chain created by calling various operators is immutable and refer back to that fromIterable Source. When subscribe() is eventually called, it navigates back to that source to create operator instance that serves as event receiver/emitter Subscriber and Subscription request tracker. When there's no more parent, these Subscriber are being called via onSubscribe from fromIterable to the right. Their unique state is created, that state can be terminated only once.

The role of the operators like retry() is to use the immutable parent reference and a given condition to 're-subscribe' (after cleaning up its own operator state) to the parent or a given fallback sequence if we look at onErrorReturn etc.
So in your example we will restart from the beginning, 1, 2 and error again.

An alternative to the flatMap solution is to disable that "restarting" behavior and make the sequence definitive and not replayable. The operators publish and multicast or even subscribe turn any Flux or Mono into a unique sequence that will be the new point of reference for late subscribers.

 Flux.fromIterable(tickers)
     .publish()  // Turn source into hot Publisher
     .autoConnect() // Instructs the hot Publisher to start when at least one `Subscriber` subscribes
     .flatMap() //Business logic that might fail
     .retry(3) //retry on any error up to 3 times
     .consume(System.out::println)

Error mid flow (inside onSubscribe/onNext) will call cancel() upstream so your publish() operator right after your synchronous failure downstream and will automatically stop publishing event until retry() calls again subscribe in its resubscribing lifecycle.

@smaldini
Copy link
Contributor

smaldini commented Apr 4, 2016

Consider also frequently failing (and latency impacting) business logic to be wrapped as Mono.fromCallable as you don't have to catch the exception, the Callable throws. Then a simple flatMap/then can do the trick of protecting these individual timelines and merging back into the main timeline.

Basically creating new Publisher for failing or impacting logic is advised as you will get to isolate by design the sequences, which map well with a sequence diagram btw.

@victor-ferrer
Copy link
Author

Hi @smaldini,

Thanks for your answer. I´m still struggling to understand the whole model involved here but I supposed that the Mono.fromCallable + flatmap is enough for the moment.
The idea I had when I opened this ticket, was not only about encapsulating that Mono.fromCallable + flatmap in a method, but perhaps also allowing the user to specify a Dead Letter Flux where the errors could be sent to be processed.
Could that be useful?

@smaldini smaldini added this to the 2.5.0.M4 milestone Apr 19, 2016
@smaldini
Copy link
Contributor

Related to #73

@smaldini
Copy link
Contributor

related to #80

@dfeist
Copy link
Contributor

dfeist commented Jun 1, 2016

@smaldini I think this is more related to #89 than #80. The proposal in #80 is not to handle the errors but just to hide the try/catch in a utility function.

What both me and @victor-ferrer are trying to do is handle an exception thrown within an operator before it calls next.onError(). I want to wrap the exception in #89 where as victor wants to be able to drop the current event, (which is actually almost identical to my use case in #74).

Using a flatMap() as we both are currently is definitely a valid approach to tackling these use cases, i'm just not convinced it's the best long-term approach, especially given the complexity/overhead the use of flatMap adds to the constructed chain if this needs to be done many times for different components/operators.

@smaldini
Copy link
Contributor

Considering close since Flux#handle and extra Hooks features

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
for/stackoverflow Questions are best asked on SO or Gitter
Projects
None yet
Development

No branches or pull requests

3 participants