Underlying protocol changes #123

Closed
alexandru opened this Issue Mar 2, 2016 · 1 comment

@alexandru
commented Mar 2, 2016

The underlying protocol needs to change:

  1. Back-pressure for onComplete and onError becomes optional, so the data source can signal completion and errors sooner. This matters most for errors: when one happens, you want to close the connection as quickly as possible. Otherwise, what happens if the downstream consumer is blocked, or if the blocked consumer is itself the problem?
  2. Observable's characteristic function, unsafeSubscribe, now returns a Cancelable instead of Unit. This lets us release resources sooner when we want to drop a connection.
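
As a rough sketch of the second change, the subscription contract could be modeled as below. The traits are simplified stand-ins written for this issue, not the library's actual definitions, and `SocketSource` and `ignore` are hypothetical names used only to show a resource being released through the returned `Cancelable`. It is written in script style (Scala script or REPL).

```scala
import scala.concurrent.Future

// Simplified stand-ins for illustration; not the library's real definitions.
sealed trait Ack
case object Continue extends Ack
case object Cancel extends Ack

trait Cancelable { def cancel(): Unit }

trait Observer[A] {
  def onNext(elem: A): Future[Ack]
  def onError(ex: Throwable): Unit
  def onComplete(): Unit
}

trait Observable[A] {
  // Change 2: subscribing hands back a Cancelable instead of Unit.
  def unsafeSubscribe(observer: Observer[A]): Cancelable
}

// Hypothetical source that keeps a "connection" open while subscribed.
final class SocketSource extends Observable[String] {
  @volatile var isOpen: Boolean = false

  def unsafeSubscribe(observer: Observer[String]): Cancelable = {
    isOpen = true
    // Canceling releases the resource without waiting on the observer.
    new Cancelable { def cancel(): Unit = { isOpen = false } }
  }
}

// An observer that ignores every event, just enough to subscribe with.
val ignore = new Observer[String] {
  def onNext(elem: String): Future[Ack] = Future.successful(Continue)
  def onError(ex: Throwable): Unit = ()
  def onComplete(): Unit = ()
}
```

Calling `new SocketSource().unsafeSubscribe(ignore).cancel()` closes the hypothetical connection from the outside, which is something the observer alone could previously only request by answering an `onNext`.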

The reason for the first change is that having to wait on the consumer in order to (forcefully) close a connection is not acceptable. Think of a network socket: if the client is not responding, the socket should be forcefully closable after a timeout, and for that to happen the onError has to be delivered as fast as possible. This has no impact on user-level usage. The subscribe method meant for users wraps subscribers in SafeSubscriber, which applies back-pressure for onError / onComplete, so nothing changes there. unsafeSubscribe, on the other hand, is unsafe, as its name says.
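
To illustrate what "applying back-pressure for onError / onComplete" can mean, here is a minimal sketch of a wrapper that holds the final events until the last `onNext` has been acknowledged. It is not SafeSubscriber's actual code: `BackPressuredObserver`, `immediate`, and `slowConsumer` are hypothetical names, and the `Ack` and `Observer` types are simplified stand-ins. It is written in script style (Scala script or REPL).

```scala
import scala.collection.mutable.ListBuffer
import scala.concurrent.{ExecutionContext, Future, Promise}

// Simplified stand-ins for illustration; not the library's real definitions.
sealed trait Ack
case object Continue extends Ack
case object Cancel extends Ack

trait Observer[A] {
  def onNext(elem: A): Future[Ack]
  def onError(ex: Throwable): Unit
  def onComplete(): Unit
}

// Hypothetical wrapper: final events wait for the last onNext acknowledgement.
// Assumes the usual contract that observer methods are called sequentially.
final class BackPressuredObserver[A](underlying: Observer[A])(
    implicit ec: ExecutionContext) extends Observer[A] {

  private var lastAck: Future[Ack] = Future.successful(Continue)

  def onNext(elem: A): Future[Ack] = {
    lastAck = underlying.onNext(elem)
    lastAck
  }

  def onError(ex: Throwable): Unit =
    lastAck.foreach {
      case Continue => underlying.onError(ex)
      case Cancel   => () // the consumer already opted out
    }

  def onComplete(): Unit =
    lastAck.foreach {
      case Continue => underlying.onComplete()
      case Cancel   => ()
    }
}

// Runs callbacks on the calling thread so the demo is deterministic.
val immediate: ExecutionContext = new ExecutionContext {
  def execute(runnable: Runnable): Unit = runnable.run()
  def reportFailure(cause: Throwable): Unit = throw cause
}

// A consumer whose acknowledgement stays pending until `ack` is completed.
val events = ListBuffer.empty[String]
val ack = Promise[Ack]()
val slowConsumer = new Observer[Int] {
  def onNext(elem: Int): Future[Ack] = { events += s"next $elem"; ack.future }
  def onError(ex: Throwable): Unit = { events += "error"; () }
  def onComplete(): Unit = { events += "complete"; () }
}

val safe = new BackPressuredObserver(slowConsumer)(immediate)
safe.onNext(1)
safe.onComplete() // held back until the consumer acknowledges item 1
```

Until `ack.success(Continue)` is called, `events` only contains the `onNext` entry. A source calling the unwrapped `slowConsumer` directly would be free to deliver `onComplete` or `onError` immediately, which is the trade-off the first change makes.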

The reason for the second change is that there are cases in which we want to clean up resources sooner. For example, consider this relationship:

obs.debounce(1.second) == 
  obs.switchMap(x => Observable.now(x).delaySubscription(1.second))
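
To make the cost of the switchMap formulation concrete, the sketch below schedules one delayed task per item on a plain JDK `ScheduledThreadPoolExecutor` and counts what stays queued, with and without canceling the previously scheduled task. `DelayedEmitter` and `pendingAfter` are hypothetical helpers, not library code, and the delay is set to one hour (instead of 1 second) only so that no task fires while counting.

```scala
import java.util.concurrent.{ScheduledFuture, ScheduledThreadPoolExecutor, TimeUnit}

// Hypothetical helper: schedules one delayed task per item, optionally
// canceling the previously scheduled one first.
final class DelayedEmitter(cancelPrevious: Boolean) {
  private val executor = new ScheduledThreadPoolExecutor(1)
  // Remove canceled tasks from the work queue right away,
  // instead of leaving them there until their deadline.
  executor.setRemoveOnCancelPolicy(true)

  private var pending: Option[ScheduledFuture[_]] = None

  def onNext(elem: Int): Unit = synchronized {
    if (cancelPrevious) pending.foreach(_.cancel(false))
    val task = new Runnable { def run(): Unit = println(s"emit $elem") }
    pending = Some(executor.schedule(task, 1L, TimeUnit.HOURS))
  }

  // Number of tasks still waiting in the executor's queue.
  def pendingTasks: Int = executor.getQueue.size

  def shutdown(): Unit = { executor.shutdownNow(); () }
}

// Feeds `items` elements through an emitter and reports how many
// delayed tasks are left waiting afterwards.
def pendingAfter(items: Int, cancelPrevious: Boolean): Int = {
  val emitter = new DelayedEmitter(cancelPrevious)
  try {
    (1 to items).foreach(emitter.onNext)
    emitter.pendingTasks
  } finally emitter.shutdown()
}
```

With these assumptions, `pendingAfter(1000, cancelPrevious = false)` leaves 1000 tasks queued, while `pendingAfter(1000, cancelPrevious = true)` leaves 1. The queue growing with the number of source items is the failure mode an operator avoids once it can cancel its inner subscriptions.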

Until now only the Observer was capable of canceling the source, by returning Cancel as the result of onNext. This means the switchMap formulation of debounce is unsafe: switchMap registers runnables in the Scheduler to execute with a delay, but it cannot cancel those tasks before they execute. So if the source emits a lot of items, the pending tasks pile up and can crash the process. Also, consider something like this:

val o1 = Observable.eval(println("hello")).delayOnNext(10.seconds)
val o2 = Observable.error(new TimeoutException).delaySubscription(1.second)
Observable.amb(o1, o2)

In this case we are creating a race by starting 2 observables, and the first one to emit a signal wins (via the amb operator). But it would be nice to cancel the other one, no?
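
A minimal sketch of that idea, assuming each subscription returns a `Cancelable`: the first source to emit wins and the loser's subscription is canceled. `ManualSource` is a hypothetical, manually driven source (its `emit()` stands in for an asynchronous signal), and this `amb` is a simplified illustration, not the operator's real implementation. In particular, it does not handle sources that emit synchronously while they are being subscribed to.

```scala
import java.util.concurrent.atomic.AtomicInteger

trait Cancelable { def cancel(): Unit }

// Hypothetical source driven by hand; emits its own name as the value.
final class ManualSource(name: String) {
  @volatile var canceled: Boolean = false
  private var listener: String => Unit = _ => ()

  def subscribe(f: String => Unit): Cancelable = {
    listener = f
    new Cancelable { def cancel(): Unit = { canceled = true } }
  }

  def emit(): Unit = if (!canceled) listener(name)
}

// Simplified race between two sources: the first one to emit wins,
// and the other subscription is canceled.
def amb(a: ManualSource, b: ManualSource)(onWinner: String => Unit): Cancelable = {
  val winner = new AtomicInteger(-1) // -1 means "undecided"
  val subscriptions = new Array[Cancelable](2)

  def listener(index: Int): String => Unit = { value =>
    if (winner.compareAndSet(-1, index)) {
      val loser = subscriptions(1 - index)
      if (loser != null) loser.cancel()
    }
    if (winner.get == index) onWinner(value)
  }

  subscriptions(0) = a.subscribe(listener(0))
  subscriptions(1) = b.subscribe(listener(1))

  // Canceling the race cancels both participants.
  new Cancelable { def cancel(): Unit = subscriptions.foreach(_.cancel()) }
}
```

Without the `Cancelable` returned by each subscription, the losing source would keep running (and keep its scheduled work alive) even though nobody is listening to it anymore.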

These 2 protocol changes combined mean we can easily have an operator like this:

// If the consumer takes longer than 5 seconds to process an `onNext`,
// then interrupt the source and send an error downstream.
observable.timeoutDownstream(5.seconds)

Such an operator would be more difficult to implement in version 1.0.

@alexandru alexandru referenced this issue Mar 2, 2016

Closed

Protocol #124

alexandru added a commit that referenced this issue Mar 2, 2016

@alexandru alexandru added this to the 2.0 milestone Mar 2, 2016

@alexandru

commented Mar 2, 2016

It's done.

@alexandru alexandru closed this Mar 2, 2016

alexandru added a commit that referenced this issue Mar 22, 2016

alexandru added a commit that referenced this issue Jun 6, 2016
