Questions on API contract regarding onComplete/onError #52

Closed
alexandru opened this issue May 17, 2014 · 51 comments

Comments

@alexandru

Unfortunately the specification is not up-to-date, I haven't been following the whole discussion and the examples given are incomplete, so in trying to implement simple publishers / subscribers, the following questions came to mind:

  1. can a Publisher emit onComplete / onError without being asked ... as in, when the Subscriber does Subscription.request(n), does this n count include the ensuing onComplete / onError?

Example:

// given a subscriber that does this:
subscription.request(1);

// can a publisher do this in response to the above or does it have 
// to wait for another request(1) for triggering onComplete?
subscriber.onNext(something);
subscriber.onComplete();
  2. Error handling - I don't remember many mentions of how errors should be handled, so what happens if an onNext(elem) fails? Should the publisher's logic catch it and trigger an onError(ex) event? In the Rx design guidelines, this is said to trigger weird effects.
@jrudolph

  1. can a Publisher emit onComplete / onError without being asked

Yes. The publisher can even call one of those methods instead of onSubscribe.
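To make the answer concrete, here is a minimal sketch of a publisher that emits onComplete right after its only element, in response to a single request(1). It is written against `java.util.concurrent.Flow` (Java 9+), which mirrors the interfaces discussed in this thread; `SingleElementPublisher` and `RequestOneDemo` are hypothetical names, and the code is synchronous and single-threaded for brevity. The sketch takes the conservative route of calling onSubscribe before any other signal.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

/** Hypothetical one-element publisher: completes right after the only element. */
final class SingleElementPublisher implements Flow.Publisher<String> {
    private final String value;

    SingleElementPublisher(String value) { this.value = value; }

    @Override
    public void subscribe(Flow.Subscriber<? super String> subscriber) {
        subscriber.onSubscribe(new Flow.Subscription() {
            private boolean done;

            @Override
            public void request(long n) {
                if (done) return;
                done = true;
                if (n <= 0) {
                    subscriber.onError(new IllegalArgumentException("non-positive request"));
                    return;
                }
                subscriber.onNext(value);
                // No second request is awaited: the terminal signal is not counted against demand.
                subscriber.onComplete();
            }

            @Override
            public void cancel() { done = true; }
        });
    }
}

final class RequestOneDemo {
    /** Subscribes, requests exactly one element, and returns the signals that were observed. */
    static List<String> run() {
        List<String> events = new ArrayList<>();
        new SingleElementPublisher("something").subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(1); }
            @Override public void onNext(String item) { events.add("onNext(" + item + ")"); }
            @Override public void onError(Throwable t) { events.add("onError"); }
            @Override public void onComplete() { events.add("onComplete"); }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [onNext(something), onComplete]
    }
}
```

The subscriber only ever asks for one element, yet it still observes the completion signal.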

so what happens if an onNext(elem) fails

This is a good question. The original idea was that onNext just schedules work for processing and then returns. In this spirit, processing of Publisher and Subscriber was thought to be decoupled as much as possible so that throwing exceptions from onNext should probably be discouraged. This question should maybe be reconsidered with the outcome of #46 in mind.

@alexandru
Author

The original idea was that onNext just schedules work for processing and then returns. In this spirit, processing of Publisher and Subscriber was thought to be decoupled as much as possible so that throwing exceptions from onNext should probably be discouraged. This question should maybe be reconsidered with the outcome of #46 in mind.

Even if onNext just schedules work to be done asynchronously, thus not affecting the publisher, the system as a whole can still be affected if the subscription is not canceled, since in the asynchronous case the Publisher won't be aware of it and the Subscriber won't receive an onError notice about it. In such a case I would rather have the Publisher's stack blow up than the stack of a thread somewhere in a thread-pool far away.

In the Rx guidelines, section 6.4 says "protect calls to user code from within an operator" - this means that if you pass callbacks triggering exceptions to one of those combinators, then the exception is piped to onError. But on the other hand it says "do not protect calls to Subscribe, Dispose, OnNext, OnError and OnCompleted methods. These calls are on the edge of the monad. Calling the OnError method from these places will lead to unexpected behavior". In most cases subscriptions are also canceled automatically on unexpected exceptions from user code.

I have yet to understand the weird effects they are speaking about when protecting onNext. On the other hand this is close to sane - user code must be protected and Rx observables and observers must be well behaved and not throw.

@rkuhn
Member

rkuhn commented May 19, 2014

The current spec mandates that the Subscriber cancels the subscription if it fails, which works in a local (non-distributed) context, and it says that failures within the Subscriber shall not be thrown from callbacks (we will probably qualify this to exclude fatal errors that should terminate the VM).

Your quote from the Rx docs is probably about catching exceptions thrown from onError and feeding them back into it, which obviously is not a good idea. A Publisher should only signal its own failures downstream (where the normal response to receiving an onError itself is to fail with the same exception).

We’ll keep this issue open until after the current changes are done, so that the end result will clarify this point.

@jrudolph

it says that failures within the Subscriber shall not be thrown from callbacks

Whatever the "current spec" is, I didn't find anything like this in all the versions I looked through :), so we really shouldn't forget about that in the next version.

@rkuhn
Member

rkuhn commented May 19, 2014

You are right, it was only mentioned in discussions so far, thanks for the correction!

@benjchristensen
Contributor

I have yet to understand the weird effects they are speaking about when protecting onNext. On the other hand this is close to sane - user code must be protected and Rx observables and observers must be well behaved and not throw.

If an onNext throws an Exception it is breaking the contract in Rx, but it can happen. Users can write buggy code or JVM-level errors can occur. Thus, an Rx implementation must account for "out of contract" behavior.

There are a few places in RxJava where these are caught and handled, and I imagine any reactive-stream implementation will have to do similar things, otherwise "weird effects" can occur, such as Subscribers never receiving a terminal event and hanging a system.

Here are places where "out of contract" errors are handled, if you're interested:

And there are some exceptions that we want to throw: https://github.com/Netflix/RxJava/blob/master/rxjava-core/src/main/java/rx/exceptions/Exceptions.java#L44

@benjchristensen
Contributor

In the updated contract (proposed at #55) it has these about errors:

  • A Publisher MAY send less events than requested and terminate the Subscription by calling onComplete or onError.
  • If a Publisher fails it MUST emit an onError.
  • A Publisher SHOULD NOT throw an Exception. The only legal way to signal failure (or reject a Subscription) is via the Subscriber.onError method.

We do not however have anything telling Subscriber that it can't throw.

Perhaps we should add something like:

  • A Subscriber MUST NOT throw an Exception from any of its methods (onNext, onError, onComplete, onSubscribe).

An implementation however will still have to assume exceptions can and will be thrown, because bugs and JVM errors happen.

@rkuhn
Member

rkuhn commented May 19, 2014

The VM can of course throw Error whenever it feels like it must shut itself down, but we can mandate that implementations of the Subscriber methods catch everything that is non-fatal. Fatal errors will in general leave the system in an unusable state, which is closely related to why we should not mandate that these cases also lead to onError if they appear in a Publisher.

Proposal:

  • A Subscriber MUST NOT throw non-fatal exceptions from any of its methods; fatal exceptions are VirtualMachineError, ThreadDeath, InterruptedException and LinkageError.

@jbrisbin

Maybe it should read "fatal exceptions include" rather than "are"? There might be other Throwables or checked exceptions that could be used as "fatal" exceptions designed to propagate up the stack.

@rkuhn
Member

rkuhn commented May 19, 2014

The idea is that this list is exhaustive: in Scala and Akka we have been using (and refining) it for a couple of years now, the status quo is here. VM Errors are those you are referring to. We used to exclude StackOverflowError because intuitively it should be recoverable at a stack frame “higher up”, but it turns out that this error may lead to finally clauses not being run (and locks therefore not being released).

@jbrisbin

I understand the intent of making an exhaustive list but I go back to my original cross-language/cross-platform argument in #46. We can't know all the kinds of errors considered "fatal" in all possible implementations of Reactive Streams so it seems like we should be giving guidance on what we consider fatal on one specific platform. In JavaScript, an Error is an Error is an Error so it doesn't make a lot of sense to try and segregate errors by type. IMO it's sufficient to use the term "fatal" to mean different things to different implementors with the intent being made clear by the examples of JVM errors given.

@jbrisbin

So, more accurately, it might read:

  • A Subscriber MUST NOT throw non-fatal exceptions from any of its methods; examples of fatal exceptions in the Java Virtual Machine include VirtualMachineError, ThreadDeath, InterruptedException and LinkageError.

@benjchristensen
Contributor

A Subscriber MUST NOT throw non-fatal exceptions from any of its methods; examples of fatal exceptions in the Java Virtual Machine include VirtualMachineError, ThreadDeath, InterruptedException and LinkageError.

I like this.

@rkuhn
Member

rkuhn commented May 19, 2014

Ah, of course: what I meant was JVM-specific (and exhaustive in that case), specifications for other platforms will fill this hole in their own fashion. I was assuming JVM for everything we currently do, since we use Java interfaces and language semantics to describe it.

@benjchristensen
Contributor

Those 4 should not be considered exhaustive even for the JVM.

StackOverflow is fatal, OOM generally is and different libraries may have others they need to throw. For example, RxJava has OnErrorNotImplemented if the user doesn't provide an error handler and in that case all that can be done is treat it as a fatal error (and user bug).

@rkuhn
Member

rkuhn commented May 19, 2014

SO and OOME are subclasses of VMError, so from that side we got it covered. And LinkageError covers all the usual class loading errors.
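As an illustration of the classification being debated, a helper along these lines would cover the four proposed types. This is only a sketch: `FatalErrors` and `isFatal` are hypothetical names rather than part of any spec or library, and ThreadDeath is deprecated in recent JDKs, hence the suppressed warnings.

```java
final class FatalErrors {
    private FatalErrors() {}

    /** Returns true for the throwables the proposal lists as fatal on the JVM. */
    @SuppressWarnings({"deprecation", "removal"}) // ThreadDeath is deprecated in recent JDKs
    static boolean isFatal(Throwable t) {
        return t instanceof VirtualMachineError
            || t instanceof ThreadDeath
            || t instanceof InterruptedException
            || t instanceof LinkageError;
    }

    public static void main(String[] args) {
        System.out.println(isFatal(new StackOverflowError()));   // true: subclass of VirtualMachineError
        System.out.println(isFatal(new OutOfMemoryError()));     // true: subclass of VirtualMachineError
        System.out.println(isFatal(new NoSuchMethodError()));    // true: subclass of LinkageError
        System.out.println(isFatal(new NullPointerException())); // false: an ordinary bug
    }
}
```

Because the check relies on `instanceof`, subclasses such as StackOverflowError and OutOfMemoryError are covered without being listed.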

If it is an error to not implement onError then would it not make sense to make that method abstract? (leading to the fatal NoSuchMethodError if someone sneaks in illegal byte-code)

@benjchristensen
Contributor

SO and OOME are subclasses of VMError,

Yup, good point :-) I obviously don't look at the inheritance path for VMError very often!

If it is an error to not implement onError then would it not make sense to make that method abstract?

It is required if someone is implementing the class directly, as it's a method on the interface. However, in Rx where there are multiple ways of handling errors, it's legit to write code like this:

source.onErrorResumeNext(function).subscribe(onNextHandler);

In that case, if they didn't include the onErrorResumeNext and allowed errors to hit the final Observer which only has an onNext handler, then it is fatal, otherwise the errors will just silently be swallowed.

// this will blow up with OnErrorNotImplemented if an onError occurs
source.subscribe(onNextHandler);

// this correctly handles the error
source.subscribe(onNextHandler, errorHandler);

Thus, in RxJava it is possible to write code that results in a fatal OnErrorNotImplemented exception and we'd rather not make it extend from VMError or java.lang.Error as it is not from the VM.
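The situation can be sketched without RxJava itself. In the example below, `Subscribers`, `fromOnNext`, `from` and `OnErrorNotImplementedException` are hypothetical stand-ins (not the RxJava classes), built on `java.util.concurrent.Flow`: a subscriber created from an onNext handler alone has nowhere to send an error, so it escalates it as a library-defined fatal exception instead of swallowing it.

```java
import java.util.concurrent.Flow;
import java.util.function.Consumer;

/** Hypothetical stand-in for a library-defined fatal exception (not the RxJava class). */
final class OnErrorNotImplementedException extends RuntimeException {
    OnErrorNotImplementedException(Throwable cause) {
        super("no error handler was supplied", cause);
    }
}

final class Subscribers {
    private Subscribers() {}

    /** Builds a subscriber from an onNext handler only; any onError becomes a fatal throw. */
    static <T> Flow.Subscriber<T> fromOnNext(Consumer<? super T> onNextHandler) {
        return Subscribers.<T>from(onNextHandler, e -> {
            throw new OnErrorNotImplementedException(e);
        });
    }

    /** Builds a subscriber from an onNext handler and an explicit error handler. */
    static <T> Flow.Subscriber<T> from(Consumer<? super T> onNextHandler,
                                       Consumer<? super Throwable> errorHandler) {
        return new Flow.Subscriber<T>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(T item) { onNextHandler.accept(item); }
            @Override public void onError(Throwable t) { errorHandler.accept(t); }
            @Override public void onComplete() { }
        };
    }
}
```

With this shape, a missing error handler fails loudly at the moment an error arrives, which is the trade-off described above.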

Also, a library is very tedious and error-prone if every onNext/onError/etc must have code like this:

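public void onNext(T t) {
  try {
    doStuff(t);
  } catch (Throwable e) {
    if (isFatal(e)) {
      // fatal errors are never routed to onError
      throw e;
    } else {
      try {
        onError(e);
      } catch (Throwable e2) {
        // all we can do is throw since onError blew up
        // (the inner variable needs its own name; reusing "e" does not compile)
        throw e2;
      }
    }
  }
}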

Thus, for practical reasons libraries will typically want to handle errors when thrown from user code. Overall the library would generally comply with the spec, but any given Subscriber may not due to bugs, lazy programmers, etc.

If an onNext throws an Exception, a library can catch it and propagate to onError. If an onError throws an Exception there isn't much else a library can do but fatally fail.
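One way a library could centralize this handling, instead of repeating the try/catch in every callback, is a wrapper around the user-supplied subscriber. The following is a sketch under assumptions: `SafeSubscriber` here is a hypothetical class written against `java.util.concurrent.Flow`, its fatal check is abbreviated, and it is not thread-safe.

```java
import java.util.concurrent.Flow;

/**
 * Hypothetical wrapper a library might put around user-supplied subscribers.
 * Non-fatal exceptions from onNext are routed to onError; exceptions from
 * onError itself propagate because there is nowhere left to deliver them.
 */
final class SafeSubscriber<T> implements Flow.Subscriber<T> {
    private final Flow.Subscriber<? super T> actual;
    private Flow.Subscription subscription;
    private boolean terminated;

    SafeSubscriber(Flow.Subscriber<? super T> actual) { this.actual = actual; }

    @Override
    public void onSubscribe(Flow.Subscription s) {
        subscription = s;
        actual.onSubscribe(s);
    }

    @Override
    public void onNext(T item) {
        if (terminated) return; // drop signals after a terminal event
        try {
            actual.onNext(item);
        } catch (Throwable e) {
            if (isFatal(e)) throw e;                         // never swallow fatal errors
            if (subscription != null) subscription.cancel(); // stop the upstream
            onError(e);                                      // deliver the failure downstream
        }
    }

    @Override
    public void onError(Throwable t) {
        if (terminated) return;
        terminated = true;
        actual.onError(t); // if this throws, the exception reaches the caller
    }

    @Override
    public void onComplete() {
        if (terminated) return;
        terminated = true;
        actual.onComplete();
    }

    // Abbreviated fatal check, an assumption made for this sketch only.
    private static boolean isFatal(Throwable t) {
        return t instanceof VirtualMachineError
            || t instanceof InterruptedException
            || t instanceof LinkageError;
    }
}
```

The user's subscriber then receives exactly one terminal signal even when its own onNext is buggy, and the upstream is cancelled so that no further elements are produced.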

What would you recommend a library implementation do if the onError breaks the contract and throws an Exception? We can't specify away bugs like this.

@viktorklang
Contributor

@benjchristensen

"We can't specify away bugs like this."

We can specify that the behavior is undefined for implementations that violate it. So, "You're free to call System.exit(1) or throw new ThreadDeath(), but you can't expect anything to function properly afterward"

@benjchristensen
Contributor

You didn't answer my question :-)

What would you recommend a library implementation do if the onError breaks the contract and throws an Exception?

@viktorklang
Contributor

@benjchristensen I like fail-fast approaches so I'd recommend doing that. What do you think about that?

@benjchristensen
Contributor

you can't expect anything to function properly afterward

A user making a mistake should not cause an entire system to stop functioning properly if it's within the capability of the library to be resilient. I suppose however that a library implementation can handle that without it being part of the spec, but if the spec is draconian and not representing reality it would feel like a library is going against the spec to handle errors.

I like fail-fast approaches so I'd recommend doing that.

Agreed. That is why I'm recommending the spec allow use of a fatal exception defined by the library. @jbrisbin's wording keeps the door open for out-of-contract scenarios that need to fast-fail:

  • A Subscriber MUST NOT throw non-fatal exceptions from any of its methods; examples of fatal exceptions in the Java Virtual Machine include VirtualMachineError, ThreadDeath, InterruptedException and LinkageError.

In practice, libraries will need to assume exceptions being thrown because users are not going to write bug-free code and we want resilient systems.

We could incorporate that reality via additions like this:

  • It is RECOMMENDED that a Publisher protect against errors being thrown by a Subscriber (ie. from onNext, onError, onComplete, onSubscribe).
    • If a Subscriber throws an exception from onError the Publisher SHOULD throw a fatal exception and fast-fail.
    • If an error is thrown from an onNext, onComplete or onSubscribe the Publisher SHOULD propagate it to onError.

@viktorklang
Contributor

@alexandru "If an error is thrown from an onNext, onComplete or onSubscribe the Publisher SHOULD propagate it to onError."

If the Subscriber has violated the specification by throwing an exception where none is allowed, one cannot assume that the Subscriber is in a consistent state anymore. If the throwable is non-fatal, the only course of action is to log it and either A) exit the Publisher itself or, if sensible, B) drop the Subscription to the Subscriber and continue. If it is fatal, exit the program.
Well, that, or not catching the exception and hoping that the UnhandledExceptionHandler for the current thread does the right thing (which it should, but rarely does).

@benjchristensen
Contributor

I disagree, as 'onError' is how users expect to receive errors and most issues are simple bugs like NPEs, but this is not a big deal to debate. Libraries will choose to do what they wish in out-of-contract scenarios.

@viktorklang
Contributor

Calling onError (again) after onError fails would be a spec violation (calling onError multiple times). Calling onError after onComplete fails would be the same.
Which leaves onSubscribe and onNext, in which case it would be fine, but it would be a hacky and partial solution, so I would recommend having a single way of dealing with it so that it can be reasoned about.
I.e. "This is what happens if something fails to follow the spec."

Thoughts?

@benjchristensen
Contributor

a single way of dealing with it so that it can be reasoned about.

And that single way is blowing up? What is the prescribed way of blowing up?

@viktorklang
Contributor

Do we agree that calling onError after onError or onComplete throws a
non-fatal exception would currently be a spec violation in itself?

If logging it is the recommendation, and the logging lib throws an
exception, I'd say just rethrow and let the UEH deal with it.
Having to assume Byzantine code in the same VM is not feasible as it could
do reflection magic, exit the VM, generate invalid bytecode, deadlock, get
stuck in an endless loop or otherwise break things undetectably from the
Publisher's point of view. The simplest and most general solution is always
letting it bubble up to the UEH and let the person responsible for the
execution know what should be done in case of invalid implementations.

My 2c,
V
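For readers unfamiliar with the UEH route, this is roughly what "letting it bubble up" looks like on the JVM. It is a sketch only: `UncaughtHandlerDemo` is a hypothetical name, and the thrown exception merely stands in for a Subscriber callback that violates the contract.

```java
import java.util.concurrent.atomic.AtomicReference;

final class UncaughtHandlerDemo {
    /** Runs a failing task on its own thread and returns what the handler observed. */
    static Throwable runAndCapture() {
        AtomicReference<Throwable> seen = new AtomicReference<>();
        Thread worker = new Thread(() -> {
            // Stands in for a Subscriber callback that violates the contract.
            throw new IllegalStateException("subscriber threw from onNext");
        });
        // The handler is the last place where the failure can be observed.
        worker.setUncaughtExceptionHandler((thread, error) -> seen.set(error));
        worker.start();
        try {
            worker.join(); // the handler runs on the dying thread before join returns
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println(runAndCapture());
    }
}
```

Whoever owns the thread decides what the handler does, which is the "person responsible for the execution" in the argument above.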

@viktorklang
Contributor

Another big benefit of this approach is that it is symmetric no matter if
the Subscriber is sync or async.

@alexandru
Author

Thank you for the clarifications.

In practice, libraries will need to assume exceptions being thrown because users are not going to write bug-free code and we want resilient systems.

I have this world view that exceptions are either return types waiting to be documented in a type (e.g. Try[T] from Scala) or errors that must be fixed. My only problem is that with asynchronicity an exception that blows up a thread somewhere might not get noticed by anybody. This would happen with Future[T] in Scala for example, which is why the ExecutionContext implementation does an ex.printStackTrace() by default, in case there's no other handler specified for catching it. This simple print to stderr has saved me countless times by letting me notice problems and fix them. Maybe the Publisher should catch errors from onError and do a printStackTrace() before rethrowing them.
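The same effect can be reproduced in plain Java, as a rough analogue of the Scala case described here. In this sketch (`SilentFailureDemo` is a hypothetical name), a task submitted to an ExecutorService fails on a pool thread; the exception is stored in the returned Future and nobody sees it unless `get()` is called.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class SilentFailureDemo {
    /** Submits a failing task and returns the failure, which only surfaces through get(). */
    static Throwable failureSeenByCaller() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Runnable task = () -> {
                throw new IllegalStateException("boom");
            };
            // submit() captures the exception in the Future; nothing is printed or rethrown.
            Future<?> result = pool.submit(task);
            try {
                result.get();
                return null;
            } catch (ExecutionException e) {
                return e.getCause(); // the original failure, visible only to whoever asks
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return e;
            }
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(failureSeenByCaller());
    }
}
```

If the caller never inspects the Future, the failure goes unreported, which is the scenario a default printStackTrace() is meant to catch.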

@benjchristensen
Contributor

Do we agree that calling onError after onError or onComplete throws a
non-fatal exception would currently be a spec violation in itself?

Calling onError after onError throws makes no sense, as it just failed. Calling onError after onComplete throws makes good sense, as onError is the equivalent of catch in an async system and is where errors are expected to propagate to.

The only time an exception should be thrown is when onError itself throws, in which case it must be treated as a fatal failure.

If logging it is the recommendation

I don't like logging as that means we are making a decision on behalf of the user that impacts IO. This kind of logging brings a system to its knees at the very worst time - when things are failing.

Maybe the Publisher should catch errors from onError and do a printStackTrace() before rethrowing it.

No, this is not desirable as we are causing IO on behalf of the user, and a synchronized IO at that. Also, stack traces are not always very helpful in an async system as it can point to one piece of a stream on a thread but the user may have no clue what the final subscriber was, this is why it's so helpful to catch errors and propagate to the final onError of the subscriber.

Having to assume Byzantine code in the same VM is not feasible as it could
do reflection magic, exit the VM, generate invalid bytecode, deadlock, get
stuck in an endless loop or otherwise break things undetectably from the
Publisher's point of view.

@viktorklang Can you elaborate on what you mean here?

Another big benefit of this approach is that it is symmetric no matter if
the Subscriber is sync or async.

Exception throwing does not semantically behave the same in these two cases (from the perspective of the user/developer/application) so being symmetric in how errors are handled is not helpful. If it was symmetric in behavior we wouldn't have the onError method and would just throw.

The simplest and most general solution is always
letting it bubble up to the UEH and let the person responsible for the
execution know what should be done in case of invalid implementations.

Agreed, but through 2 years of feedback from people filing bugs on Github and my own production experience in handling errors (and losing them) in async streams, leaving this to naturally occur in an async system leads to bad results like hung user requests, infinite loops, silent swallowing of errors etc.

Letting exceptions throw in an async system is generally far worse than catching and routing them via the onError pipeline because of two things:

  1. they throw in completely random places without any try/catch/finally to allow cleanup/shutdown
  2. because of the first issue they prevent a terminal notification to the subscribers which means they can end up waiting indefinitely (hanging user requests and other such things).

Because of this, a library implementation will generally want to program defensively and do their best to get exceptions captured and propagated (including across async boundaries) using the onError pipeline, regardless of whether the exception came "out of contract" (don't most errors?). This then leaves the very far edge case of when onError itself blows up, and I'm suggesting we leave the freedom for library implementations to handle these as needed and not specify what they can't do.

For these reasons I recommend we adopt phrasing that states that the contract is to not throw exceptions from the Subscriber methods, but because it will happen, that a library is given the freedom to handle these "out of contract" errors in whatever way makes sense for it.

I really think the Reactive Streams spec needs to allow libraries to choose how they will deal with "out of contract" error handling as it is the reality of code that exceptions will be thrown, even if we say they preferably shouldn't be.

@viktorklang
Contributor


Do we agree that calling onError after onError or onComplete throws a
non-fatal exception would currently be a spec violation in itself?

Calling onError after onError throws makes no sense, as it just failed.
Calling onError after onComplete throws makes good sense, as onError is
the equivalent of catch in an async system and is where errors are
expected to propagate to.

The only time an exception should be thrown is when onError itself
throws, in which case it must be treated as a fatal failure.

onError, to me, is used to signal when the -upstream- completes in an
erroneous way, not when the Subscriber itself throws an exception.
If this is not the case, could you point me to the part of the spec that
clarifies this?

Also, when the Subscriber methods are async, their exceptions won't be
propagating to the Publisher, leading to an inconsistency in this
reasoning: Who calls onError when the onX methods throw exceptions (that
they aren't allowed to).

If logging it is the recommendation

I don't like logging as that means we are making a decision on behalf of
the user that impacts IO. This kind of logging brings a system to its knees
at the very worst time - when things are failing.

That assumes that the (production?) logging system that you're using isn't
discarding overflow under pressure, which seems like something that
shouldn't be used in production, as it, as you say, can bring the system
down under pressure.

Maybe the Publisher should catch errors from onError and do a
printStackTrace() before rethrowing it.

No, this is not desirable as we are causing IO on behalf of the user, and
a synchronized IO at that. Also, stack traces are not always very helpful
in an async system as it can point to one piece of a stream on a thread but
the user may have no clue what the final subscriber was, this is why it's
so helpful to catch errors and propagate to the final onError of the
subscriber.

Well, in this case you'll get a stack trace that will point out the line in
the user code that is violating the spec.

Having to assume Byzantine code in the same VM is not feasible as it could
do reflection magic, exit the VM, generate invalid bytecode, deadlock, get
stuck in an endless loop or otherwise break things undetectably from the
Publisher's point of view.

@viktorklang https://github.com/viktorklang Can you elaborate on what
you mean here?

If people want to, they'll shoot their feet and others too, if they decide
to violate the spec, or are just careless or don't test their programs,
there's really nothing we can do to save them. This is why I'd recommend to
be loud about the spec violation so that the problem gets fixed. If a
Subscriber is found to be in violation of the contract, especially when it
comes to throwing exceptions when it is not allowed, then it has to be
considered compromised. This stems from the fact that if we allow both sync
and async Subscribers, then there's no difference between failing to
signal, and failing to process. If we can detect failing to signal, then we
can repeat, but repeating failing to process could lead to performing the
same side-effects multiple times, which breaks the spec in all sorts of
ways.

Another big benefit of this approach is that it is symmetric no matter if
the Subscriber is sync or async.

Exception throwing does not semantically behave the same in these two
cases (from the perspective of the user/developer/application) so being
symmetric in how errors are handled is not helpful. If it was symmetric in
behavior we wouldn't have the onError method and would just throw.

I don't understand, can you elaborate?

The simplest and most general solution is always
letting it bubble up to the UEH and let the person responsible for the
execution know what should be done in case of invalid implementations.

Agreed, but through 2 years of feedback from people filing bugs on Github
and my own production experience in handling errors (and losing them) in
async streams, leaving this to naturally occur in an async system leads to
bad results like hung user requests, infinite loops, silent swallowing of
errors etc.

Letting exceptions throw in an async system is generally far worse than
catching and routing them via the onError pipeline because of two things:

  1. they throw in completely random places without any try/catch/finally to
    allow cleanup/shutdown
  2. because of the first issue they prevent a terminal notification to the
    subscribers which means they can end up waiting indefinitely (hanging user
    requests and other such things).

Because of this, a library implementation will generally want to program
defensively and do their best to get exceptions captured and propagated
(including across async boundaries) using the onError pipeline,
regardless of whether the exception came "out of contract" (don't most
errors?). This then leaves the very far edge case of when onError itself
blows up, and I'm suggesting we leave the freedom for library
implementations to handle these as needed and not specify what they can't
do.

For these reasons I recommend we adopt phrasing that states that the
contract is to not throw exceptions from the Subscriber methods, but
because it will happen, that a library is given the freedom to handle these
"out of contract" errors in whatever way makes sense for it.

I really think the Reactive Streams spec needs to allow libraries to
choose how they will deal with "out of contract" error handling as it is
the reality of code that exceptions will be thrown, even if we say they
preferably shouldn't be.

If I interpret you correctly; this means that out-of-contract throws do not
require to be funnelled back into onError.
Right?



Cheers,

@benjchristensen
Contributor

onError, to me, is used to signal when the -upstream- completes in an
erroneous way, not when the Subscriber itself throws an exception.
If this is not the case, could you point me to the part of the spec that
clarifies this?

It's not in the spec which is why we're discussing this :-)

It does say this though:

  • A Publisher SHOULD NOT throw an Exception. The only legal way to signal failure (or reject a Subscription) is via the Subscriber.onError method.

In an async system, the onError in a Subscriber is how they receive errors. If the error happens to have originated from the Subscriber itself it is still an error and the Publisher should attempt to deliver the error to where errors are expected, otherwise we risk not unsubscribing up or terminating the Subscriber down and leaving users hanging and not cleaning up resources.

Also, when the Subscriber methods are async, their exceptions won't be
propagating to the Publisher, leading to an inconsistency in this
reasoning: Who calls onError when the onX methods throw exceptions (that
they aren't allowed to).

Depends on whether it's synchronous or async (as per agreement in #46). In the async case it would typically be whatever the async scheduler is, but it's up to the implementation. If the Subscriber itself implemented their async scheduling, then there is nothing the library can do about it. When the library however is providing the scheduling and asynchrony mechanisms, it can do a lot to help in the case of failure. It can correctly emit onError down to the Subscriber and unsubscribe up to the Publisher. This is preferable so it doesn't leave things in a non-terminated state, never to be cleaned up or release resources or users waiting on them.

That assumes that the (production?) logging system that you're using isn't
discarding overflow under pressure, which seems like something that
shouldn't be used in production, as it, as you say, can bring the system
down under pressure.

Why would our spec get involved in defining a logging system? We should not dictate that errors require being logged, that is most definitely an implementation detail and something that will be configured differently by everyone.

Well, in this case you'll get a stack trace that will point out the line in
the user code that is violating the spec.

If an implementation wants to do so, go ahead, just don't put it in the spec and require it.

If people want to, they'll shoot their feet and others too

Of course, all I'm saying is to NOT remove the right for an implementation to provide extra error protection. The specification should not state a finite list of exceptions considered fatal, or that exceptions from onNext can not be caught and propagated to onError.

If I interpret you correctly; this means that out-of-contract throws do not
require to be funnelled back into onError.

Nope, not at all what I said :-) I said that real-world code will break the contract (sometimes intentionally, but generally due to bugs) and throw exceptions from onNext, onComplete and onError. Libraries should not be restricted by the reactive-stream specification in capturing and propagating these errors, regardless of where they originate, to the Subscriber.onError since that is where errors are expected to go in an async system. Throwing is an absolute last ditch effort, only for fatal scenarios as it risks hanging users and resources because terminal notification or unsubscribe events will never be received.

@benjchristensen
Contributor

Right after posting I remembered a use case we had in production. An onNext was blowing up somehow, can't remember why ... but let's assume it was an NPE.

If we had just thrown that NPE and done nothing to handle it, we would have killed our entire system eventually as the unsubscribe would have never propagated up, and the HTTP request would have never been released with an error because onError or onComplete would not have been called.

We do handle the errors though, and when onNext failed, we caught it, passed it correctly to onError so that the HTTP request was terminated with a 500, and we logged the errors and alerted that a given endpoint/route had problems.

The system continued behaving just fine and it was a trivial error affecting a small amount of traffic. No resources were tied up as a result of it.

Libraries need the flexibility in the spec to do this, or they will be forced to break the spec.
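
A minimal sketch of the pattern described above, written against the JDK's `java.util.concurrent.Flow` interfaces. Those interfaces postdate this thread and are used here only as a stand-in for the draft Reactive Streams interfaces. `GuardedEmitter` and `emit` are hypothetical names, not part of any library, and catching `RuntimeException` is a simplification of "non-fatal". The helper catches an exception thrown by `onNext`, cancels the subscription so upstream resources are released, and delivers the error through `onError` so the consumer is terminated instead of left hanging.

```java
import java.util.concurrent.Flow;

/** Hypothetical helper: delivers one element and routes a thrown exception to onError. */
final class GuardedEmitter {
    private GuardedEmitter() {}

    /**
     * Returns true if the element was delivered, false if onNext threw and the
     * subscriber was terminated via onError.
     */
    static <T> boolean emit(Flow.Subscriber<? super T> subscriber,
                            Flow.Subscription subscription,
                            T element) {
        try {
            subscriber.onNext(element);
            return true;
        } catch (RuntimeException failure) {
            // Unsubscribe "up" so upstream work and resources are released.
            subscription.cancel();
            try {
                // Terminate "down" so whoever waits on this stream gets an answer.
                subscriber.onError(failure);
            } catch (RuntimeException ignored) {
                // onError itself failed: no further signal may be sent to this subscriber.
            }
            return false;
        }
    }
}
```

In the HTTP use case above, the `onError` handler is where the request would be completed with a 500 and the failure logged.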

@viktorklang
Contributor

Answers inline! :)

On Thu, May 22, 2014 at 7:49 AM, Ben Christensen
notifications@github.com wrote:

onError, to me, is used to signal when the -upstream- completes in an
erroneous way, not when the Subscriber itself throws an exception.
If this is not the case, could you point me to the part of the spec that
clarifies this?

It's not in the spec which is why we're discussing this :-)

Ah, to me it is extremely important to know if we discuss to understand the
implications of the current specification or if we are talking about a
future specification.

It does say this though:

  • A Publisher SHOULD NOT throw an Exception. The only legal way to
    signal failure (or reject a Subscription) is via the Subscriber.onError method.

I think we need to fix that, as it implies that if onError throws an
exception, it needs to be put back into onError. It also precludes logging
as that would be a way to signal failure. It also does not use proper
semantics (i.e. MUST, SHOULD etc)

In an async system, the onError in a Subscriber is how they receive
errors. If the error happens to have originated from the Subscriber itself it is still an error and the
Publisher should attempt to deliver the error to where errors are
expected, otherwise we risk not unsubscribing up or terminating the
Subscriber down and leaving users hanging and not cleaning up resources.

Yes, so lets split this thing up into two parts:

  1. (Which I am arguing) What are the rules that a legal implementation
    must follow?
  2. (What I believe you are arguing) What should we do with offending code?

Regarding 1: I think we should spec that the only legal exceptions that a
Subscriber is allowed to throw from onX is fatal exceptions as specced by
a list, and when such an exception is thrown, it needs to be rethrown so
that eventually it hits the UEH. A fatal exception means: We need to shut
down the JVM as it is in an inconsistent and possibly broken state.

Regarding 2: I think that it is important that broken code gets fixed STAT,
so when code is broken it needs to be avoided to create "it's not a bug,
it's a feature"-situations.

Also, when the Subscriber methods are async, their exceptions won't be
propagating to the Publisher, leading to an inconsistency in this
reasoning: Who calls onError when the onX methods throw exceptions (that
they aren't allowed to).

Depends on whether it's synchronous or async (as per agreement in #46).
In the async case it would typically be whatever the async scheduler is,
but it's up to the implementation. If the Subscriber itself implemented
their async scheduling, then there is nothing the library can do about it.
When the library however is providing the scheduling and asynchrony
mechanisms, it can do a lot to help in the case of failure. It can
correctly emit onError down to the Subscriber and unsubscribe up to the
Publisher. This is preferable so it doesn't leave things in a
non-terminated state, never to be cleaned up or release resources or users
waiting on them.

So let's summarize the consequences of your proposal:

  1. This means that onError may be called AT MOST ONCE after onComplete (if
    onComplete violates the specification)
  2. This means that if onError violates the specification, the outcome is
    undecided

That assumes that the (production?) logging system that you're using
isn't
discarding overflow under pressure, which seems like something that
shouldn't be used in production, as it, as you say, can bring the system
down under pressure.

Why would our spec get involved in defining a logging system?

Where did I suggest that? It shouldn't.

We should not dictate that errors require being logged, that is most
definitely an implementation detail and something that will be configured
differently by everyone.

We never talked about dictating, we talked about recommendations. Any sane
library implementation is not going to mandate any logging
implementation, but is going to use something like SLF4J so that the
DevOps/Ops guys can configure and use an appropriate logging implementation
that will not suffer from the problems that you outlined. I was merely
counter-arguing your point which made assumptions regarding the
consequences of a particularly bad and non-recommended production logging
setup.

A quick glance reveals that most of the involved parties are already using
this approach:

SLF4J:

Reactor: https://github.com/reactor/reactor/blob/master/build.gradle#L28
Netty: https://github.com/netty/netty/blob/master/pom.xml#L441
Vert.x:
https://github.com/eclipse/vert.x/blob/master/vertx-core/build.gradle#L21
Akka:
https://github.com/akka/akka/blob/master/project/Dependencies.scala#L32

Well, in this case you'll get a stack trace that will point out the line
in
the user code that is violating the spec.

If an implementation wants to do so, go ahead, just don't put it in the
spec and require it.

See above regarding the recommendation.

If people want to, they'll shoot their feet and others too

Of course, all I'm saying is to NOT remove the right for an implementation
to provide extra error protection. The specification should not state a
finite list of exceptions considered fatal, or that exceptions from onNext
cannot be caught and propagated to onError.

See above discussion regarding specification vs violation.

If I interpret you correctly; this means that out-of-contract throws do
not
require to be funnelled back into onError.

Nope, not at all what I said :-) I said that real-world code will break
the contract (sometimes intentionally, but generally due to bugs) and throw
exceptions from onNext, onComplete and onError.

I think this warrants further discussion.
IMO: either throwing non-fatal is fine in the specification (legal)—a
feature—and then it becomes fine to throw intentionally,
OR it is not fine (illegal)—a bug—and then we need to make the party aware
S.T.A.T. so that they have a fighting chance to fix their problems.

Libraries should not be restricted by the reactive-stream specification in
capturing and propagating these errors, regardless of where they originate,
to the Subscriber.onError since that is where errors are expected to go
in an async system. Throwing is an absolute last ditch effort, only for
fatal scenarios as it risks hanging users and resources because terminal
notification or unsubscribe events will never be received.

Is onError throwing a non-fatal exception to be considered a fatal exception
(warranting rethrowing and shutting down the JVM) OR is it to be considered
transient and warrants logging (see above discussion regarding logging) or
something else?



Cheers,

@rkuhn
Member

rkuhn commented May 22, 2014

The way I read the spec is that if onNext throws an exception (i.e. something that can be handled instead of shutting down the JVM) then this should be treated as the Publisher failing, which mandates that onError is called—so your use-case is covered. If onError or onComplete throw, then the Publisher fails as well, but it cannot call anything further on that Subscriber since we have already called a terminal callback. That leaves onSubscribe, which I would treat exactly like onNext (i.e. Publisher fails, which mandates onError).

So as far as I can see we have a definitive set of rules already in place, maybe we just need to clarify that these are the consequences that arise from them?

@viktorklang
Contributor

@rkuhn What would the prescribed way of dealing with contract violations in
this case mean? (i.e. what should the Publisher do if onError or onComplete
throws?)

On Thu, May 22, 2014 at 1:29 PM, Roland Kuhn notifications@github.com wrote:

The way I read the spec is that if onNext throws an exception (i.e.
something that can be handled instead of shutting down the JVM) then this
should be treated as the Publisher failing, which mandates that onError is
called—so your use-case is covered. If onError or onComplete throw, then
the Publisher fails as well, but it cannot call anything further on that
Subscriber since we have already called a terminal callback. That leaves
onSubscribe, which I would treat exactly like onNext (i.e. Publisher fails,
which mandates onError).

So as far as I can see we have a definitive set of rules already in place,
maybe we just need to clarify that these are the consequences that arise
from them?



Cheers,

@rkuhn
Member

rkuhn commented May 22, 2014

We know what it cannot do: call onError again. Failing (i.e. cleaning itself up by canceling possible upstream Publishers etc.) would certainly be an option, but so would treating this situation as if the Subscriber had canceled its Subscription. It should be up to the implementation to choose any non-violating strategy.
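
One non-violating strategy of the kind described here could look like the following sketch. It assumes the JDK's `java.util.concurrent.Flow` interfaces as a stand-in for the draft interfaces; `TerminalGuard` is a hypothetical name, and the class is deliberately not thread-safe. It sends at most one terminal signal and, if that terminal callback throws, treats the situation as if the Subscriber had cancelled, without ever calling `onError` afterwards.

```java
import java.util.concurrent.Flow;

/** Hypothetical guard: at most one terminal signal; a throwing terminal callback counts as a cancel. */
final class TerminalGuard<T> {
    private final Flow.Subscriber<? super T> subscriber;
    private final Flow.Subscription upstream;
    private boolean terminated;

    TerminalGuard(Flow.Subscriber<? super T> subscriber, Flow.Subscription upstream) {
        this.subscriber = subscriber;
        this.upstream = upstream;
    }

    void complete() {
        terminate(subscriber::onComplete);
    }

    void fail(Throwable error) {
        terminate(() -> subscriber.onError(error));
    }

    boolean isTerminated() {
        return terminated;
    }

    private void terminate(Runnable terminalSignal) {
        if (terminated) {
            return; // a terminal callback was already invoked; nothing more may be sent
        }
        terminated = true;
        try {
            terminalSignal.run();
        } catch (RuntimeException thrownByTerminalCallback) {
            // Cannot call onError now; clean up as if the Subscriber had cancelled.
            upstream.cancel();
        }
    }
}
```

Whether the Publisher additionally fails itself is left open here, in line with the point that the implementation may choose any non-violating strategy.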

@rkuhn
Member

rkuhn commented May 22, 2014

Ah, now I see a case that we might want to clarify: should exceptions from onNext terminate the Publisher (by failing it) or should they only cancel the Subscription?

@danarmak

Sorry, I didn't see the last comment when I started replying. Edited this comment. What I have to add now is:

Publishers can support multicast, and 'hot' publishers especially want to remain alive for future consumers to subscribe to them. So exceptions from a single Consumer should cause only that one consumer's onError to be called, and the consumer to be unsubscribed. Therefore the spec should not be worded as:

if onNext throws an exception [...] then this should be treated as the Publisher failing
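
A rough sketch of the multicast behaviour argued for here, under heavy simplifications: `MulticastHub` and `Sink` are hypothetical names, there is no demand tracking, and a `Sink` is just a pair of callbacks standing in for a real Subscriber. When one consumer's `onNext` throws, only that consumer receives `onError` and is removed; the hub and the other consumers keep going.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Hypothetical, simplified multicast hub (no back-pressure, illustration only). */
final class MulticastHub<T> {
    /** Minimal stand-in for a subscriber: an element handler plus an error handler. */
    static final class Sink<T> {
        final Consumer<T> onNext;
        final Consumer<Throwable> onError;

        Sink(Consumer<T> onNext, Consumer<Throwable> onError) {
            this.onNext = onNext;
            this.onError = onError;
        }
    }

    private final List<Sink<T>> sinks = new CopyOnWriteArrayList<>();

    void subscribe(Sink<T> sink) {
        sinks.add(sink);
    }

    int subscriberCount() {
        return sinks.size();
    }

    void publish(T element) {
        // CopyOnWriteArrayList iterates over a snapshot, so removing inside the loop is safe.
        for (Sink<T> sink : sinks) {
            try {
                sink.onNext.accept(element);
            } catch (RuntimeException failure) {
                sinks.remove(sink); // unsubscribe only the failing consumer
                try {
                    sink.onError.accept(failure);
                } catch (RuntimeException ignored) {
                    // nothing further can be signalled to this consumer
                }
            }
        }
    }
}
```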

@alexandru
Author

On Thu, May 22, 2014 at 3:46 PM, Roland Kuhn notifications@github.com wrote:

Ah, now I see a case that we might want to clarify: should exceptions from
onNext terminate the Publisher (by failing it) or should they only cancel
the Subscription?

If the Publisher is unicast, then canceling the Subscription would
terminate the publisher too. If the Publisher is multicast, then I don't
think terminating the publisher is a good way of handling this.

Alexandru Nedelcu
https://bionicspirit.com

@alexandru
Author

On Sat, May 17, 2014 at 1:11 PM, Johannes Rudolph
notifications@github.com wrote:

  1. can a Publisher emit onComplete / onError without being asked

Yes. The publisher can even call one of those methods instead of onSubscribe.

I want to ask this question again for clarification.

I think that after an onSubscribe happens, then onComplete should
be counted as a part of N from Subscription.request(N). In case I
misunderstood the spec that is, my concern is this:

onComplete is basically an EOF, a marker that indicates the end of
the stream. First, without back-pressure applied, onComplete may
arrive before the last onNext has been processed. Second, from a
Subscriber's perspective, onComplete can be just another event that
is waited upon to execute some logic. Think about implementing a
foldLeft subscriber: it would need to accumulate a result and emit
it somewhere else after onComplete.

The spec currently says this:

The number of onNext events emitted by a Publisher to a Subscriber MUST NOT exceed the cumulative demand that has been signaled via that Subscriber’s Subscription.

I think that should be changed to this:

The number of onNext events plus the final onComplete event emitted by a Publisher to a Subscriber MUST NOT exceed the cumulative demand that has been signaled via that Subscriber’s Subscription.

I would go even further and require that for onError as well, since
the whole point of streaming an error downstream is for downstream to
do stuff with it and an onError may be an event requiring
back-pressure for certain subscribers. I mean, why not?

I apologize for redundancy in case this was already addressed or in case I misunderstood the spec.

@danarmak

I don't think onError should have to be requested. If a subscriber doesn't request more items for some time, the publisher should still be able to propagate the error downstream, if only so the whole stream can fail fast.

Could you give a concrete example of a subscriber whose onError handling requires back pressure handling? I think it would be the exception, not the norm.

@viktorklang
Contributor

@alexandru the request for subscription, completion and/or error is implicit in the act of subscribing. If you want to defer subscription (i.e. hold off all onX calls), then you just wait with calling subscribe on the publisher. Having to actively read/write in order to get a notification that the "connection" is broken or closed has been the source of many a WTF for the Sockets library. Does that make sense?

@alexandru
Author

On Thu, May 22, 2014 at 6:04 PM, Viktor Klang (√) notifications@github.com
wrote:

@alexandru the request for subscription,
completion and/or error is implicit in the act of subscribing. if you want
to defer subscription (i.e. hold off for all onX calls, then you just wait
with calling subscribe on the publisher). Having to actively read/write in
order to get a notification that the "connection" is broken or closed has
been the source of many a WTFs for the Sockets library. Does that make
sense?

Unfortunately I don’t know about the WTFs you’re talking about, if you can
point out to some article I could read, I would appreciate it. You’re
probably right, but this complicates things.

For example, here’s my implementation of the Rx concat operator (i.e.
flatten / flatMap):
https://github.com/alexandru/monifu/blob/v0.9.4/monifu/src/shared/scala/monifu/reactive/Observable.scala#L228

This operator is asynchronous in nature. With back-pressure applied, my
concat doesn’t need buffering (compared to RxJava’s current implementation)
and elements are emitted in order (compared with Rx merge). But in order
for this to work, I have to know when a child Observable (from the list of
produced Observables) is complete, such that upstream can emit the next
Observable to be subscribed. So this operator uses onComplete events for
applying back-pressure and the observer receiving it needs to be ready for
it. My observers (i.e. subscribers) return Future[Continue] and
Future[Done] for applying back-pressure, which would be roughly
equivalent to calling Subscription.request(1) and Subscription.cancel().
So it’s a Stop-and-Wait scheme. And if onComplete doesn’t need to wait
for acknowledgement, then this implementation would be broken.

I don’t really know the implications right now, maybe I’m worrying about
nothing.

Alexandru Nedelcu
https://bionicspirit.com

@viktorklang
Contributor

On Thu, May 22, 2014 at 5:23 PM, Alexandru Nedelcu <notifications@github.com> wrote:

On Thu, May 22, 2014 at 6:04 PM, Viktor Klang (√) <
notifications@github.com>
wrote:

@alexandru the request for subscription,
completion and/or error is implicit in the act of subscribing. if you
want
to defer subscription (i.e. hold off for all onX calls, then you just
wait
with calling subscribe on the publisher). Having to actively read/write
in
order to get a notification that the "connection" is broken or closed
has
been the source of many a WTFs for the Sockets library. Does that make
sense?

Unfortunately I don’t know about the WTFs you’re talking about, if you can
point out to some article I could read, I would appreciate it. You’re
probably right, but this complicates things.

Google for "Detect closed TCP connection" or "Detect broken TCP connection".

For example, here’s my implementation of the Rx concat operator (i.e.
flatten / flatMap):

https://github.com/alexandru/monifu/blob/v0.9.4/monifu/src/shared/scala/monifu/reactive/Observable.scala#L228

This operator is asynchronous in nature. With back-pressure applied, my
concat doesn’t need buffering (compared to RxJava’s current
implementation)
and elements are emitted in order (compared with Rx merge). But in order
for this to work, I have to know when a child Observable (from the list of
produced Observables) is complete, such that upstream can emit the next
Observable to be subscribed. So this operator uses onComplete events for
applying back-pressure and the observer receiving it needs to be ready for
it.

The total buffer needed is in the worst case:

1 + N + 1

onSubscribe + request(N) + ( onComplete || onError )
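
To make that worst case concrete, here is a small sketch of a subscriber-side inbox sized as 1 + N + 1. `SignalInbox` is a hypothetical name, signals are represented as plain strings, and the sketch assumes the subscriber has signalled a total demand of N and has not drained anything yet.

```java
import java.util.concurrent.ArrayBlockingQueue;

/** Hypothetical inbox sized for the worst case: onSubscribe + N onNext + one terminal signal. */
final class SignalInbox {
    private final ArrayBlockingQueue<String> queue;

    SignalInbox(int requested) {
        // 1 (onSubscribe) + requested (onNext) + 1 (onComplete or onError)
        this.queue = new ArrayBlockingQueue<>(1 + requested + 1);
    }

    /** Returns false when the inbox is full, i.e. the sender exceeded the bound. */
    boolean offer(String signal) {
        return queue.offer(signal);
    }

    int size() {
        return queue.size();
    }
}
```

With `requested = 3`, the inbox accepts `onSubscribe`, three `onNext` signals and one terminal signal; anything beyond that would mean the Publisher emitted more than was requested.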

My observers (i.e. subscribers) return Future[Continue] and
Future[Done] for applying back-pressure, which would be roughly
equivalent to calling Subscription.request(1) and Subscription.cancel().
So it’s a Stop-and-Wait scheme. And if onComplete doesn’t need to wait
for acknowledgement, then this implementation would be broken.

What do you mean by "acknowledgement"?

I don’t really know the implications right now, maybe I’m worrying about
nothing.

Thanks for raising it, I don't know if you have anything to worry about,
but so far everything looks fine over here :)

Alexandru Nedelcu
https://bionicspirit.com


Cheers,

@alexandru
Author

On Thu, May 22, 2014 at 6:39 PM, Viktor Klang (√)
notifications@github.com wrote:

My observers (i.e. subscribers) return Future[Continue] and
Future[Done] for applying back-pressure, which would be roughly
equivalent to calling Subscription.request(1) and Subscription.cancel().
So it’s a Stop-and-Wait scheme. And if onComplete doesn’t need to wait
for acknowledgement, then this implementation would be broken.

What do you mean by "acknowledgement"?

I meant that the publisher in my implementation needs to wait for
acknowledgement that it can send onComplete, in my stop-and-wait scheme.
On the other hand I see now how my problem can be solved - sorry for the
noise.

For regular folks, an explicit mention in the spec that onComplete/onError
are not covered by request(n) wouldn't hurt, since if there's important
work to be done in onComplete and it needs to happen after the last onNext
is done, then the Subscriber implementation needs to schedule that work to
be done asynchronously.
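
The point that terminal signals are not covered by request(n) can be sketched as follows, again assuming the JDK's `java.util.concurrent.Flow` interfaces as a stand-in for the draft interfaces. `DemandDemo`, `IterablePublisher` and `FoldSubscriber` are hypothetical names; the publisher is synchronous and single-threaded, ignores demand overflow, and an empty source only completes on the first request. The fold subscriber requests exactly N elements and still receives `onComplete` afterwards, with no demand left.

```java
import java.util.Iterator;
import java.util.concurrent.Flow;
import java.util.function.BiFunction;

final class DemandDemo {
    /** Synchronous publisher over an Iterable (illustration only). */
    static final class IterablePublisher<T> implements Flow.Publisher<T> {
        private final Iterable<T> source;

        IterablePublisher(Iterable<T> source) {
            this.source = source;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super T> subscriber) {
            Iterator<T> it = source.iterator();
            subscriber.onSubscribe(new Flow.Subscription() {
                private long demand;
                private boolean emitting;
                private boolean done;

                @Override
                public void request(long n) {
                    if (done) return;
                    if (n <= 0) {
                        done = true;
                        subscriber.onError(new IllegalArgumentException("request must be positive"));
                        return;
                    }
                    demand += n;
                    if (emitting) return; // re-entrant request from onNext; the loop below picks it up
                    emitting = true;
                    while (!done && demand > 0 && it.hasNext()) {
                        demand--; // only onNext consumes demand
                        subscriber.onNext(it.next());
                    }
                    emitting = false;
                    if (!done && !it.hasNext()) {
                        done = true;
                        subscriber.onComplete(); // sent even when demand is zero
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    /** foldLeft-style subscriber: accumulates in onNext, result is final after onComplete. */
    static final class FoldSubscriber<T, R> implements Flow.Subscriber<T> {
        private final BiFunction<R, T, R> step;
        private final long initialDemand;
        private R accumulator;
        private boolean completed;

        FoldSubscriber(R zero, BiFunction<R, T, R> step, long initialDemand) {
            this.accumulator = zero;
            this.step = step;
            this.initialDemand = initialDemand;
        }

        @Override public void onSubscribe(Flow.Subscription s) { s.request(initialDemand); }
        @Override public void onNext(T item) { accumulator = step.apply(accumulator, item); }
        @Override public void onError(Throwable t) { /* the fold result is never published */ }
        @Override public void onComplete() { completed = true; }

        boolean isCompleted() { return completed; }
        R result() { return accumulator; }
    }
}
```

Requesting 3 from a three-element source yields three `onNext` calls followed by `onComplete`, i.e. the "n+1" signals mentioned in this thread. Requesting only 2 leaves the stream open, because an element is still pending.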

Alexandru Nedelcu
https://bionicspirit.com

@benjchristensen
Contributor

I think we need to fix that, as it implies that if onError throws an
exception, it needs to be put back into onError. It also precludes logging
as that would be a way to signal failure. It also does not use proper
semantics (i.e. MUST, SHOULD etc)

I agree it needs to be fixed, which is what we're discussing here. But it does use the semantics of SHOULD NOT. They may be wrong, but it's using them, so what do you mean?

Regarding 2: I think that it is important that broken code gets fixed STAT,
so when code is broken it needs to be avoided to create "it's not a bug,
it's a feature"-situations.

The reality is that there is always broken code. The only way to not have any broken code is for literally every method implementation in the entire codebase to have try/catch(Throwable) { onError(e)}, and that is unreasonable.

  1. This means that onError may be called AT MOST ONCE after onComplete (if
    onComplete violates the specification)

Yes that works.

  2. This means that if onError violates the specification, the outcome is
    undecided

Yes. All that can be done at that point is throw an exception.

I think this warrants further discussion.
IMO: either throwing non-fatal is fine in the specification (legal)—a feature—and then it becomes fine to throw intentionally,
OR it is not fine (illegal)—a bug—and then we need to make the party aware S.T.A.T. so that they have a fighting chance to fix their problems.

Unfortunately when it comes to error handling I think it's more nuanced than this. Errors are generally by definition unexpected. They are exceptions, hence the name Exception.

Thus, a Subscriber SHOULD NOT throw, but if it does (has no other choice, buggy code, whatever) then it is RECOMMENDED to do everything possible to get that error sent via onError so proper termination logic can occur.

I don't want to encourage throwing exceptions, as when things are async it's always possible to lose one, but it can't be a hard-and-fast MUST NOT rule that results in the system breaking if an exception is thrown.

Is onError throwing a non-fatal exception to be considered a fatal exception
(warranting rethrowing and shutting down the JVM) OR is it to be considered
transient and warrants logging (see above discussion regarding logging) or
something else?

Definitely not worth shutting down the JVM.

The way I read the spec is that if onNext throws an exception (i.e. something that can be handled instead of shutting down the JVM) then this should be treated as the Publisher failing, which mandates that onError is called—so your use-case is covered.

I agree with this and it solves most problems.

If onError or onComplete throw, then the Publisher fails as well, but it cannot call anything further on that Subscriber since we have already called a terminal callback. That leaves onSubscribe, which I would treat exactly like onNext (i.e. Publisher fails, which mandates onError).

I would modify this slightly to allow calling onError if onComplete fails.

We know what it cannot do: call onError again. Failing (i.e. cleaning itself up by canceling possible upstream Publishers etc.) would certainly be an option, but so would treating this situation as if the Subscriber had canceled its Subscription. It should be up to the implementation to choose any non-violating strategy.

Agreed.

Ah, now I see a case that we might want to clarify: should exceptions from onNext terminate the Publisher (by failing it) or should they only cancel the Subscription?

I think it should just result in onError being passed down.

I think that after an onSubscribe happens, then onComplete should
be counted as a part of N from Subscription.request(N). In case I
misunderstood the spec that is, my concern is this:

No, I think Subscribers should always just expect n+1 for a terminal event.

@tmontgomery

FYI. The WTF that arises from TCP half-open connections for most people results from a basic (mis)understanding of distributed systems and the history of responsibilities with TCP. Historically, TCP does not take application liveness into account from the spec perspective, but BCP is that implementations provide keepalive semantics to apps. This leads to most application protocols having to be specified without TCP keepalive.

I mention this for clarification. I think @viktorklang is correct. And if this is assumed, it has some implications for control protocol design.

@viktorklang
Contributor

On Thu, May 22, 2014 at 6:02 PM, Ben Christensen
notifications@github.com wrote:

I think we need to fix that, as it implies that if onError throws an
exception, it needs to be put back into onError. It also precludes logging
as that would be a way to signal failure. It also does not use proper
semantics (i.e. MUST, SHOULD etc)

I agree it needs to be fixed, which is what we're discussing here. But it
does use the semantics of SHOULD NOT. They may be wrong, but it's using
them, so what do you mean?

I meant: "The only legal way to signal failure (or reject a Subscription)
is via the Subscriber.onError method." this sentence does not use
MUST/SHOULD/etc wording, and looks more like a footnote. I think that
SHOULD be fixed :)

Regarding 2: I think that it is important that broken code gets fixed
STAT,
so when code is broken it needs to be avoided to create "it's not a bug,
it's a feature"-situations.

The reality is that there is always broken code. The only way to not have
any broken code is for literally every method implementation in the entire
codebase to have try/catch(Throwable) { onError(e)}, and that is
unreasonable.

As a user, you have a choice:

a) You take responsibility for implementing the spec and create your own
raw Subscriber.
b) You are unsure on how to do it right and delegate the details to a
library, see my example:

import scala.util.control.NonFatal

// Wraps a user-supplied Subscriber so that non-fatal exceptions thrown from
// its callbacks are reported as specification violations.
def safeSubscriber[T](us: Subscriber[T], logger: Logger): Subscriber[T] =
  new Subscriber[T] {
    override def onSubscribe(subscription: Subscription) =
      try us.onSubscribe(subscription) catch {
        case NonFatal(t) => onError(new IllegalStateException(
          "Reactive Streams Specification violation: onSubscribe threw an exception", t))
      }

    override def onNext(element: T) =
      try us.onNext(element) catch {
        case NonFatal(t) => onError(new IllegalStateException(
          "Reactive Streams Specification violation: onNext threw an exception", t))
      }

    // onError is terminal, so a failure here can only be logged.
    override def onError(throwable: Throwable) =
      try us.onError(throwable) catch {
        case NonFatal(t) => logger.error(t,
          "Reactive Streams Specification violation: onError for {} threw an exception while handling {}.",
          us, throwable)
      }

    … and so on and so forth
  }

  1. This means that onError may be called AT MOST ONCE after onComplete
    (if
    onComplete violates the specification)

Yes that works.

What I don't like about it is that it becomes a feature? (I can abort my
own flow, not by unsubscribing, but by violating the spec)

In any case, I don't think the exception should ever be put "raw" into
onError in case of specification violation, I think it should be wrapped as
such: onError(new IllegalStateException("Reactive Streams Specification
violation: onNext threw an exception", t))
The actual type of exception could be discussed: Should we have a specific
type in the spec? Should we use AssertionError, other suggestion?

The benefits are:
a) The user knows immediately that they did something wrong
b) The user knows immediately what they need to do to fix it
c) It doesn't hide the fact that it is a specification violation

  2. This means that if onError violates the specification, the outcome is
    undecided

Yes. All that can be done at that point is throw an exception.

Well, besides logging it.

I think this warrants further discussion.
IMO: either throwing non-fatal is fine in the specification (legal)—a
feature—and then it becomes fine to throw intentionally,
OR it is not fine (illegal)—a bug—and then we need to make the party aware
S.T.A.T. so that they have a fighting chance to fix their problems.

Unfortunately when it comes to error handling I think it's more nuanced
than this. Errors are generally by definition unexpected. They are
exceptions, hence the name Exception.

That's not how they are used on the JVM though. IllegalArgumentException is
validation disguised as an error, one could even argue that the entire
class of checked exceptions are a part of the defined protocol for methods,
and are to be expected. Not to mention the sad reality that much Java code
catches Throwable, cannot handle InterruptedException properly and does not
know which throwables are not safe to continue the JVM running after they
have been thrown.

Thus, a Subscriber SHOULD NOT throw, but if it does (has no other choice,
buggy code, whatever) then it is RECOMMENDED to do everything possible to
get that error sent via onError so proper termination logic can occur.

See above discussion re: safeSubscriber and Specification Violation
exceptions.

I don't want to encourage throwing exceptions, as when things are async
it's always possible to lose one, but it can't be a hard-and-fast MUST NOT
rule that results in the system breaking if an exception is thrown.

Is onError throwing a non-fatal exception to be considered a fatal exception
(warranting rethrowing and shutting down the JVM) OR is it to be considered
transient and warrants logging (see above discussion regarding logging) or
something else?

Definitely not worth shutting down the JVM.

But now we may have resource leaks piling up?

The way I read the spec is that if onNext throws an exception (i.e.
something that can be handled instead of shutting down the JVM) then this
should be treated as the Publisher failing, which mandates that onError is
called—so your use-case is covered.

I agree with this and it solves most problems.

If onError or onComplete throw, then the Publisher fails as well, but it
cannot call anything further on that Subscriber since we have already
called a terminal callback. That leaves onSubscribe, which I would treat
exactly like onNext (i.e. Publisher fails, which mandates onError).

I would modify this slightly to allow calling onError if onComplete fails.

Which would only ever happen if the specification is followed, I don't want
to have sane and legal implementations of the spec having to defensively
encode whether it is ok for onError to be called after onComplete.

We know what it cannot do: call onError again. Failing (i.e. cleaning
itself up by canceling possible upstream Publishers etc.) would certainly
be an option, but so would treating this situation as if the Subscriber had
canceled its Subscription. It should be up to the implementation to choose
any non-violating strategy.

Agreed.

Ah, now I see a case that we might want to clarify: should exceptions from
onNext terminate the Publisher (by failing it) or should they only cancel
the Subscription?

I think it should just result in onError being passed down.

I think that after an onSubscribe happens, then onComplete should
be counted as a part of N from Subscription.request(N). In case I
misunderstood the spec that is, my concern is this:

No, I think Subscribers should always just expect n+1 for a terminal
event.



Cheers,

@viktorklang
Contributor

Great elaboration, Todd!

On Thu, May 22, 2014 at 6:55 PM, Todd L. Montgomery <
notifications@github.com> wrote:

FYI. The WTF that arises from TCP half-open connections for most people
results from a basic (mis)understanding of distributed systems and the
history of responsibilities with TCP. Historically, TCP does not take
application liveness into account from the spec perspective, but BCP is
that implementations provide keepalive semantics to apps. This leads to
most application protocols having to be specified without TCP keepalive.

I mention this for clarification. I think @viktorklang is correct. And if
this is assumed, has some implications on a control protocol design.



Cheers,

@viktorklang
Contributor

Move to close. @reactive-streams/contributors

@benjchristensen
Contributor

👍 for closing. Error handling will be defined in the spec adequately I think with the change in #68


8 participants