Multicast Requirement #19

Closed
benjchristensen opened this issue Apr 15, 2014 · 89 comments

@benjchristensen
Contributor

Currently the spec states "A Publisher can serve multiple subscribers subscribed dynamically at various points in time. In the case of multiple subscribers the Publisher should respect the processing rates of all of its subscribers (possibly allowing for a bounded drift between them)."

I think it is a mistake to complicate the spec by requiring implementations to support multicasting, and therefore management of subscriptions over time. In fact, I think the spec should expressly stick to unicast (a new lifecycle per Subscription).

Multicasting techniques should be layered on top by libraries, not required of the Publisher instances themselves.

For example, each Subscriber could result in a new network connection, open file, etc. This would be a basic implementation.

In Rx this greatly simplifies things and is a good separation of concern. Multicasting can be added and done with different behaviors such as replaying all of history, a portion of history, the last value only, or ignoring history and just starting from now onwards, etc.

In other words, someone providing me a Publisher should not concern themselves with multiple subscriptions, how I want to multicast or other such things. If I subscribe it should start a new lifecycle, emit the data as I request it and stop when I unsubscribe.

This keeps the mental model clear, the Publisher implementations simple and allows libraries a simple contract to interface with.

@normanmaurer
Member

I agree with @benjchristensen here :)

@rkuhn
Member

rkuhn commented Apr 15, 2014

I fully agree with the part about not mandating multicast: it is a complication that does not pull its own weight (due to there being so many different useful variants that picking a default is too limiting).

The remaining question is whether or not it should be allowed to call subscribe twice on the same Publisher. The simplification in implementation only fully materializes if no multicast is required at all, meaning that a Publisher will only ever publish to at most one Subscriber. We have wording in the spec that defines the shut-down state as that which is entered after having been disposed by its downstream (currently called cancelation, but that is another discussion).

What you have in mind is the Rx way of the Subscription being the “active” entity in the sense of holding the state relevant between Observable and Observer, but I would prefer this spec to be more general and allowing the Publisher to be the active (asynchronous) entity.

Thinking about the user-visible contract I prefer very clear semantics:

  • if you get a Publisher, then you can subscribe to it exactly once
  • if you pass someone a Publisher, only do so for a “fresh” one (because the recipient can reasonably expect being able to subscribe to it)
  • if you need to attach multiple Subscribers, make that explicit with a suitable fan-out combinator

This behavior matches Rx practices as far as I can see, the difference is that in Rx the Subject and any other Observable are not distinguished in the type system. In this spec I would propose to reserve Publisher for single-output streams, have an API element that explicitly supports fan-out (à la Rx Subject) and keep the high-level combinators DSL outside the scope as we currently have (i.e. the transformation methods on Observable are library specific and this spec does not say anything about them or their semantics).

@viktorklang
Contributor

👍 @rkuhn!

@benjchristensen
Contributor Author

I think a Publisher should be capable of being subscribed to as many times as wanted, otherwise a library must have a "Publisher Factory" and the user can't pass a Publisher around safely as once consumed it couldn't be used again.

If a Publisher can be subscribed to many times, it is the only type needed by a library and can be passed around and subscribed to whenever and however often they want.

I would not add anything to this spec for fan-out, that's for libraries to do.

@rkuhn
Member

rkuhn commented Apr 16, 2014

There is a relationship between subscribing to a Publisher more than once and fan-out, which divides Publishers into two categories:

  • If the Publisher just replays the same stream for all Subscribers independently, then there is no fan-out, and in effect you have one Publisher per Subscriber (i.e. the behavior is not distinguishable from such a design).
  • If the Publisher only has one (live) stream which is sent to its Subscriber, then having no fan-out logic prohibits having a second Subscriber—that would be fan-out by definition!

As I said above, the semantics must be clear: if I get a Publisher (e.g. as a method argument) then I must know what that means. Saying “it can be used multiple times unless it cannot” violates this constraint. In the second case above there cannot be reuse since the point of this discussion is that we agree that having a fan-out logic in every Publisher is a bad idea. This is a difference to Rx Observable because for Publisher we would have to define how to merge the back pressure streams when splitting the data streams (which we cannot do in a one size fits all fashion).

Therefore the only consistent choice is to disallow a Publisher from being reused.

Having a source of data which can support multiple sinks is perfectly reasonable, but I don’t think it should be covered by this specification—at least not in the first version. This means that e.g. Observable should have a “toPublisher” method which you can use to get a back pressure aware outlet that can be connected to any Reactive Streams Subscriber, and you would implement it such that you get a new one every time. Would that not solve this issue in a quite natural fashion?

@benjchristensen
Contributor Author

Therefore the only consistent choice is to disallow a Publisher from being reused.

This just means we're forcing people to use a factory pattern for handing out Publisher instances rather than being able to use the Publisher directly. The Publisher should be the factory and each time subscribe is invoked it starts a new lifecycle. Multi-subscribe on a single Publisher is a far nicer API and contract.

Rx Observables work this way very elegantly. If I have an Observable anyone can subscribe to it as many times as they want and they'll each get whatever stream the Observable is going to send to it. I do not need to ship around an "Observable Factory" for someone to generate an Observable and then subscribe to it.

A Publisher should ALWAYS be capable of being subscribed to as many times as it wants. Each one though is independent. It is not multicast, it is unicast per subscription.

The ability to subscribe multiple times to a Publisher is not to be confused with multicast.

Multicast means it is sharing a stream. Ability to subscribe multiple times to a Publisher does not.

Multicast immediately means that a Publisher is required to maintain state and always share the stream, which is an incorrect thing to force or make every Publisher do.

Here are examples ...

1) Stock Stream - hot, always changing

Publisher: StockStream stream = new StockStream()
Consumer 1: stream.subscribe(subscriberA)
Consumer 2: stream.subscribe(subscriberB)

Two independent streams are flowing to subscriberA and subscriberB.
Each consumer starts receiving data from whenever it starts.

The Publisher is free to choose whether it has 2 separate network connections or 1.
Thus, it can do multicast internally if it wants, but it is not a requirement, nor is it expected to. Hot streams can always choose to do so though to conserve resources if they want.

2) Catalog Stream - cold, starts from beginning each time

Publisher: CatalogStream stream = new CatalogStream()
Consumer 1: stream.subscribe(subscriberA)
Consumer 2: stream.subscribe(subscriberB)

In this case the Publisher will establish a new network connection each time and start fresh.
Each consumer will receive all titles in the catalog from the beginning regardless of when they subscribe.

3) File Stream - cold, loads each time

Publisher: FileStream stream = new FileStream()
Consumer 1: stream.subscribe(subscriberA)
Consumer 2: stream.subscribe(subscriberB)

In this case the Publisher will most likely open and read the file each time and start fresh.
Each consumer will receive all bytes of the file regardless of when they subscribe.

Multicast vs Multiple Subscribe

A Publisher can be subscribed to multiple times and let each subscription be independent of each other (unicast). This trivial example demonstrates this:

public class AStreamExample implements Publisher<String> {

    public static void main(String[] args) {
        AStreamExample stream = new AStreamExample();

        MySubscriber s1 = new MySubscriber();
        MySubscriber s2 = new MySubscriber();
        stream.subscribe(s1);
        stream.subscribe(s2);
    }

    @Override
    public void subscribe(final Subscriber<String> subscriber) {
        // you wouldn't really implement it like this ... but you get the point
        new Thread(new Runnable() {

            @Override
            public void run() {
                System.out.println("Subscribed on thread: " + Thread.currentThread());
                BooleanSubscription s = new BooleanSubscription();
                subscriber.onSubscribe(s);
                // ignoring unsubscribe and requestMore behavior for this simple example
                subscriber.onNext("1");
                subscriber.onNext("2");
                subscriber.onComplete();
            }

        }).start(); // start(), not run(): run() would execute on the caller's thread
    }

}

A Publisher is the factory. Each time subscribe is invoked it kicks off the work.

It is not required to multicast. In fact, it is assumed it will unicast. If a backing stream naturally multicasts (a hot stream of events that never stops) then it could choose as an implementation decision to do multicasting. This ties back to the 3 example use cases above with "hot" and "cold" sources.

If we didn't allow multi-subscribe, then the code would need to look like this:

    public static void main(String[] args) {
        MySubscriber s1 = new MySubscriber();
        MySubscriber s2 = new MySubscriber();
        new AStreamExample().subscribe(s1);
        new AStreamExample().subscribe(s2);
    }

This seems trivial in this case, but put it behind an API and it becomes more clear:

    public static void main(String[] args) {
        MySubscriber s1 = new MySubscriber();
        MySubscriber s2 = new MySubscriber();
        getStream().subscribe(s1);
        getStream().subscribe(s2);
    }

Do I have to call getStream() each time I subscribe? If so, why? The subscribe method says I can subscribe, so why not multiple times?

It gets worse once someone starts passing the stream around.

    public void doStuff(AStreamExample stream) {
        stream.subscribe(new MySubscriber());
        doMoreStuff(stream);
    }

    public void doMoreStuff(AStreamExample stream) {
        // can I subscribe? what if someone else already did?
        stream.subscribe(new MySubscriber());
    }

Perhaps my original statement was unclear, but I'm not saying a Publisher can't do multicast on a live stream, but that a Publisher should not be required to do multicast as it does not cater to all use cases. There are also many multicast use cases that are better suited to libraries (such as replay with its many different variants).

I suggest we remove the multicast requirement for a Publisher but require that subscribe be capable of being invoked multiple times. It is then left to the Publisher to decide whether it does multicast or unicast based on its use cases, resource usage, etc.

@smaldini reopened this Apr 17, 2014
@rkuhn
Member

rkuhn commented Apr 17, 2014

@benjchristensen, thanks for your elaboration, now it is perfectly clear what we are talking about: you are equating Publisher with Observable, perhaps because of similarities in method names and also due to the existence of Subscription as a name shared between Rx and Reactive Streams.

To be very blunt and concise: these names have misled you, and I am very sorry about that.

The Publisher/Subscriber pair is designed only for one purpose: to transport exactly one stream of elements across an asynchronous boundary (without unbounded buffering etc.). Therefore a Publisher delivers exactly one stream, which is why multiple subscriptions and multicast are identical in this case.

Thinking about it some more, it would probably have been better to leave out the Subscription altogether—we had it since we started out from the position of building in multicast as a convenience feature that we have come to realize is detrimental—which would lead us to equate Publisher with an Rx Subscription, leaving the details of what you want to publish out of the scope of this specification.

What you want is to have an easy path towards using Reactive Streams as a way to express remote Observables, and what we want is to use it as a way to push data to an Actor without blowing up message queues, and as far as I know everyone agrees on the underlying basic problem set. I’ll try out what the above proposal would look like, and if it makes sense (i.e. passes my plausibility test) then we can continue the discussion on the resulting pull request.

Again, I’m sorry that it took so long for me to realize what the root cause of the misunderstanding was.

@benjchristensen
Contributor Author

Roland, this is not about Observables (though that heavily influences my perspective as I've spent almost 2 years building "reactive stream" applications in production).

Let me equate it with Iterable/Iterator instead so there is no chance of conflict with Observable.

In code, I can pass an Iterable around to anyone and anything can "subscribe" to it via a new Iterator.

Since it's pull based, it creates the Iterator for me on a getIterator() method or something along those lines.

The getIterator() method can be called n times and every time will get a clean "subscription" to iterating the Iterable.
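The Iterable side of this analogy can be sketched with plain JDK types. This is only an illustration: the class name `IterableAnalogy` and the helper `drain` are made up for the example, and in the JDK the factory method is named `iterator()`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class IterableAnalogy {

    // Walks the shared Iterable with a fresh Iterator and returns what it saw.
    static List<String> drain(Iterable<String> source) {
        List<String> seen = new ArrayList<>();
        Iterator<String> it = source.iterator(); // new, independent cursor per call
        while (it.hasNext()) {
            seen.add(it.next());
        }
        return seen;
    }

    public static void main(String[] args) {
        Iterable<String> titles = Arrays.asList("a", "b", "c");
        // The same Iterable is handed to two consumers; neither disturbs the other.
        System.out.println(drain(titles)); // prints [a, b, c]
        System.out.println(drain(titles)); // prints [a, b, c]
    }
}
```

The object being passed around is the Iterable; the per-consumer state lives in the Iterator that each call creates.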

Now we flip to a Publisher/Subscriber. Since it's push, I provide the Subscriber instead of calling getSubscriber(). It is via the Publisher.subscribe(Subscriber s) that I start the push-based iteration (or streaming).

The point I'm making is two-fold:

  1. The Publisher should be usable like an Iterable so that developers can pass it around and allow anyone to subscribe to it whenever and however many times they want. Otherwise there ends up needing to be a 3rd component, a PublisherFactory that would be used like PublisherFactory.getPublisher().subscribe(Subscriber s).

  2. Multicast is an option, not a requirement. It is up to the Publisher to decide based on what it is streaming.

Therefore, please put aside comparisons to or use cases of Actors and Observables and consider this being an interface in the JDK for decades to come that everyone will implement. In that case, it will no longer be used purely for interop (as it will be now since it's a 3rd-party dependency). If it becomes part of the JDK it will be implemented directly by classes and be the common interface, like Iterable, and should therefore be capable of being used like an Iterable without a factory being required.

Code such as the following should be safe to write:

void doStuff() {
  Publisher p = new PublisherImpl();
  doMoreStuff(p);
  p.subscribe(subscriber);
}

void doMoreStuff(Publisher p) {
  p.subscribe(subscriber);
}

If we don't allow multi-subscribe behavior this will instead need to be written with a 3rd level such as:

void doStuff() {
  PublisherFactory p = new PublisherFactoryImpl();
  doMoreStuff(p);
  p.getPublisher().subscribe(subscriber);
}

void doMoreStuff(PublisherFactory p) {
  p.getPublisher().subscribe(subscriber);
}

@jrudolph

@benjchristensen What about streams that can naturally only be used once? Like a stream of network data for an already opened network connection? Given your suggestion would it be possible to get a Publisher for a stream like that?

IMO, you would either have to allow this Producer to instantly call onError for all subscriptions after the first, or you would need some buffering / fan-out logic that allows consumers to join the game later.
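The first of these two options could look roughly like the sketch below. Everything in it is an assumption made for illustration: the nested `Subscriber` and `Publisher` interfaces are simplified stand-ins (no demand signalling), and `OneShotPublisher` and `Recorder` are hypothetical names, not part of any proposal in this thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

class SingleUseSketch {

    // Assumed minimal shapes; demand signalling is omitted to keep the focus on subscribe().
    interface Subscriber<T> {
        void onNext(T t);
        void onError(Throwable t);
        void onComplete();
    }

    interface Publisher<T> {
        void subscribe(Subscriber<T> s);
    }

    // Wraps a source that can be consumed only once, such as an already open connection.
    static class OneShotPublisher<T> implements Publisher<T> {
        private final Iterator<T> source;
        private final AtomicBoolean taken = new AtomicBoolean(false);

        OneShotPublisher(Iterator<T> source) {
            this.source = source;
        }

        @Override
        public void subscribe(Subscriber<T> s) {
            if (!taken.compareAndSet(false, true)) {
                // Every subscription after the first is rejected immediately.
                s.onError(new IllegalStateException("already subscribed"));
                return;
            }
            while (source.hasNext()) {
                s.onNext(source.next());
            }
            s.onComplete();
        }
    }

    // Records every signal as a string so the behavior can be inspected.
    static class Recorder implements Subscriber<String> {
        final List<String> events = new ArrayList<>();

        @Override public void onNext(String t) { events.add("next:" + t); }
        @Override public void onError(Throwable t) { events.add("error:" + t.getMessage()); }
        @Override public void onComplete() { events.add("complete"); }
    }

    public static void main(String[] args) {
        OneShotPublisher<String> p = new OneShotPublisher<>(Arrays.asList("x", "y").iterator());
        Recorder first = new Recorder();
        Recorder second = new Recorder();
        p.subscribe(first);
        p.subscribe(second);
        System.out.println(first.events);  // prints [next:x, next:y, complete]
        System.out.println(second.events); // prints [error:already subscribed]
    }
}
```

The second option, buffering or fan-out for late joiners, needs a policy for what a late subscriber sees and is not attempted here.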

@benjchristensen
Contributor Author

It's up to the Publisher whether it wants to reuse the underlying network connection or not.

See the 3 use cases I listed above at #19 (comment)

@jrudolph

Yes, I've seen them. That's why I ask. I don't think an open connection fits into any of those three categories. An already open network connection (similar to an already generated Iterator) has very different backpressure needs than something that publishes events. "Reuse" is the thing that doesn't work easily because the backpressure demand of all possible consumers has to be coordinated somehow.

So, that's the question: can an open network connection (an iterator, an InputStream) be a Publisher in your view (in Rx)?

@benjchristensen
Contributor Author

There are two cases where an open network stream applies.

  1. Hot

Assume a "hot" stream where back pressure is not relevant, it's stock prices, mouse events, system metrics or some other thing that continues to fire at you. You can't say "stop", all you can say is "I can't take any more". In that case it's up to the developer to choose whether they want to drop, sample, debounce, buffer (rarely useful in this case). The "requestMore" behavior from a Subscriber can be translated into one of these mechanisms, but that's up to each implementation and the developer to choose.

In a "hot" case like this, a Publisher can safely reuse the network connection because it's going to stream the exact same data whether a single connection or multiple connections are open. There is no buffering, no state per Subscriber, just a flow of data. It is transparent to the Subscriber whether there are network connections being shared or not, it's just a hot stream of data that they can throttle on if they wish. Each Subscription will manage the requestMore behavior independently of each other, and each one will just be doing throttling of some flavor, the Publisher just keeps emitting the data.

Summary:

  • single Publisher
  • multiple Subscriber instances each with their own Subscription
  • single network connection (probably only open if there is >0 subscribers)
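A rough sketch of the hot case, under stated assumptions: the interfaces are simplified stand-ins using the "requestMore" name from this discussion, the shared connection is represented only by whoever calls `emit`, and dropping is picked as the throttling flavor purely for brevity. `HotPublisher`, `Slot` and `Recorder` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;

class HotSketch {

    interface Subscription {
        void requestMore(int n);
        void cancel();
    }

    interface Subscriber<T> {
        void onSubscribe(Subscription s);
        void onNext(T t);
    }

    // Per-subscriber state: only a demand counter, no buffer.
    static final class Slot<T> implements Subscription {
        final Subscriber<T> subscriber;
        final List<Slot<T>> registry;
        final AtomicLong demand = new AtomicLong();

        Slot(Subscriber<T> subscriber, List<Slot<T>> registry) {
            this.subscriber = subscriber;
            this.registry = registry;
        }

        @Override public void requestMore(int n) { demand.addAndGet(n); }
        @Override public void cancel() { registry.remove(this); }

        // Consumes one unit of demand if any is available.
        boolean tryClaim() {
            long d;
            do {
                d = demand.get();
                if (d <= 0) return false;
            } while (!demand.compareAndSet(d, d - 1));
            return true;
        }
    }

    static class HotPublisher<T> {
        private final List<Slot<T>> slots = new CopyOnWriteArrayList<>();

        public void subscribe(Subscriber<T> s) {
            Slot<T> slot = new Slot<>(s, slots);
            slots.add(slot);
            s.onSubscribe(slot);
        }

        // Called by the single shared source for every event.
        public void emit(T value) {
            for (Slot<T> slot : slots) {
                if (slot.tryClaim()) {
                    slot.subscriber.onNext(value);
                }
                // Otherwise the value is dropped for this subscriber only.
            }
        }
    }

    // Requests a fixed amount up front and records what arrives.
    static class Recorder implements Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        private final int initialDemand;

        Recorder(int initialDemand) { this.initialDemand = initialDemand; }

        @Override public void onSubscribe(Subscription s) { s.requestMore(initialDemand); }
        @Override public void onNext(Integer t) { received.add(t); }
    }
}
```

With one subscriber that requested 2 and a second one that joins after the first event and requested 5, emitting 1, 2, 3 delivers 1 and 2 to the first (3 is dropped for it) and 2 and 3 to the second. The source never slows down; each subscription only decides what it lets through.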
  2. Cold

A "cold" source is very different. In this case the Publisher would need to create a new network connection for each Subscription because "requestMore" is now meaningful.

Each Subscriber will request however much it can handle and the state of data flowing is one-to-one between the underlying resource. The single Publisher can be subscribed to multiple times, but each time a new Subscription is created it will represent a separate network connection that only serves that Subscriber/Subscription pair.

Summary:

  • single Publisher
  • multiple Subscriber instances each with their own Subscription
  • network connection per subscription

In short, the Publisher is the factory that returns a Subscription to the Subscriber that manages the correct state and behavior (via the "requestMore" invocations) depending on what fits the use case.
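The cold case can be sketched in the same spirit. Again this is an illustration under assumptions: an in-memory list stands in for the per-subscription network connection, delivery is synchronous for brevity, and `ColdPublisher` and `Recorder` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

class ColdSketch {

    interface Subscription {
        void requestMore(int n);
        void cancel();
    }

    interface Subscriber<T> {
        void onSubscribe(Subscription s);
        void onNext(T t);
        void onComplete();
    }

    // Each subscribe() creates a Subscription with its own cursor over the data.
    static class ColdPublisher<T> {
        private final List<T> data;

        ColdPublisher(List<T> data) { this.data = data; }

        public void subscribe(final Subscriber<T> s) {
            s.onSubscribe(new Subscription() {
                private int cursor = 0;       // state owned by this subscription only
                private boolean done = false;

                @Override
                public void requestMore(int n) {
                    // Emit at most n elements, then stop until more is requested.
                    for (int i = 0; i < n && !done && cursor < data.size(); i++) {
                        s.onNext(data.get(cursor++));
                    }
                    if (!done && cursor == data.size()) {
                        done = true;
                        s.onComplete();
                    }
                }

                @Override
                public void cancel() { done = true; }
            });
        }
    }

    // Keeps the Subscription so the caller can drive demand by hand.
    static class Recorder<T> implements Subscriber<T> {
        final List<T> received = new ArrayList<>();
        boolean completed = false;
        Subscription subscription;

        @Override public void onSubscribe(Subscription s) { subscription = s; }
        @Override public void onNext(T t) { received.add(t); }
        @Override public void onComplete() { completed = true; }
    }
}
```

Two Recorders subscribed to the same ColdPublisher each start from the first element, and how far one has progressed has no effect on the other.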

@alexandru

I think Publisher here is the equivalent of the Iterator, whereas Iterable.getIterator() is Producer.getPublisher(), is it not?

I agree with @benjchristensen, as from the subscriber's point of view it shouldn't matter if a Publisher is hot or cold. I like this in Rx Observables.

@benjchristensen
Contributor Author

Related to this issue is that I think we should be using the Publisher/Subscriber/Subscription types only (https://github.com/reactive-streams/reactive-streams/tree/master/spi/src/main/java/org/reactivestreams/spi) and not have (Producer/Consumer) => #25

The getPublisher() thing is misleading in its similarity to getIterator(), as that's not how a push based interface behaves.

@viktorklang
Contributor

@benjchristensen Well, technically the method is called iterator() (the one that creates a new Iterator for each invocation). I'd expect the general assumption in Java to be that getters don't create new things.
Nevertheless, we should strive to minimize the surface area of the spec/spi.

@jbrisbin

I think I'm identifying more with @benjchristensen 's comments here. I like the example of iterator() (no "get" ;)) to illustrate the point.

It also seems to me that we might benefit from a hasMore(int num) or equivalent. From hard experience we've found that iteration in high-volume code can be vastly more efficient if you first make sure a collection isn't empty. It will often be the case that a Publisher will be able to answer a hasMore query more efficiently than by assuming work will be done or attempting to do work when none is available.

One might argue that simply attempting to do work and finding out after the fact that none was done would be sufficient but in many cases that won't work. e.g. when doing least-busy kinds of routing.

At the very least we need a way to determine if no work was done so we can decide to do something about it. Even if it's as simple as requestMore(int) returning boolean.

@benjchristensen
Contributor Author

Well, technically the method is called iterator()

Whatever it's called ... here is the implementation of ArrayList where a new Iterator is created each time, exactly as I would expect Publisher.subscribe to do:

    public Iterator<E> iterator() {
        return new Itr();
    }

@viktorklang
Contributor

@benjchristensen The name is important (otherwise we have had waaay too many naming discussions already, don't you agree?!) My point was that "nobody" would expect a getter to return a new instance each time. And it seems like we agree there?

Whatever it's called ... here is the implementation of ArrayList where a new Iterator is created each time, exactly as I would expect Publisher.subscribe to do:

Publisher.subscribe already creates a new Subscription (well, unless the Subscriber was already subscribed) for each subscribe. However, as a Subscriber you simply don't know which elements you'll get (if any!), you'll get the ones the Publisher actually already has, can request from someone else, or can create. Right?

@rkuhn
Member

rkuhn commented Apr 17, 2014

@jbrisbin The analogy you are making goes in the wrong direction: the Subscriber does not query the Publisher, it just signals capacity to receive more elements (or “demand”, if we agree on the meaning à la “demand in the market”). A query would be synchronous; if not on the type level then on a semantic level.

@rkuhn
Member

rkuhn commented Apr 17, 2014

@benjchristensen We are still discussing different things, I think: the prime problem we want to solve is how to pass data across an asynchronous boundary with non-blocking back pressure. Since this is a bidirectional endeavor, allowing multiple participants on one side of the fence by inversion means allowing multiple participants on the other as well. Splitting a stream means merging the back pressure, merging streams means splitting the back pressure.
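To make "splitting a stream means merging the back pressure" concrete, here is a sketch of one possible merge rule: request from upstream only as much as the slowest downstream can take. This rule is an assumption chosen for illustration and is just one of many strategies; `MinDemand` and its methods are hypothetical names. The sketch also assumes all downstreams are registered before any demand is signalled, which sidesteps the late-joiner question.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class DemandMergeSketch {

    // Tracks outstanding demand per downstream and derives what may be requested upstream.
    static class MinDemand {
        private final Map<String, Long> outstanding = new HashMap<>();
        private long inFlight = 0; // requested upstream but not yet delivered

        // Assumed to be called for every downstream before any request().
        void register(String id) {
            outstanding.put(id, 0L);
        }

        // Records demand from one downstream; returns the extra demand to signal upstream.
        long request(String id, long n) {
            outstanding.merge(id, n, Long::sum);
            long min = Collections.min(outstanding.values());
            long extra = min - inFlight;
            if (extra > 0) {
                inFlight += extra;
                return extra;
            }
            return 0;
        }

        // One element arrived from upstream and was handed to every downstream.
        void delivered() {
            outstanding.replaceAll((id, d) -> d - 1);
            inFlight--;
        }
    }

    public static void main(String[] args) {
        MinDemand merge = new MinDemand();
        merge.register("a");
        merge.register("b");
        System.out.println(merge.request("a", 5)); // prints 0, b has not asked for anything
        System.out.println(merge.request("b", 3)); // prints 3, the minimum of 5 and 3
    }
}
```

Other rules (the maximum with dropping, or the minimum plus a bounded buffer) would be equally defensible, which is why no single default fits every Publisher.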

Now, I think it is entirely reasonable to ask the question of “how do I obtain a source for such asynchronous exchange”, but that question is outside the scope of the initial problem. We should first solve the case of connecting both ends of this async pipe.

The reason why I persevere in this case is that, plainly speaking, not every iterator is born of an iterable, and your answer of “just drop data” in the other case (which you called “hot”) is by all means not the only conceivable one. I might well have an incoming TCP stream which does support back pressure at its source, but which I want to consume in parallel in multiple places. There are many strategies of how to connect multiple Subscribers to a logical “Source”, wherefore it just places useless burden on all Publishers if they must implement one of two specific choices.

So, coming back for a second try: what does not work for you with the following proposal?

trait Publisher[T] { // corresponds to Rx Subscription with back pressure
  def subscribe(subscriber: Subscriber[T]): Unit
  def signalAdditionalDemand(n: Int): Unit
  def cancel(): Unit
}
trait Subscriber[T] {
  def onSubscribe(publisher: Publisher[T]): Unit
  def onNext(element: T): Unit
  def onError(cause: Throwable): Unit
  def onComplete(): Unit
}
// and that would be ALL, nothing else in Reactive Streams

(remark: in this case we will want to change the names because PubSub is not 1:1, but that is a discussion to be had afterwards)

@benjchristensen
Contributor Author

coming back for a second try: what does not work for you with the following proposal?

I really like how simple and clean this is and it also solves #25.

It does however require a 3rd factory type to generate Publisher since the Publisher object now has the state of signalAdditionalDemand and cancel and thus needs a new instance each time.

I would suggest this change that makes Publisher the factory and moves the state to Subscription:

public interface Publisher<T> {
    public void subscribe(Subscriber<T> s);
}

public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onCompleted();
}

public interface Subscription {
    public void signalAdditionalDemand(int n);
    public void cancel();
}

I'm still not 100% convinced the onSubscribe method is correct, as it requires some awkward handling ... but this is close and works.

It allows the Publisher to be the type that gets passed around in code (the factory) and combines SPI/API into 3 types that are clear in what they do.

Here is code using those 3 types:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class Test {

    public static void main(String... args) throws InterruptedException {
        Publisher<Integer> dataStream = getData();

        dataStream.subscribe(new MySubscriber("A"));
        Thread.sleep(750);
        dataStream.subscribe(new MySubscriber("B"));
    }

    static Publisher<Integer> getData() {
        return new MyDataPublisher();
    }

    static class MyDataPublisher implements Publisher<Integer> {

        @Override
        public void subscribe(final Subscriber<Integer> s) {

            AtomicInteger i = new AtomicInteger();

            Subscription subscription = new Subscription() {

                AtomicInteger capacity = new AtomicInteger();

                @Override
                public void signalAdditionalDemand(int n) {
                    System.out.println("signalAdditionalDemand => " + n);
                    if (capacity.getAndAdd(n) == 0) {
                        // start sending again if it wasn't already running
                        send();
                    }
                }

                private void send() {
                    System.out.println("send => " + capacity.get());
                    // this would normally use an eventloop, actor, whatever
                    new Thread(() -> {
                        do {
                            s.onNext(i.incrementAndGet());
                        } while (capacity.decrementAndGet() > 0);
                    }).start();
                }

                @Override
                public void cancel() {
                    capacity.set(-1);
                }

            };

            s.onSubscribe(subscription);

        }

    }

    static class MySubscriber implements Subscriber<Integer> {

        final int BUFFER_SIZE = 10;
        private final ArrayBlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(BUFFER_SIZE);
        private volatile boolean terminated = false;
        private final String token;

        MySubscriber(String token) {
            this.token = token;
        }

        @Override
        public void onSubscribe(Subscription s) {
            System.out.println("onSubscribe => request " + BUFFER_SIZE);
            s.signalAdditionalDemand(BUFFER_SIZE);
            startAsyncWork(s);
        }

        @Override
        public void onNext(Integer t) {
            buffer.add(t);
        }

        @Override
        public void onError(Throwable t) {
            terminated = true;
            throw new RuntimeException(t);
        }

        @Override
        public void onCompleted() {
            terminated = true;
        }

        private void startAsyncWork(Subscription s) {
            System.out.println("**** Start new worker thread");
            /* don't write real code like this! just for quick demo */
            new Thread(() -> {
                while (!terminated) {
                    Integer v = buffer.poll();
                    try {
                        Thread.sleep(100);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    if (buffer.size() < 3) {
                        s.signalAdditionalDemand(BUFFER_SIZE - buffer.size());
                    }
                    if (v != null) {
                        System.out.println(token + " => Did stuff with v: " + v);
                    }
                }
            }).start();
        }

    }

}

@benjchristensen
Contributor Author

The output of the above code is:

onSubscribe => request 10
signalAdditionalDemand => 10
send => 10
**** Start new worker thread
A => Did stuff with v: 1
A => Did stuff with v: 2
A => Did stuff with v: 3
A => Did stuff with v: 4
A => Did stuff with v: 5
A => Did stuff with v: 6
A => Did stuff with v: 7
onSubscribe => request 10
signalAdditionalDemand => 10
send => 10
**** Start new worker thread
signalAdditionalDemand => 8
send => 8
A => Did stuff with v: 8
B => Did stuff with v: 1
A => Did stuff with v: 9
B => Did stuff with v: 2
A => Did stuff with v: 10
B => Did stuff with v: 3
A => Did stuff with v: 11
B => Did stuff with v: 4
A => Did stuff with v: 12
B => Did stuff with v: 5
A => Did stuff with v: 13
B => Did stuff with v: 6
A => Did stuff with v: 14
B => Did stuff with v: 7
A => Did stuff with v: 15
signalAdditionalDemand => 8
send => 8
B => Did stuff with v: 8
signalAdditionalDemand => 8
send => 8
A => Did stuff with v: 16
B => Did stuff with v: 9
A => Did stuff with v: 17
B => Did stuff with v: 10
A => Did stuff with v: 18
B => Did stuff with v: 11
A => Did stuff with v: 19
B => Did stuff with v: 12
A => Did stuff with v: 20
B => Did stuff with v: 13
A => Did stuff with v: 21
B => Did stuff with v: 14
A => Did stuff with v: 22
B => Did stuff with v: 15
A => Did stuff with v: 23
signalAdditionalDemand => 8
send => 8
B => Did stuff with v: 16
signalAdditionalDemand => 8
send => 8
A => Did stuff with v: 24
B => Did stuff with v: 17
A => Did stuff with v: 25
B => Did stuff with v: 18
A => Did stuff with v: 26
B => Did stuff with v: 19
A => Did stuff with v: 27
B => Did stuff with v: 20
A => Did stuff with v: 28
B => Did stuff with v: 21
A => Did stuff with v: 29
B => Did stuff with v: 22
A => Did stuff with v: 30
B => Did stuff with v: 23
A => Did stuff with v: 31
signalAdditionalDemand => 8
send => 8
B => Did stuff with v: 24
signalAdditionalDemand => 8
send => 8
A => Did stuff with v: 32
B => Did stuff with v: 25
A => Did stuff with v: 33
B => Did stuff with v: 26
A => Did stuff with v: 34
B => Did stuff with v: 27
A => Did stuff with v: 35
B => Did stuff with v: 28
A => Did stuff with v: 36
B => Did stuff with v: 29
A => Did stuff with v: 37
B => Did stuff with v: 30
A => Did stuff with v: 38
B => Did stuff with v: 31
A => Did stuff with v: 39
signalAdditionalDemand => 8
send => 8
B => Did stuff with v: 32
signalAdditionalDemand => 8
send => 8
A => Did stuff with v: 40
B => Did stuff with v: 33

@jbrisbin

I do like how this makes things clean and simple as well.

I'm playing with a reactive Buffer so I'll be trying this out. @benjchristensen, is this pushed to your fork yet or somehow otherwise available for me to test against?

/cc @smaldini

@jbrisbin

Not a fan of signalAdditionalDemand but it's certainly clear in intent and self-documenting...

@benjchristensen
Contributor Author

Is this pushed to your fork yet or somehow otherwise available for me to test against?

No, what is pasted above is the entirety of the code. I purposefully did not make anything depend on RxJava so as to keep this completely separate and clean.

Not a fan of signalAdditionalDemand but it's certainly clear in intent and self-documenting...

Yes, I'm not sold yet on that or the onSubscribe callback, but not ready to bikeshed on those as I can't yet provide a better solution :-)

Going to follow up in a few minutes with documented APIs that communicate the contract.

@benjchristensen
Contributor Author

With poorly written docs to try and communicate the contracts:

public interface Publisher<T> {

    /**
     * Request {@link Subscription} to start streaming data.
     * <p>
     * This is a "factory method" and can be called multiple times, each time starting a new {@link Subscription}.
     * <p>
     * Each {@link Subscription} will work for only a single {@link Subscriber}.
     * <p>
     * A {@link Subscriber} should only subscribe once to a single {@link Publisher}.
     * 
     * @param s
     */
    public void subscribe(Subscriber<T> s);
}


/**
 * Will receive call to {@link #onSubscribe(Subscription)} once after passing an instance of {@link Subscriber} to {@link Publisher#subscribe(Subscriber)}.
 * <p>
 * No further notifications will be received until {@link Subscription#signalAdditionalDemand(int)} is called.
 * <p>
 * After signaling demand:
 * <ul>
 * <li>One or more invocations of {@link #onNext(Object)} up to the maximum number defined by {@link Subscription#signalAdditionalDemand(int)}</li>
 * <li>Single invocation of {@link #onError(Throwable)} or {@link #onCompleted()} which signals a terminal state after which no further events will be sent.</li>
 * </ul>
 * <p>
 * Demand can be signalled via {@link Subscription#signalAdditionalDemand(int)} whenever the {@link Subscriber} instance is capable of handling more.
 *
 * @param <T>
 */
public interface Subscriber<T> {
    /**
     * Invoked after calling {@link Publisher#subscribe(Subscriber)}.
     * <p>
     * No data will start flowing until {@link Subscription#signalAdditionalDemand(int)} is invoked.
     * <p>
     * It is the responsibility of this {@link Subscriber} instance to call {@link Subscription#signalAdditionalDemand(int)} whenever more data is wanted.
     * <p>
     * The {@link Publisher} will send notifications only in response to {@link Subscription#signalAdditionalDemand(int)}.
     * 
     * @param s
     *            {@link Subscription} that allows requesting data via {@link Subscription#signalAdditionalDemand(int)}
     */
    public void onSubscribe(Subscription s);

    /**
     * Data notification sent by the {@link Publisher} in response to requests to {@link Subscription#signalAdditionalDemand(int)}.
     * 
     * @param t
     */
    public void onNext(T t);

    /**
     * Failed terminal state.
     * <p>
     * No further events will be sent even if {@link Subscription#signalAdditionalDemand(int)} is invoked again.
     * 
     * @param t
     */
    public void onError(Throwable t);

    /**
     * Successful terminal state.
     * <p>
     * No further events will be sent even if {@link Subscription#signalAdditionalDemand(int)} is invoked again.
     */
    public void onCompleted();
}

/**
 * A {@link Subscription} represents a one-to-one lifecycle of a {@link Subscriber} subscribing to a {@link Publisher}.
 * <p>
 * It can only be used once by a single {@link Subscriber}.
 * <p>
 * It is used to both signal desire for data and cancel demand (and allow resource cleanup).
 *
 */
public interface Subscription {
    /**
     * No events will be sent by a {@link Publisher} until demand is signalled via this method.
     * <p>
     * It can be called however often and whenever needed.
     * <p>
     * Whatever has been signalled can be sent by the {@link Publisher} so only signal demand for what can be safely handled.
     * 
     * @param n
     */
    public void signalAdditionalDemand(int n);

    /**
     * Request the {@link Publisher} to stop sending data and clean up resources.
     * <p>
     * Data may still be sent to meet previously signalled demand after calling cancel as this request is asynchronous.
     */
    public void cancel();
}

@DougLea
Contributor

DougLea commented Apr 18, 2014

Ben, this is starting to look a lot like TCP windowing.
Have you considered going all the way?

  • subscribe takes a window size argument that is used to create/size a buffer.
  • ack() replaces signalAdditionalDemand to tell the publisher that one more item has been processed, so it can slide/advance the window. Optionally support ack(int n) to allow bulk advance.
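
To make the windowing idea concrete, here is a minimal single-threaded sketch. It is only one possible reading of the suggestion: the names WindowSketch, WindowedSubscription, start and inFlight are hypothetical and do not come from any proposed API, and the window is modelled as a simple counter of unacknowledged items rather than a real buffer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical sketch of window-based flow control; not part of the proposed API. */
public class WindowSketch {

    /** Sends items while fewer than windowSize of them are unacknowledged. */
    static final class WindowedSubscription<T> {
        private final Iterator<T> source;
        private final int windowSize;
        private final Consumer<T> onNext;
        private int inFlight = 0;          // items sent but not yet acknowledged
        private boolean draining = false;  // guards against re-entrant ack() calls

        WindowedSubscription(Iterator<T> source, int windowSize, Consumer<T> onNext) {
            this.source = source;
            this.windowSize = windowSize;
            this.onNext = onNext;
        }

        /** Fills the initial window. */
        void start() { drain(); }

        /** One item has been processed; the window slides by one. */
        void ack() { ack(1); }

        /** Bulk acknowledgement; the window slides by n. */
        void ack(int n) {
            if (n <= 0 || n > inFlight) {
                throw new IllegalArgumentException("cannot ack " + n + " with " + inFlight + " in flight");
            }
            inFlight -= n;
            drain();
        }

        int inFlight() { return inFlight; }

        private void drain() {
            if (draining) return; // ack() called from inside onNext: the outer loop continues
            draining = true;
            try {
                while (inFlight < windowSize && source.hasNext()) {
                    inFlight++;
                    onNext.accept(source.next());
                }
            } finally {
                draining = false;
            }
        }
    }

    public static void main(String[] args) {
        List<Integer> received = new ArrayList<>();
        Iterator<Integer> source = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).iterator();
        WindowedSubscription<Integer> sub = new WindowedSubscription<>(source, 4, received::add);

        sub.start();
        System.out.println(received); // [1, 2, 3, 4] (window of 4 is full)

        sub.ack(2);
        System.out.println(received); // [1, 2, 3, 4, 5, 6] (two slots were freed)
    }
}
```

In this reading, ack(n) frees n slots in the window, so it plays much the same role as signalAdditionalDemand(n); the main difference is that the window size is fixed up front at subscribe time.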

@benjchristensen
Contributor Author

I have submitted a pull request that I propose we merge to end this particular thread of discussion: #37

@normanmaurer
Member

@benjchristensen damn it ... it seems I deleted your comment (summary) by mistake by clicking on the wrong button :( Can someone just copy and paste it from the emails? Sorry for the mess... It seems I cannot "revert" my change.

@normanmaurer
Member

Anyway... I really like what @benjchristensen proposed in his summary and I think it makes the contract quite clear. So I would be quite happy to have the proposal merged in.

I also like the naming that @mariusaeriksen proposed here, but as @benjchristensen pointed out it may be better to think about names after we agree on something.

Thanks again to all of you for the hard work on this as I could not keep up with all of it over the last weeks :(

@viktorklang
Contributor

Roland is mid air afaik and will reply as soon as he can.

@jbrisbin

I'm actually not feeling the warm fuzzies about the alternative naming above. To me it reads very IO-oriented which I don't think fits every use case. Although I caveat that by saying: calling things "sources" and "sinks" does have its advantages and is the standardized terminology used in Spring XD. I'm flexible.

But I definitely vote for merging the simplification changes (and Gradle build? :D) @benjchristensen proposed so we can get to work using it.

@tmontgomery

Looking over the summary that @benjchristensen made (before it disappeared), it seems to hang together quite well. There are some niggly bits with the terms, but the behavior works, I think.

It would be good to put the summary back up, if possible.

listen is a better term for anything push-related, with its associations to listener semantics.

@jrudolph

Here's a resurrection of @benjchristensen's comment from github's email:

Since there were no responses over the weekend and this thread is very long and hard to read, I'd like to summarize and ask for folks to weigh in. The proposal is as follows:

Contract

  • A Subscriber can be used once-and-only-once to subscribe to a Publisher.
  • A Subscription can be used once-and-only-once to represent a subscription by a Subscriber to a Publisher.
  • The Publisher.subscribe method can be called as many times as wanted as long as it is with a different Subscriber each time. It is up to the Publisher whether underlying streams are shared or not.
  • A Publisher can refuse subscriptions (calls to subscribe) if it is unable or unwilling to serve them (overwhelmed, fronting a single-use data source, etc) and can do so by immediately calling Subscriber.onError on the Subscriber instance calling subscribe.
  • Events sent to a Subscriber can only be sent sequentially (no concurrent notifications).
  • Once an onComplete or onError is sent, no further events can be sent.
  • Once a Subscription is cancelled, the Publisher will stop sending events as soon as it can.
  • A Publisher will never send more onNext events than have been requested via the Subscription.request/signalDemand method.

Types

Naming of classes and methods is not part of what is being discussed here. That can be argued over after we agree upon behavior :-)

package org.reactivestreams;

public interface Publisher<T> {

    /**
     * Request {@link Subscription} to start streaming data.
     * <p>
     * This is a "factory method" and can be called multiple times, each time starting a new {@link Subscription}.
     * <p>
     * Each {@link Subscription} will work for only a single {@link Subscriber}.
     * <p>
     * A {@link Subscriber} should only subscribe once to a single {@link Publisher}.
     * 
     * @param s
     */
    public void subscribe(Subscriber<T> s);
}

package org.reactivestreams;

/**
 * A {@link Subscription} represents a one-to-one lifecycle of a {@link Subscriber} subscribing to a {@link Publisher}.
 * <p>
 * It can only be used once by a single {@link Subscriber}.
 * <p>
 * It is used to both signal desire for data and cancel demand (and allow resource cleanup).
 *
 */
public interface Subscription {
    /**
     * No events will be sent by a {@link Publisher} until demand is signalled via this method.
     * <p>
     * It can be called however often and whenever needed.
     * <p>
     * Whatever has been signalled can be sent by the {@link Publisher} so only signal demand for what can be safely handled.
     * 
     * @param n
     */
    public void signalAdditionalDemand(int n);

    /**
     * Request the {@link Publisher} to stop sending data and clean up resources.
     * <p>
     * Data may still be sent to meet previously signalled demand after calling cancel as this request is asynchronous.
     */
    public void cancel();
}

package org.reactivestreams;

/**
 * Will receive call to {@link #onSubscribe(Subscription)} once after passing an instance of {@link Subscriber} to {@link Publisher#subscribe(Subscriber)}.
 * <p>
 * No further notifications will be received until {@link Subscription#signalAdditionalDemand(int)} is called.
 * <p>
 * After signaling demand:
 * <ul>
 * <li>One or more invocations of {@link #onNext(Object)} up to the maximum number defined by {@link Subscription#signalAdditionalDemand(int)}</li>
 * <li>Single invocation of {@link #onError(Throwable)} or {@link #onCompleted()} which signals a terminal state after which no further events will be sent.</li>
 * </ul>
 * <p>
 * Demand can be signalled via {@link Subscription#signalAdditionalDemand(int)} whenever the {@link Subscriber} instance is capable of handling more.
 *
 * @param <T>
 */
public interface Subscriber<T> {
    /**
     * Invoked after calling {@link Publisher#subscribe(Subscriber)}.
     * <p>
     * No data will start flowing until {@link Subscription#signalAdditionalDemand(int)} is invoked.
     * <p>
     * It is the responsibility of this {@link Subscriber} instance to call {@link Subscription#signalAdditionalDemand(int)} whenever more data is wanted.
     * <p>
     * The {@link Publisher} will send notifications only in response to {@link Subscription#signalAdditionalDemand(int)}.
     * 
     * @param s
     *            {@link Subscription} that allows requesting data via {@link Subscription#signalAdditionalDemand(int)}
     */
    public void onSubscribe(Subscription s);

    /**
     * Data notification sent by the {@link Publisher} in response to requests to {@link Subscription#signalAdditionalDemand(int)}.
     * 
     * @param t
     */
    public void onNext(T t);

    /**
     * Failed terminal state.
     * <p>
     * No further events will be sent even if {@link Subscription#signalAdditionalDemand(int)} is invoked again.
     * 
     * @param t
     */
    public void onError(Throwable t);

    /**
     * Successful terminal state.
     * <p>
     * No further events will be sent even if {@link Subscription#signalAdditionalDemand(int)} is invoked again.
     */
    public void onCompleted();
}
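
As a rough illustration of the contract above, here is a minimal single-threaded sketch of a unicast Publisher that starts a fresh lifecycle per subscribe call and never emits more than has been requested. The interfaces are re-declared locally (as nested types) so the example is self-contained; ContractSketch, RangePublisher and Collector are hypothetical names, and demand is assumed to be signalled from outside onNext.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the proposed contract; single-threaded, for illustration only. */
public class ContractSketch {

    interface Subscription {
        void signalAdditionalDemand(int n);
        void cancel();
    }

    interface Subscriber<T> {
        void onSubscribe(Subscription s);
        void onNext(T t);
        void onError(Throwable t);
        void onCompleted();
    }

    interface Publisher<T> {
        void subscribe(Subscriber<T> s);
    }

    /** Emits 1..count; every subscribe call gets its own Subscription and its own state. */
    static final class RangePublisher implements Publisher<Integer> {
        private final int count;

        RangePublisher(int count) { this.count = count; }

        @Override
        public void subscribe(final Subscriber<Integer> s) {
            s.onSubscribe(new Subscription() {
                private int next = 1;
                private boolean done = false;

                @Override
                public void signalAdditionalDemand(int n) {
                    // Emit at most n items: never more onNext calls than were requested.
                    for (int i = 0; i < n && !done && next <= count; i++) {
                        s.onNext(next++);
                    }
                    // Terminal event is sent once; nothing follows it.
                    if (!done && next > count) {
                        done = true;
                        s.onCompleted();
                    }
                }

                @Override
                public void cancel() { done = true; }
            });
        }
    }

    /** Stores what it receives; demand is signalled by the caller through the saved Subscription. */
    static final class Collector implements Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        boolean completed = false;
        Subscription subscription;

        @Override public void onSubscribe(Subscription s) { subscription = s; } // no demand yet
        @Override public void onNext(Integer t) { received.add(t); }
        @Override public void onError(Throwable t) { throw new IllegalStateException(t); }
        @Override public void onCompleted() { completed = true; }
    }

    public static void main(String[] args) {
        Publisher<Integer> publisher = new RangePublisher(5);
        Collector a = new Collector();
        Collector b = new Collector();
        publisher.subscribe(a);
        publisher.subscribe(b);

        a.subscription.signalAdditionalDemand(3);
        System.out.println(a.received + " " + a.completed); // [1, 2, 3] false
        System.out.println(b.received);                     // [] (b has not signalled demand)

        a.subscription.signalAdditionalDemand(10);
        System.out.println(a.received + " " + a.completed); // [1, 2, 3, 4, 5] true

        b.subscription.signalAdditionalDemand(2);
        System.out.println(b.received);                     // [1, 2] (independent lifecycle)
    }
}
```

The sketch emits synchronously inside signalAdditionalDemand, which keeps it short but means a Subscriber that signals demand from within onNext would recurse; a real implementation would need to dispatch or trampoline those calls.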

@normanmaurer
Member

Thx

On 22.04.2014 at 18:06, Johannes Rudolph notifications@github.com wrote:

Here's a resurrection of @benjchristensen's comment from github's email:


@benjchristensen
Contributor Author

Thanks for your feedback @tmontgomery @jbrisbin @mariusaeriksen and @normanmaurer

Definitely there are things to continue debating (naming, etc). If you all agree (including @rkuhn once he's available again), I suggest we merge #37 and start new issues to discuss the next round of topics that build on top of what we've agreed upon so far.

@benjchristensen
Contributor Author

The pull request (#37) has progressed as a result of discussion between @rkuhn and me. If any of you have the time, the README is worthy of review and feedback by more than just the two of us.

@alexandru

"Events sent to a Subscriber can only be sent sequentially (no concurrent notifications)."

What does that mean? In an asynchronous context, you cannot guarantee that events won't reach the Subscriber concurrently unless the Subscriber applies back-pressure by means of subscription.request(1) (e.g. acknowledgement), which is one of the problems that back-pressure is meant to solve. I also thought that subscription.requestMore(n) was chosen as the means of back-pressure precisely to allow concurrent onNext events, with the Subscriber being responsible for synchronization.

"Calls from a Subscriber to Subscription such as Subscription.request(int n) must be dispatched asynchronously (separate thread, event loop, trampoline, etc) so as to not cause a StackOverflow since Subscriber.onNext -> Subscription.request -> Subscriber.onNext can recurse infinitely."

While I understand the need for this, maybe it's the Publisher that should ensure that the onNext events following a subscription.request(n) are dispatched asynchronously, because the Subscriber's implementation is more user-facing than the Publisher's. Or maybe I don't understand the implications of that; just saying. Either way, this explanation was needed in the description, thanks for adding it.

Subscriber controlled queue bounds

What this section is basically saying is that the Publisher should respect subscription.request(n) and in case the source produces more data than the Subscriber can handle, then the Publisher must decide what to do with it. Is this right?
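
If that reading is right, one possible policy can be sketched as follows. This is an assumption-laden illustration, not what the README prescribes: OverflowSketch and BoundedBridge are hypothetical names, the code is single-threaded, and "drop the oldest element" is just one of several choices (dropping the newest, or signalling onError, would be others).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical sketch: a bounded buffer between a push-style source and a demand-driven consumer. */
public class OverflowSketch {

    static final class BoundedBridge<T> {
        private final ArrayDeque<T> buffer = new ArrayDeque<>();
        private final int capacity;
        private final Consumer<T> downstream; // stands in for Subscriber.onNext
        private long demand = 0;              // requested but not yet delivered
        int dropped = 0;

        BoundedBridge(int capacity, Consumer<T> downstream) {
            this.capacity = capacity;
            this.downstream = downstream;
        }

        /** Called by the source whenever it produces, regardless of demand. */
        void offer(T item) {
            if (demand > 0 && buffer.isEmpty()) {
                demand--;
                downstream.accept(item);      // demand available: pass straight through
                return;
            }
            if (buffer.size() == capacity) {
                buffer.pollFirst();           // assumed policy: drop the oldest element
                dropped++;
            }
            buffer.addLast(item);
        }

        /** Called by the consumer; plays the role of subscription.request(n). */
        void request(int n) {
            demand += n;
            while (demand > 0 && !buffer.isEmpty()) {
                demand--;
                downstream.accept(buffer.pollFirst());
            }
        }
    }

    public static void main(String[] args) {
        List<Integer> received = new ArrayList<>();
        BoundedBridge<Integer> bridge = new BoundedBridge<>(3, received::add);

        for (int i = 1; i <= 5; i++) bridge.offer(i); // no demand yet: 1 and 2 get dropped
        bridge.request(2);
        System.out.println(received + " dropped=" + bridge.dropped); // [3, 4] dropped=2

        bridge.offer(6);
        bridge.request(5);
        bridge.offer(7);                              // demand is outstanding: delivered directly
        System.out.println(received + " dropped=" + bridge.dropped); // [3, 4, 5, 6, 7] dropped=2
    }
}
```

The point of the sketch is only that the Subscriber never sees more than it asked for; what happens to the surplus is entirely the Publisher's decision.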

@alexandru

Also related to this ... what happens if we have a Subscriber that has an onNext like:

public void onNext(T elem) {
   process(elem);
   dispatchRequestEvent(100);
}

I don't see anything in the contract of the Subscriber that disallows this and things get more complicated if the Publisher is allowed to send onNext events following a subscription.request(n) event synchronously. Granted, we go back to that mention about the Publisher that should send events sequentially, which makes sense in this light. But it would be good for the Subscriber's contract if you could rely on the call to subscription.request(n) to not produce undesired effects ... like stack or buffer overflows, because the Subscriber is more user-facing and maybe the safe handling of the subscription.request(n) event should be the Publisher's responsibility.

TL;DR, I'd like to call subscription.request(n) directly from Subscriber.onNext :-) I.e. code like this makes a lot of sense to me:

public void onNext(T elem) {
   if (isStillValid(elem)) {
      process(elem);
      subscription.request(1); // acknowledgement, next please
   } else {
      subscription.cancel(); // no longer interested, stop please
   }
}
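
For what it's worth, here is a minimal sketch of how a Publisher-side Subscription could make that pattern safe by turning the recursion into a loop. It is single-threaded and purely illustrative: TrampolineSketch, CountingSubscription, AckingSubscriber and run are hypothetical names, the interfaces are trimmed-down local copies, and the stopAfter threshold stands in for isStillValid.

```java
/** Hypothetical sketch: request(n) called from inside onNext without unbounded recursion. */
public class TrampolineSketch {

    interface Subscription {
        void request(int n);
        void cancel();
    }

    interface Subscriber<T> {
        void onSubscribe(Subscription s);
        void onNext(T t);
        void onCompleted();
    }

    /** Emits 1..limit. A re-entrant request() only records demand; the outer loop does the emitting. */
    static final class CountingSubscription implements Subscription {
        private final Subscriber<Integer> subscriber;
        private final int limit;
        private int next = 1;
        private long demand = 0;
        private boolean draining = false;
        private boolean done = false;

        CountingSubscription(Subscriber<Integer> subscriber, int limit) {
            this.subscriber = subscriber;
            this.limit = limit;
        }

        @Override
        public void request(int n) {
            if (done || n <= 0) return;
            demand += n;
            if (draining) return; // called from inside onNext: let the loop below pick it up
            draining = true;
            try {
                while (!done && demand > 0 && next <= limit) {
                    demand--;
                    subscriber.onNext(next++);
                }
                if (!done && next > limit) {
                    done = true;
                    subscriber.onCompleted();
                }
            } finally {
                draining = false;
            }
        }

        @Override
        public void cancel() { done = true; }
    }

    /** Acknowledges each element with request(1), or cancels once elements exceed stopAfter. */
    static final class AckingSubscriber implements Subscriber<Integer> {
        private final int stopAfter;
        private Subscription subscription;
        int processed = 0;
        boolean completed = false;

        AckingSubscriber(int stopAfter) { this.stopAfter = stopAfter; }

        @Override
        public void onSubscribe(Subscription s) {
            subscription = s;
            s.request(1);
        }

        @Override
        public void onNext(Integer elem) {
            if (elem <= stopAfter) {
                processed++;
                subscription.request(1); // acknowledgement, next please
            } else {
                subscription.cancel();   // no longer interested, stop please
            }
        }

        @Override
        public void onCompleted() { completed = true; }
    }

    static AckingSubscriber run(int limit, int stopAfter) {
        AckingSubscriber s = new AckingSubscriber(stopAfter);
        s.onSubscribe(new CountingSubscription(s, limit));
        return s;
    }

    public static void main(String[] args) {
        AckingSubscriber all = run(1_000_000, Integer.MAX_VALUE);
        System.out.println(all.processed + " " + all.completed);   // 1000000 true

        AckingSubscriber some = run(100, 10);
        System.out.println(some.processed + " " + some.completed); // 10 false
    }
}
```

The draining flag keeps the stack depth constant however many elements are acknowledged one at a time. A concurrent implementation would need atomics or an event loop in place of the plain boolean.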

rkuhn added a commit that referenced this issue Apr 24, 2014
Squash of #37
Work as a result of discussion in #19
Removes TCK implementation so it is not out of sync with API as per #39
benjchristensen added a commit that referenced this issue Apr 24, 2014
Subset of discussion at #37
Work as a result of discussion in #19
@benjchristensen
Contributor Author

The discussion for this is now at #41 where the README is being revised.

@rkuhn
Member

rkuhn commented May 5, 2014

@alexandru please refer to #46 for further discussion on the asynchronous semantics of the interfaces.

Since this has been split up into multiple topics, and in light of @benjchristensen’s latest comment, I am closing this issue.
