
Contract Definition – Multicast/Unicast – Simplified Types #41

Closed
benjchristensen wants to merge 2 commits from the readme-contract branch

Conversation

benjchristensen
Contributor

Updates the README to match the simplified types from #40 and updates the contract based on the unicast/multicast discussions (#19) and the closed PR #37.

@@ -19,7 +19,7 @@ The latest preview release is available on Maven Central as

Handling streams of data—especially “live” data whose volume is not predetermined—requires special care in an asynchronous system. The most prominent issue is that resource consumption needs to be carefully controlled such that a fast data source does not overwhelm the stream destination. Asynchrony is needed in order to enable the parallel use of computing resources, on collaborating network hosts or multiple CPU cores within a single machine.

The main goal of Reactive Streams is to govern the exchange of stream data across an asynchronous boundary – think passing elements on to another thread or thread-pool – while ensuring that the receiving side is not forced to buffer arbitrary amounts of data. In other words, backpressure is an integral part of this model in order to allow the queues which mediate between threads to be bounded. The benefits of asynchronous processing would be negated if the communication of backpressure were synchronous (see also the [Reactive Manifesto](http://reactivemanifesto.org/)), therefore care has been taken to mandate fully non-blocking and asynchronous behavior of all aspects of a Reactive Streams implementation.


I think "fully non-blocking and asynchronous behavior of all aspects of a Reactive Streams implementation" [emphasis mine] is a little too inclusive given there will be times that actions taken in an implementation are non-blocking but not asynchronous. Shouldn't this language be restricted to the specific components in which this behavior should be indisputable vs times when its allowed to be synchronous as long as its non-blocking?

Contributor Author


I agree. I had focused more on the lower parts of the document, where I changed it to clearly state that much of this is about being non-blocking, not about async vs sync.

The only two places where truly async behavior is needed as I see it are:

  1. the Subscription.request method (and possibly cancel)
  2. the need for a Subscriber to use request(n) in the first place, which implies that somewhere down the chain it is async (buffering).

Experience with prototypes in RxJava has led me to believe the Subscription.request method should allow receiving -1 or some other value to represent infinite – just send me as fast as you can.

This is a very valid use case for a variety of things. For example, if I have a Publisher that represents a file and it's a choice of the Subscriber whether it is consumed synchronously or asynchronously, the Subscriber should be able to tell the Subscription to send without restriction.

Similarly, if I'm streaming metrics, I can do request(-1) and then just sample or throttle, since request(n) doesn't really make much sense in that use case. The data is flowing along at whatever rate it does and I always want to sample whatever the latest value is.
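
To illustrate, a metrics subscriber in that mode could look roughly like the sketch below (hypothetical; -1 as the "no limit" marker is only the proposal above, and the class and method names are made up, with signatures approximated):

public class SamplingSubscriber<T> implements Subscriber<T> {
    private volatile T latest; // only the most recent element is kept

    public void onSubscribe(Subscription subscription) {
        subscription.request(-1); // proposed: opt out of backpressure, send as fast as you can
    }

    public void onNext(T element) {
        latest = element; // overwrite; no buffering and no further request(n) needed
    }

    public void onError(Throwable cause) {
        // report and stop sampling
    }

    public void onComplete() {
        // nothing left to sample
    }

    public T sample() {
        return latest; // a timer elsewhere reads this at whatever rate it likes
    }
}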

Member


I propose to discuss the topic of asynchrony in #46 and open new issues for discussing the other things: we should focus on fixing the README so that it matches the code again ASAP (and then update the website with correct links as well).

Contributor Author


focus on fixing the README so that it matches the code again ASAP (and then update the website with correct links as well).

Agreed, which was my hope with this PR even though I knew it would continue evolving over time.

- The `Publisher.subscribe` method can be called as many times as wanted as long as it is with a different `Subscriber` each time. It is up to the `Publisher` whether underlying streams are shared or not. In other words, a `Publisher` can support multi-subscribe and then choose whether each `Subscription` is unicast or multicast.
- A `Publisher` can refuse subscriptions (calls to `subscribe`) if it is unable or unwilling to serve them (overwhelmed, fronting a single-use data source, etc) and can do so by calling `Subscriber.onError` instead of `Subscriber.onSubscribe` on the `Subscriber` instance calling `subscribe`.
- A `Publisher` should not throw an `Exception`. The only legal way to signal failure (or reject a `Subscription`) is via the `Subscriber.onError` method.
- The `Subscription.request` method must behave asynchronously (separate thread, event loop, trampoline, etc) so as to not cause a StackOverflow since `Subscriber.onNext` -> `Subscription.request` -> `Subscriber.onNext` can recurse infinitely. This allows a `Subscriber` to directly invoke `Subscription.request` and isolate the async responsibility to the `Subscription` instance which has responsibility for scheduling events.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is a little tricky because, using the RingBuffer as an example, it would be impossible to fulfill this requirement as-is. I would deadlock the thread if I tried to check out another slot in the RingBuffer while in the event thread of the RingBuffer itself. The only solution to this is what we do in Reactor, which is to simulate tail recursion by placing a task to be dispatched onto a Queue that is emptied after the current event-handling method has returned.

It also doesn't seem optimal to enforce crossing a thread boundary here. When using the RingBuffer I only ever use a single thread so I can certainly make task execution asynchronous but I can't make it run in another thread (nor would I want to since I'm gaining a lot of performance by not context-switching).

If we're going to mandate crossing an asynchronous boundary here I think it should be fairly explicit what we expect. I don't feel like the current text does enough to explain what's expected and why.
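
To make that concrete, the shape of the workaround is roughly the following (a hypothetical fragment illustrating the pattern, not actual Reactor code; it assumes a single event thread, and emitUpTo is a stand-in for the real emit loop):

// Simulated tail recursion: re-entrant requests are queued instead of being
// dispatched immediately, and the queue is drained only after the current
// event-handling method has returned.
private final Queue<Runnable> pending = new ArrayDeque<Runnable>();
private boolean draining = false;

void requestMore(final int n) {
    pending.add(new Runnable() {
        public void run() { emitUpTo(n); }
    });
    if (draining) {
        return; // called from inside the emit loop; the outer drain picks this up
    }
    draining = true;
    Runnable task;
    while ((task = pending.poll()) != null) {
        task.run(); // runs on the same thread, but never nested on the stack
    }
    draining = false;
}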

Contributor


This is certainly one of the more tricky things I came across in the implementation. First of all, I am not sure that allowing only an async boundary is sensible. For testing purposes, the flow behavior should be the same when run synchronously, e.g. a simple RxJava Observable.from(xxx) flow or Reactor Streams.defer(xxx). The spec is certainly optimized towards async, but what that means is we should have a -1 marker to let the Subscription dismiss the backpressure and just fully drain its source. If the publisher detects such capacity on one of its subscriptions, it should never buffer/queue but call the subscriber directly.

Contributor Author


It also doesn't seem optimal to enforce crossing a thread boundary here

By async this does not mean crossing thread boundaries. If the producer and consumer are on the same thread this can be achieved via trampolining for example.

I think #46 is a good discussion to continue on this. I won't update this text for now as it feels like it needs further clarification.

Contributor Author


we should have a -1 marker to let the Subscription dismiss the backpressure and just fully drain its source

I would definitely like to add this to the contract.


*If N is the total number of demand tokens handed to the Publisher P by a Subscriber S during the time period up to a time T, then the number of onNext calls that are allowed to be performed by P on S before T must be less than or equal to N. The number of pending demand tokens must be tracked by the Producer separately for each of its subscribers.*
*If N is the total number of demand tokens handed to the `Publisher` P by a `Subscriber` S during the time period up to a time T, then the number of `onNext` calls that are allowed to be performed by P on S before T must be less than or equal to N. The number of pending demand tokens must be tracked by the `Producer` separately for each of its subscribers.*


Me no understand. 😕

Contributor


The producer must check the capacity on the subscription/subscriber before handing over any onNext. Until Subscription#request is actually called, there is no capacity yet. However, onSubscribe will probably generate a request(xxx) from the subscriber, which will add the requested number to its capacity and then signal the Publisher to broadcast its onNext events from its buffer (if any, i.e. if it is a cold/replayable stream).
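
Roughly, that bookkeeping could look like this (a hypothetical fragment; the field and method names are made up, and buffer stands in for the queue the publisher keeps for that subscriber):

// Per-subscriber demand tracking: request(n) only adds to that subscriber's
// capacity, and the emit loop never delivers more onNext signals than requested.
private final Map<Subscriber<T>, AtomicLong> capacities = new ConcurrentHashMap<Subscriber<T>, AtomicLong>();

void handleRequest(Subscriber<T> subscriber, long n) {
    capacities.get(subscriber).addAndGet(n); // grow this subscriber's capacity...
    drain(subscriber);                       // ...then signal the publisher to emit
}

void drain(Subscriber<T> subscriber) {
    AtomicLong capacity = capacities.get(subscriber);
    T next;
    while (capacity.get() > 0 && (next = buffer.poll()) != null) {
        capacity.decrementAndGet();
        subscriber.onNext(next); // never exceeds the demand handed over so far
    }
}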

@mattroberts297

Please accept my apologies for joining the discussion late and, perhaps more importantly, uninvited.

But I'm not sure how multicast (or multi-subscribe for that matter) brings you closer to solving the problem of non-blocking backpressure (which seems to be the unique selling point of this SPI).

Moreover, attempting to solve multicasting in Publisher (I'll steer clear of a naming discussion) appears to drastically complicate the class’ contract definition and violate the single responsibility principle.

Further, I'm not entirely convinced that a 'multicaster' (currently Publisher) should be responsible for choosing the 'overflow strategy'. Instead, one could argue that the individual subscribers are best placed to decide whether a multicaster should buffer, resample or drop their messages.

Finally, thank you for all of your work on this. I really enjoyed reading through your discussion in #19.

@benjchristensen
Contributor Author

Please accept my apologies for joining the discussion late and, perhaps more importantly, uninvited

Not a problem at all, this is open for exactly this purpose, for anyone to jump in and get involved! Thank you for doing so.

But I'm not sure how multicast (or multi-subscribe for that matter) brings you closer to solving the problem of non-blocking backpressure (which seems to be the unique selling point of this SPI).

It doesn't. It actually has little to do with back pressure other than affecting how a Publisher may choose to handle it.

The issue was whether this spec should require or dictate unicast or multicast and the discussion at #19 was about removing the requirement for either and letting a Publisher implementation choose whatever it wishes. The reason is that different sources of data naturally lend themselves to one or the other.

I'm not entirely convinced that a 'multicaster' (currently Publisher) should be responsible for choosing the 'overflow strategy'.

Agreed, but this spec is not intended to define strategies for handling backpressure, only a mechanism for communicating capacity/demand. Each implementation of a Publisher/Subscriber will define available strategies and solutions for this.

For example, in Rx there will be mechanisms for choosing buffering, throttling, sampling, unsubscribing, pausing, blocking and probably others. I can't speak to Akka or Reactor or others, but each implementation will choose how to expose or communicate strategies of what to do.

Thus, you could implement a Publisher that exposes strategies like "buffer", "resample" and "drop" that the Subscriber can choose from at subscription or instantiation time.

For example:

Publisher p = new MyStreamPublisher(Strategy.DROP);
p.subscribe(new MySubscriber());

Does this make sense, or is there something that should be adjusted about the contract or interfaces to better achieve these goals?

@rkuhn
Member

rkuhn commented Apr 28, 2014

Fully agreed, @benjchristensen. In Akka we will have combinators which express different schemes of attaching and detaching streams: buffering, dropping, conflation, but also many strategies for fan-in and fan-out. The Publisher type is only in this spec because we need to have a place in the interfaces which creates a Subscription and stands in as a name for “data source”; its semantics will vary widely, even within a given library implementation.

@mattroberts297

Thank you both @benjchristensen, @rkuhn for taking the time to respond to my queries and for giving insight into your planned implementation(s). All your responses make complete sense and greatly improved my understanding - which was previously lacking! Just to test my understanding and for the sake of completeness: I could, hypothetically, overload the subscribe method on my Publisher implementation to accept a strategy at subscription time e.g.

trait SomePublisher[T] extends Publisher[T] {
  ...
  def subscribe(subscriber: Subscriber[T], strategy: Strategy): Unit
}

Finally, to answer your question, no I cannot envisage a better way of achieving these goals! In fact, I'd go so far as to say it's the most elegant mechanism I've seen to date for handling back pressure in an asynchronous manner. Myself, and the team I work with, are excited to try out some of the implementations. Hopefully at some point we'll get an opportunity to contribute back.

@smaldini
Contributor

@benjchristensen

The Subscription.request method must behave asynchronously (separate thread, event loop, trampoline, etc) so as to not cause a StackOverflow since Subscriber.onNext -> Subscription.request -> Subscriber.onNext can recurse infinitely. This allows a Subscriber to directly invoke Subscription.request and isolate the async responsibility to the Subscription instance which has responsibility for scheduling events.

If the Subscription owns the state, it should be aware that a given subscriber has no capacity and stop the recursion at that stage by buffering; alternatively, if there is capacity, it doesn't need to buffer and can call onNext, which will recurse until completion. This is the current Reactor behavior and it passes the current TCK – would that be correct? Of course, if Subscriber.onNext dispatches asynchronously as its first action this also prevents a StackOverflow, but it doesn't prevent the code from being synchronous-friendly for e.g. a testing environment. Reactor can use a RingBuffer dispatcher (event-loop-like) or a synchronous dispatcher without distinction on the Subscriber side for now. If the async rule is enforced I will probably restrict the dispatching to the RingBuffer.

@benjchristensen
Contributor Author

@mattroberts297

The following code could be used by an implementation:

trait SomePublisher[T] extends Publisher[T] {
  ...
  def subscribe(subscriber: Subscriber[T], strategy: Strategy): Unit

  def subscribe(subscriber: Subscriber[T]): Unit
}

However, since it doesn't comply with the interface, it would not work for interop, as the normal subscribe(subscriber) method is the only one to be invoked using the interface. Thus, for interop a default strategy would need to be used.

Alternatively, the SomePublisher can have a constructor that allows it to be constructed with the strategy and then anyone who uses subscribe(subscriber) would get the correct strategy as defined in the constructor.

For example:

new SomePublisher(Strategy.DROP).subscribe(subscriber)

@benjchristensen
Contributor Author

@smaldini

This behavior is the current Reactor one and passes the actual TCK, would that be correct ?

The current TCK was built against a moving target and the contract definition before we started discussing things, so I have no idea what the TCK is validating.

If the async rule is enforced I will probably restrict the dispatching to RingBuffer.

If all the request method does is increment a counter and return, then that's fine and meets the requirement. Perhaps the wording can be clarified on this, because incrementing a counter does not need to be scheduled. Kicking off the work that will result in onNext does.

The only thing that really needs to be "enforced" is that a StackOverflow can't occur, and that means the request method should not result in synchronously calling onNext.
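
In other words, something along these lines would satisfy the rule (a minimal sketch with approximated signatures; the demand counter, the scheduler and drain() are assumed fields/methods of the Subscription, and the scheduler could just as well be a trampoline on the current thread as a separate thread):

// request(n) only records demand and returns; the work that produces onNext
// is scheduled, so request(n) can never re-enter onNext on the caller's stack.
private final AtomicLong demand = new AtomicLong();
private final Executor scheduler; // event loop, trampoline, thread pool, ...

public void request(long n) {
    demand.addAndGet(n); // cheap bookkeeping, safe to call from within onNext
    scheduler.execute(new Runnable() {
        public void run() { drain(); } // drain() emits onNext up to the recorded demand
    });
}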

it doesn't prevent the code from being synchronous-friendly

Can you explain more what you mean by this?

@smaldini
Contributor

This has no issue running synchronously:

Stream<Integer> stream = Streams.defer(list).filter(integer -> true).map(integer -> integer).flatMap(Streams::defer);
stream.subscribe(new Subscriber<Integer>(){
  //...
  public void onSubscribe(Subscription subscription){
    subscription.request(1000); // will stop after 5 and call onComplete since the list has only 5 elements
  }
});

Current subscription code (WIP, e.g. the buffer queue especially):
https://github.com/reactor/reactor/blob/reactive-streams/reactor-core/src/main/java/reactor/rx/StreamSubscription.java#L50

If the subscriber is a Reactor one (a subject like MapManyAction, MapAction, etc.), it will dispatch asynchronously inside mapManyAction.onNext, but in that case it will be synchronous – unless we decide that some of the asynchronous boundary needs to move to the publisher, so that Publisher.subscribe has to run asynchronously?

@mattroberts297

@benjchristensen Sorry, not my finest hour. The ... was meant to stand for the interface implementation, but that wasn't clear. I take your point on the interop side of things though. Building on that, would this be a valid implementation?

val p = new SomePublisher()
val s1 = new SomeSubscriber(Strategy.DROP) // alternatively, new DroppingSubscriber()
val s2 = new SomeSubscriber(Strategy.RESAMPLE) // or, new ResamplingSubscriber()
p.subscribe(s1)
p.subscribe(s2)

Just to provide some context, the reason I'm pursuing this line of questioning is that for some of our work we've had one 'publisher' and many 'subscribers' each with different overflow strategies.

@smaldini
Contributor

smaldini commented May 1, 2014

@mattroberts297 what about moving this logic into the Subscription rather than the Subscriber? Each subscription is the unique piece of state between a given publisher and subscriber, therefore you can do all kinds of buffering/dropping/short-circuiting etc. from there without impacting other subscribers.
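
For instance, a drop strategy confined to one subscription would not touch any other subscriber of the same publisher (a hypothetical sketch with approximated signatures; the offer method name is invented):

// A Subscription that applies a DROP overflow strategy locally: if this
// subscriber has no outstanding demand when an element arrives, the element
// is simply discarded. A buffering subscription would enqueue it instead.
class DroppingSubscription<T> implements Subscription {
    private final Subscriber<T> subscriber;
    private final AtomicLong demand = new AtomicLong();

    DroppingSubscription(Subscriber<T> subscriber) {
        this.subscriber = subscriber;
    }

    public void request(long n) {
        demand.addAndGet(n);
    }

    public void cancel() {
        // detach this subscriber from the publisher
    }

    // Called by the publisher for every element it produces.
    void offer(T element) {
        if (demand.get() > 0) {
            demand.decrementAndGet();
            subscriber.onNext(element);
        }
        // else: drop; other subscriptions of the same publisher are unaffected
    }
}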

@mattroberts297

@smaldini Subscription feels like a sensible place to put this logic. But I think an implementation may decide to hide the Subscription and just expose the Publisher and Subscriber. What are your thoughts?


#### Comparison with related technologies ####
Member


why is this section removed?

Contributor Author


I had a few problems with it while revising this:

  1. It's not comprehensive across all possible technologies or solutions, particularly once you leave the JVM world and try to take into account everything going on in JavaScript, Go, Dart, .Net, etc.
  2. The "related technologies" are moving targets (such as the fact that Rx is working with this initiative to support all of this), so baking any statements into this document requires ongoing upkeep.
  3. This document and project should focus on what this new specification and contract is, not what other projects are.

In short, comparisons should live elsewhere, not in the specification and contract documentation which should be timeless.

Member


yes, this is a good point; we should probably open the wiki for that purpose

Member


#49 created.

@benjchristensen
Contributor Author

@mattroberts297

Building on that, would this be a valid implementation?

val p = new SomePublisher()
val s1 = new SomeSubscriber(Strategy.DROP) // alternatively, new DroppingSubscriber()
val s2 = new SomeSubscriber(Strategy.RESAMPLE) // or, new ResamplingSubscriber()
p.subscribe(s1)
p.subscribe(s2)

I can envision an implementation like that. The Publisher would probably be set up to DROP if it doesn't receive request(n), and that would mean the Strategy.RESAMPLE Subscriber would keep calling request(n) but then do sampling on top of it.

How I'd proceed would depend on whether the publisher was "hot" or "cold". Considering the RESAMPLE strategy I'm assuming it's "hot" (stream of stock prices or metrics for example).

In that case I would expect both the Strategy.DROP and Strategy.RESAMPLE subscribers to call request(n) whenever they are interested (the RESAMPLE one as often as needed to do sampling) while SomePublisher() would drop data if the Subscriber has not asked (since it's hot and can't wait).

@benjchristensen
Contributor Author

@smaldini

This has no issue running synchronously:

Agreed, and this is why I think we need to finish clarifying the discussion started by @rkuhn at #46 about async boundaries and ultimately how synchronous computations fit into this.

@benjchristensen
Contributor Author

what about moving this logic into the Subscription rather than the Subscriber

The Subscription is provided by the Publisher, so it cannot be influenced by the Subscriber.

Based on discussion at #41
@benjchristensen
Contributor Author

I have updated based on feedback. I know there are things still to discuss and change, but can we merge this so that the README is in sync with the code and then let other pull requests and issues be used for further changes?

(Note that I'll be quiet for a few days starting in a few hours ... baby is expected tonight/tomorrow).

@smaldini
Contributor

@benjchristensen

what about moving this logic into the Subscription rather than the Subscriber
The Subscription is provided by the Publisher so can not be influenced by the Subscriber.

Implementation-driven subscription API?

publisher.subscribe(subscriber, Subscription.DROP) 

Subscribers can implement their own strategy too, but having everything managed at the subscription level just makes things simpler. At subscription time you are aware of the buffer size; at request time you are aware of the capacity and can signal. I don't think it makes a difference in the end whether the subscriber or the subscription handles it, but leaving this open allows libraries not based on the Actor model to optimize the message passing, e.g. through a RingBuffer implementation. ATM we have a duplicate queue (one for missing capacity in the Subscription, one for dispatching in the Subscriber/reactor.rx.action.Action). I'm just trying to find a simple middle-ground solution, but happy with consensus.

NB: Awesome news, Ben! Your wife, yourself and Ben 2.0 take care!

- The `Publisher.subscribe` method can be called as many times as wanted as long as it is with a different `Subscriber` each time. It is up to the `Publisher` whether underlying streams are shared or not. In other words, a `Publisher` can support multi-subscribe and then choose whether each `Subscription` is unicast or multicast.
- A `Publisher` can reject calls to its `subscribe` method if it is unable or unwilling to serve them (e.g. because it is overwhelmed or bounded by a finite number of underlying resources, etc.). It does this by calling `onError` on the `Subscriber` passed to `Publisher.subscribe` instead of calling `onSubscribe`.
- A `Publisher` should not throw an `Exception`. The only legal way to signal failure (or reject a `Subscription`) is via the `Subscriber.onError` method.
- The `Subscription.request` method must behave asynchronously (separate thread, event loop, trampoline, etc) so as to not cause a StackOverflow since `Subscriber.onNext` -> `Subscription.request` -> `Subscriber.onNext` can recurse infinitely. This allows a `Subscriber` to directly invoke `Subscription.request` and isolate the async responsibility to the `Subscription` instance which has responsibility for scheduling events.
Member


This change forecloses the decision in #46; I would strongly prefer to keep the current wording (requiring fully asynchronous behavior of all specified methods) until we have properly settled that point.

Contributor Author


The wording of this README can and will change as we continue progressing.

This pull request adds far more detail than currently exists and brings us closer to agreement, so I suggest merging this and moving forward instead of continuing to bikeshed on details (which can be modified afterwards) and holding up everything else.

@benjchristensen
Contributor Author

Implementation driven subscription API

The problem I see with that is we can't possibly know or define all strategies up front, so we'd basically have to use an empty marker interface; it would be useless for interop, and therefore not very useful for this project.

@smaldini
Contributor

We don't need to add that to the project, nor necessarily go the dropping-subscriber route IMO. It should be up to the impl.


@benjchristensen
Contributor Author

Replaced with #55 as a result of discussions in #46

@viktorklang deleted the readme-contract branch December 26, 2014 14:34