Polyglot Support #45

Closed
benjchristensen opened this Issue · 83 comments
@benjchristensen

I suggest expanding this initiative beyond the JVM since most of us need our data streams and systems to interact over network boundaries with other languages.

Thus, it seems it's actually more important to define the protocol and contract and then allow each language platform to define the interfaces that meet it.

Perhaps an approach to this is breaking out into multiple sub projects such as:

  • /reactive-streams/specification (purely for contract definition)
  • /reactive-streams/reactive-streams-jvm
  • /reactive-streams/reactive-streams-dotnet
  • /reactive-streams/reactive-streams-javascript
  • /reactive-streams/reactive-streams-websockets
  • /reactive-streams/reactive-streams-tcp
  • /reactive-streams/reactive-streams-udp

Even if the focus in the short term remains on the JVM interface design, we would gain a lot by including communities such as JavaScript/Node.js, Erlang, .NET, and banking and financial trading (who have been doing high-performance messaging for decades). It would also make the model far more useful, as we could then consume a reactive stream from JavaScript in a browser, via WebSockets to a server powered by Netty or Node.js, receiving data from Rx/Akka/Reactor/whatever, and it would "just work".

@jbrisbin
Collaborator

I'm currently working on integrating Java 8's Nashorn Javascript engine with Reactor and this idea of extending Reactive Streams into other languages is very much on my mind.

There's the possibility of providing Reactive Streams--the API--in Javascript for use in server-side and client-side code to get the same characteristics as defined in the spec, and there's the cross-network-boundary possibility of communicating from one Reactive Stream to another over a network. As long as the interaction is clearly defined, I don't think the transport and protocol (or combination of the two) really matters.

E.g. one could conceivably use plain HTTP + JavaScript on the client side (sending JSON), and the Reactive Streams application on the server side would simply invoke onComplete to communicate back to the JS in the browser that the request is finished. This would work slightly differently over a WebSocket, where the responses might come back individually rather than all at once as the body of an HTTP response.
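A minimal sketch of that server side, assuming the real org.reactivestreams interfaces; HttpResponse and its writeChunk/abort/close methods are invented placeholders here, not any actual framework API:

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

// Hypothetical response wrapper -- a placeholder, not a real API.
interface HttpResponse {
    void writeChunk(byte[] data);
    void abort(Throwable t);
    void close();
}

// Bridges a Reactive Stream to an HTTP response: each element becomes a
// chunk, and onComplete closes the response to tell the JS client it's done.
final class HttpBridgeSubscriber implements Subscriber<byte[]> {
    private final HttpResponse response;
    private Subscription subscription;

    HttpBridgeSubscriber(HttpResponse response) { this.response = response; }

    @Override public void onSubscribe(Subscription s) {
        subscription = s;
        s.request(1);                    // pull one element at a time for simplicity
    }

    @Override public void onNext(byte[] json) {
        response.writeChunk(json);       // stream the element to the client
        subscription.request(1);
    }

    @Override public void onError(Throwable t) { response.abort(t); }

    @Override public void onComplete() { response.close(); }
}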

@benjchristensen

After a great meeting today at Netflix with @tmontgomery I am more convinced that we should expand this out into several projects.

I propose we focus this main reactive-streams project on the contract, semantics and possibly network protocol definitions, and move the JVM implementation into reactive-streams-jvm along with other projects for the various other platforms.

To start with I think we need at least these:

  • /reactive-streams/reactive-streams
    • contract and semantics
    • network protocol?
  • /reactive-streams/reactive-streams-websockets
    • websockets network protocol
  • /reactive-streams/reactive-streams-jvm
    • JVM interfaces
  • /reactive-streams/reactive-streams-javascript

We could also (if we're ambitious) include reference implementations of the network protocol, at least in Java and Javascript. Or we leave that to external projects to implement according to the network protocol.

Todd and I are ready to start working on defining the websockets protocol compliant with reactive-streams, and Netflix is ready to start implementing on top of it. Do we have agreement on making reactive-streams polyglot and including the network protocol? If so, can I proceed to create the new repos and migrate the JVM interfaces into reactive-streams-jvm?

@jbrisbin
Collaborator

Enthusiastic +1 from me. What can I do to help?

@smaldini
Collaborator
@tmontgomery
Collaborator

We probably only need 1 protocol spec that can run over any reliable unicast protocol (TCP or WebSocket or something else). I really like the direction this work is heading!

@benjchristensen

@rkuhn Do you agree with and support splitting into multiple sub-projects under the "github.com/reactive-streams" umbrella?

@kirkshoop

+1 !
I was going to phrase it differently, but the result is the same. There are many ways to communicate the signals across an async boundary, and the tradeoffs for a surface change with each language and protocol (a return value from an on_next function might be okay, but an ACK message for each on_next in a protocol is not so good).

@tmontgomery
Collaborator

Exactly.... communicating across an async [binary] boundary.

To that end, the types of transports that the protocol would need to support are, at least: TCP, WebSocket, and IPC (most likely in the form of a shared-memory SPSC queue). Adaptation to multicast media (or an SPMC or MPMC queue) should probably be considered, but might need to be treated differently.

With request(n) flow control semantics, it shouldn't be necessary to ACK every message in the general case. For persistence semantics, though, message consumption could piggyback on, or be inferred from, request(n).

@rkuhn
Owner

Yes, enthusiastic +1 from me as well! I also agree with the proposed split into multiple sub-projects. The only part I am not sure I understand correctly is the “network protocols” part: there are different network protocols available today, some of which already have all desired semantics (plus some extraneous ones, like TCP) and some of which need dedicated protocol descriptions for supporting Reactive Streams (like UDP, raw Ethernet frames, etc.), and in addition people may choose to implement a stream transport on a completely new medium as well. Therefore in my opinion we can leave out the network protocol description from the reactive-streams project and add concrete ones like reactive-streams-udp as we get around to specifying them. The most important part then is to agree on the semantics—call it an abstract protocol if you will—and fix that first.

Apropos: I take it that we agree on #46, especially given this angle on the scope of the project? If so, please comment on that issue as well.

@tmontgomery
Collaborator

TCP, and by extension WebSocket, lacks some semantics that are needed and, as @rkuhn points out, has some additional unneeded and undesired semantics.

Specifically, TCP and WS have no concept of channels. So, while a single stream can be supported, multiple concurrent streams (with or without head-of-line blocking) can't be supported without an additional framing layer. However, protocols like SPDY and HTTP/2 have a nice framing mechanism that could be leveraged directly.

Additionally, request(n) semantics require a control protocol to be implemented on top. The most notable issue that TCP introduces here is an impedance mismatch between initial reception via push and flow control; i.e. a TCP sender may slam in an initial segment (1500 bytes of data or more, depending) without the receiver requesting it. Relying solely on TCP's flow control would introduce some potentially bad artifacts into how the back pressure works, IMO.

Such a framing and control layer would also be needed by any other transport (UDP, etc.) as well.

@rkuhn
Owner

@tmontgomery I see, we were talking about slightly different things: what I meant was dedicating one TCP connection to the transfer of one stream, in which case no extra capabilities are needed beyond serialization and deserialization of the data items, using the event-based socket operations to convey data and demand. The difference between TCP and a direct Publisher/Subscriber pair is that the connection would act like a buffer:

--> Subscriber --> Buffer (TCP connection) --> Publisher -->

where the buffer size is given by output + input buffers plus network components and in-flight bits. As far as I can see (please correct me if I’m wrong) this still has the behavior of dynamic push & pull, i.e. it switches between the two according to which side is currently faster (with the man in the middle capping how fast the receiver can appear to be). Are there other artifacts or bad effects that I am missing?
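For illustration only, the sending half of that arrangement could look roughly like this sketch, in which demand is derived from how fast the socket drains. SocketChannel is real NIO, but elements are assumed to arrive pre-serialized as ByteBuffers, and a real implementation would park on an OP_WRITE selector event rather than spin:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

// One TCP connection dedicated to one stream: the subscriber's demand
// follows the rate at which the socket's send buffer drains, so TCP's
// own flow control is what ultimately paces the upstream Publisher.
final class TcpSenderSubscriber implements Subscriber<ByteBuffer> {
    private final SocketChannel channel;   // assumed connected and non-blocking
    private Subscription subscription;

    TcpSenderSubscriber(SocketChannel channel) { this.channel = channel; }

    @Override public void onSubscribe(Subscription s) {
        subscription = s;
        s.request(1);
    }

    @Override public void onNext(ByteBuffer frame) {
        try {
            while (frame.hasRemaining()) {
                if (channel.write(frame) == 0) {
                    // Send buffer full: the receiver is slow, so demand stops
                    // here. A real implementation would wait for OP_WRITE.
                    Thread.onSpinWait();
                }
            }
            subscription.request(1);       // socket drained: convey demand upstream
        } catch (IOException e) {
            subscription.cancel();
        }
    }

    @Override public void onError(Throwable t) { /* close the channel, report */ }

    @Override public void onComplete() { /* flush and close the channel */ }
}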

If we want to transfer multiple streams over a shared medium (like one WebSocket or one TCP connection or one Unix pipe) then we will of course need some multiplexing and scheduling capability, including dedicated return channels for the demand signaling. I am not sure whether that should be our first goal, though, since this can be implemented on a primitive (single) stream by merging and splitting streams in the “application layer”.

If OTOH the transport mechanism already comes with channel support, then we should of course try to use that. One potential issue I see with just pushing data and demand across an HTTP/2 WebSocket is that multiple streams will have to be balanced in a fair fashion somehow, which implies that we must specify a scheduler or the protocol stack needs to already come with one—is that already the case? (please excuse my ignorance, I have yet to read up on this topic)

@tmontgomery
Collaborator

@rkuhn Even with a single stream over a TCP connection, I believe you will need to consider having a control protocol for the semantics as I have read them so far. You are correct that TCP has flow control (all flow control must be push and pull), but those semantics in TCP have subtleties; that is perhaps the bit that is being missed. In TCP, controlling flow only via the receive data rate is a very coarse-grained tool, and it has some quirks.... one is the interaction of Nagle and Delayed ACKs, for example.

You are absolutely correct about multiple streams and scheduling. It becomes a scheduling problem immediately, but it is actually worse than that. Here are a couple of links that you might find interesting on the subject.

http://sites.inka.de/~W1011/devel/tcp-tcp.html
http://www.ietf.org/mail-archive/web/tls/current/msg03363.html

Some of the complexity has crept into HTTP/2, which is unfortunate, as the protocol has no option other than to expose the controls to the application. And most applications won't have any clue how to handle that complexity.

However, I see that as a tremendous opportunity for reactive streams to bring value. It's a simpler mechanism bound to the application. It's in a perfect position to make this easier, much easier. And accessible for applications.

And multiple streams per connection should be considered the norm, IMO. A single stream per TCP connection is going to be very limiting, and a proliferation of TCP connections is not a good thing. In fact, keeping very few TCP connections open, so the device stays out of the high-energy state as much as possible, is one of the main techniques for preserving device battery life. HTTP/2 and SPDY multiplex requests and responses and use multiple streams for many reasons, one of which is to reduce TCP connection counts for browsers and servers.

With that in mind, standardizing how to mux multiple streams onto any transport is a good thing, I think.

@rkuhn
Owner

Thanks a lot for this explanation, it entirely makes sense. Great to have you on board!

@benjchristensen

multiple streams per connection should be considered the norm

Agreed, otherwise this doesn't work for us in what we're pursuing with both TCP and WebSockets.

@benjchristensen

It seems we have consensus to migrate this initiative to being polyglot, so shall we move forward with making sub-projects and moving things around?

Shall we start with these?

1) /reactive-streams/reactive-streams

  • contract and semantics
  • move the code from here to reactive-streams-jvm

2) /reactive-streams/reactive-streams-websockets

  • websockets network protocol

3) /reactive-streams/reactive-streams-jvm

  • JVM interfaces (moved from reactive-streams)

4) Modify www.reactive-streams.org to not be JVM only

@tmontgomery Do we need separate projects for TCP and websockets, or can they be together? If together, under what name?

@Coneko Coneko referenced this issue in ReactiveCocoa/ReactiveCocoa
Closed

Conform to reactive-streams.org back pressure interface #1320

@rkuhn
Owner

I’d say we can start moving things around once we have fixed the discrepancies between code and documentation (i.e. #41 and possibly follow-up fixes are merged).

@benjchristensen

I'm okay with that ... I'll keep tracking #41 and #46 closely this week so we can try and finish those discussions to unblock everything else.

@rkuhn
Owner

Sounds good!

@benjchristensen

@rkuhn Shall we proceed now that we've agreed in #46 and merged the contract definition?

Do you want to release 0.4 first from the current project structure and then proceed with the steps I outlined above in #45 (comment) ?

@rkuhn
Owner
@benjchristensen

Okay, so shall I proceed with submitting a PR to propose the changes?

Do you agree with the layout I suggested?

@drewhk
Collaborator

I don't like the idea of specifying a protocol here, for many reasons, one of them being that it feels completely out of scope.

Even with a single stream over a TCP connection, I believe you will need to consider having a control protocol for the semantics as I have read them so far.

The reactive streams semantics will allow a recipient to send a huge request if the potential set of answers fits in its memory, but that does not translate to the kernel buffer size, which eventually has to deal with the incoming binary data from the wire. The semantics do not map one-to-one, so I think this is misguided.

You will inevitably need a local bridge Subscriber that communicates properly with whatever underlying kernel driver it must talk to and gives proper request counts.

@tmontgomery
Collaborator

@drewhk Actually, what you mention makes the case for there being a control protocol. Relying on TCP semantics here alone is not enough because of the possibility of overrunning the subscriber side if the obvious solution were to be used.

In the obvious solution, there must be double buffering outside the kernel SO_RCVBUF and SO_SNDBUF. In that model, without a control protocol, the buffers for large objects become effectively unbounded. There is no visibility to bound them on the publisher's side without appropriate reactive-streams-level feedback. That feedback may be based on objects instead of bytes, but object size can be bounded (and should be, and is, in any real system, e.g. via fragmentation).

Whether a bridge subscriber is used as a solution is open for debate. It's hardly the inevitable solution, IMO.

In most cases, having event units in the multi-GB range means a range of tradeoffs to be made. Adding another point that buffers multiple fragments into a single unit (like a bridge subscriber would) is a less than ideal solution. I would handle that by only reassembling at the end subscriber site, where it has to be... unless the system decides that smaller units are the way to go anyway.

@drewhk
Collaborator

Actually, what you mention makes the case for there being a control protocol. Relying on TCP semantics here alone is not enough because of the possibility of overrunning the subscriber side if the obvious solution were to be used.

I am not sure what you mean here. Just because a remote Subscriber requests 1,000,000 elements, that does not mean that the underlying transport should also request 1,000,000 elements. It might even -- ad absurdum -- request elements one by one and have a buffer size of exactly 1 element, and eventually still serve the 1,000,000 requests. There is no unboundedness here.

Another, similar example: just because you have a chain of purely asynchronous map stages:

map.map.map.map

And the downstream subscriber for the last map requests 1,000,000 elements; that does not mean that the map stages between each other will also issue requests for 1,000,000. That would force each stage to have a buffer that can hold 1,000,000 elements in the worst case. On the other hand, they can have buffer sizes of 1, 128, or even different sizes in between, issuing requests in even smaller batches (say, bufSize / 2, for example).
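A sketch of that requesting discipline, using the org.reactivestreams types; BUF_SIZE and the half-buffer batching are illustrative parameters, and emission to the downstream is elided:

import java.util.ArrayDeque;
import java.util.Queue;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

// An intermediate stage with a small bounded buffer that re-requests in
// half-buffer batches, regardless of how much its own downstream asked for.
final class BatchingStage<T> implements Subscriber<T> {
    private static final int BUF_SIZE = 128;
    private final Queue<T> buffer = new ArrayDeque<>(BUF_SIZE);
    private Subscription upstream;
    private int outstanding;                 // requested but not yet arrived

    @Override public void onSubscribe(Subscription s) {
        upstream = s;
        outstanding = BUF_SIZE;
        s.request(BUF_SIZE);                 // fill the buffer once at the start
    }

    @Override public void onNext(T element) {
        buffer.add(element);
        outstanding--;
        drainToDownstream();                 // elided: emit per downstream demand
        int free = BUF_SIZE - buffer.size() - outstanding;
        if (free >= BUF_SIZE / 2) {          // re-request in batches, never 1M at once
            outstanding += free;
            upstream.request(free);
        }
    }

    @Override public void onError(Throwable t) { /* propagate downstream */ }

    @Override public void onComplete() { /* propagate downstream */ }

    private void drainToDownstream() { /* elided for brevity */ }
}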

@tmontgomery
Collaborator

Further on in my comment, I actually mention large objects (i.e. large elements), which is what I thought you meant. Overrun in that case is caused by a single large object and a request of 1. That has overrun possibilities unless handled well -- i.e. if you request 1GB in a single element. Which users will do.

The underlying implementation can optimize down to a single element for a pipeline; in fact, it should. So the 1M elements you mention can be handled fine in a number of ways. Below a single element, however, the only possibility is fragmentation, which needs reassembly, and that works best with a framing protocol and a defined MTU. Without those, you are left with very few options for handling it efficiently.

@danarmak

The discussion in #47 may be relevant here.

The Reactive Streams interface allows requesting n 'elements', but if the element type is a byte array rather than a single byte, there is no way to request a number of bytes. It's impossible to "request 1GB in a single element"; if you request a single element of type byte array, the publisher can give you a byte array of any size it wishes and still be within the spec.

An implementation can introduce controls for the byte array size, but if the on-the-wire protocol is some standardized form of Reactive Streams, it won't be able to communicate this to the other side. In #47 I said this made 'byte streams' between different implementations over the network impractical. The response was that the implementation talking to the network on each end should document its buffer size (the size of the byte arrays it produces), and specific publishers should also document the chunk sizes they produce, and then consumers can rely on those two things. We'll see in practice if this is good enough.

What is the element type of your Reactive Stream? If it's byte[], then there is no way to signal the size of the byte[] you want. If it's some object whose size can vary greatly and which can be split and merged (e.g. HTTP message chunks), the same problem exists. The type can't be byte, because then you would have to call onNext for each byte.

If, on the other hand, the element type can't be split and merged, then you have no choice but to request some whole number of elements. If a single element can be 1GB in size, but you can only process whole elements, then you have no choice but to buffer it. If you don't want to buffer it, write a streaming processor and change your element type to a smaller fixed-size frame.
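As an illustration of that last point, re-framing to a bounded element type is a purely local step. A sketch, with maxFrame standing in for whatever bound the two sides document or negotiate:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Splits arbitrarily large byte arrays into bounded frames before they
// cross the boundary, so request(n) always refers to bounded elements.
final class Framer {
    private final int maxFrame;              // assumed agreed-upon bound

    Framer(int maxFrame) { this.maxFrame = maxFrame; }

    List<byte[]> split(byte[] large) {
        List<byte[]> frames = new ArrayList<>();
        for (int off = 0; off < large.length; off += maxFrame) {
            int end = Math.min(off + maxFrame, large.length);
            frames.add(Arrays.copyOfRange(large, off, end));
        }
        return frames;
    }
}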

@jbrisbin
Collaborator

I would imagine that an implementation wanting to stream raw byte[]s would provide a bounded Buffer facility that lets you tell the Publisher, at configuration time, how big a chunk to use. Then request(10) doesn't mean 10 byte[Integer.MAX_VALUE] but 10 bounded Buffers. This would be part of the initial configuration of the Publisher, I would expect. One could imagine it being volatile as well, so that the buffer size could be changed ad hoc, but that would be determined by the implementation.

I don't think this particular issue can be spec'd away. IMO this is going to have to be a best practice, given the spec's intent to always be bounded. Since the spec intends the handling of N elements to be finite, it seems logical to assume that those elements also need clear bounds to fit the "spirit" of the spec.

It should be possible to spec out how to handshake a Publisher/Subscriber pair over an IO boundary such that both parties agree on the bounds. If a Subscriber can only successfully subscribe to a Publisher after agreeing on a buffer size, then it's not necessary to stipulate how that agreement is arrived at--or its specifics--but that there was a mechanism in place that allowed the two parties to agree to acceptable terms before beginning the "real" process of exchanging data.

This is how I see a "control protocol" being useful to the spec. It's at a much higher level than perhaps what we're used to thinking of when using that phrase. Maybe it's closer to a "handshake protocol" or somesuch.
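A sketch of the shape such a handshake might take; every name and field here is invented for illustration, since the spec defines none of this:

// Hypothetical handshake terms -- invented names, not part of any spec.
record StreamTerms(int maxElementBytes, long maxInFlight) {}

final class NegotiatingPublisher {
    private final int chunkBytes;            // what this publisher produces

    NegotiatingPublisher(int chunkBytes) { this.chunkBytes = chunkBytes; }

    // The subscription is only granted once both parties accept the terms;
    // how the terms travel (headers, a first frame, ...) is left open.
    boolean accept(StreamTerms proposed) {
        return chunkBytes <= proposed.maxElementBytes();
    }
}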

@danarmak

I also think that a handshake protocol would be useful. It should be opt-in, to keep the base spec simple, since many users deal with objects of a predictable size and don't need any negotiation. Each side could also refuse to communicate if the other side didn't negotiate. There should be a reasonable default behavior in its absence when transporting e.g. byte arrays.

When should it be used? It's easiest to leave it to the application to decide, but what do we expect? Only across networks? Across inter-process boundaries? Whenever a Reactive Stream has an element type that is an array, string or similar? Would it usually be end-to-end, or point-to-point?

@jbrisbin
Collaborator

IMO any time the data will cross a substantially expensive "boundary" this should apply. That might mean crossing from userland onto the network, from business logic into serialization, from one JVM to another process, from client to server, etc. The boundaries don't have to involve IO but most likely will 90% of the time.

@tmontgomery
Collaborator

Negotiation would be a pretty useful option, and it could be done in the subscribe. Negotiation ala TLS/SSL, no; negotiation ala the WebSocket protocol/extension mechanism or HTTP/2, yes. Simpler forms make more sense: simply a union of capabilities on the exchange.

Any point where (de)serialization is required (due to crossing a binary boundary - network, disk, process) will benefit from having the following:

  1. Means to frame elements that includes fragmentation/reassembly (but NOT additional element serialization)
  2. Means to provide stream multiplexing
  3. Means to control flow in accordance with agreed to semantics beyond transport flow control
  4. Means to negotiate Publisher/Subscriber behaviors/options/configuration

Are there additional thoughts? (maybe this should be a separate issue to discuss?)
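As a strawman for points 1-3 above, a length-prefixed frame carrying a stream id covers framing, multiplexing, and a demand channel in a handful of bytes; the field widths and type codes are illustrative assumptions, not a proposal:

import java.nio.ByteBuffer;

// Illustrative frame layout only: [streamId:4][type:1][length:4][payload].
final class Frame {
    static final byte DATA = 0, REQUEST_N = 1, COMPLETE = 2, ERROR = 3;

    static ByteBuffer encode(int streamId, byte type, byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(9 + payload.length);
        buf.putInt(streamId);       // multiplexing: which logical stream (point 2)
        buf.put(type);              // what the frame means
        buf.putInt(payload.length); // framing: length-prefixed body (point 1)
        buf.put(payload);
        buf.flip();
        return buf;
    }

    // Demand travels as just another frame type (point 3).
    static ByteBuffer requestN(int streamId, long n) {
        byte[] demand = ByteBuffer.allocate(8).putLong(n).array();
        return encode(streamId, REQUEST_N, demand);
    }
}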

@drewhk
Collaborator

I believe sending 1GB elements is an abuse of streams, since the main purpose is to be able to represent large datasets as a partially materialized stream instead of a huge blob -- elements should be bounded. Fragmentation and reassembly is just a simple stream-processing element, and the mentioned negotiation can also be expressed in terms of streams. I understand that a standardized way is useful, but I feel this is just too early.

Means to frame elements that includes fragmentation/reassembly (but NOT additional element serialization)
Means to provide stream multiplexing
Means to control flow in accordance with agreed to semantics beyond transport flow control
Means to negotiate Publisher/Subscriber behaviors/options/configuration

While the above are good ideas, I don't think it is a good idea to rush this. Introducing an n+1th protocol is not an effort that should be underestimated (and the technical effort might be the smallest one here). For example, multiplexing requires a platform-independent way to identify stream endpoints (a URI scheme? a registry as part of the SPI? etc.), which raises a lot of questions already, without even touching the question of a custom transport/flow-control protocol.

@tmontgomery
Collaborator

And no spec has been proposed...

So, I don't think there is a rush, really. I've certainly got enough going on that I have to be careful how much time I devote to the effort anyway. I'm passionate about the entire effort, though, and would not like to see it suffer from the crippling mistakes of JMS.

Good news is that we're not reinventing the wheel. There are several good protocols/designs to build from. Nothing I have seen here is that new or difficult.

Multiplexing and how to identify endpoints is well understood and used by a number of protocols dating all the way back to SOCKS and beyond. I see no reason why the HTTP/2 and SPDY model wouldn't work for this. It's almost directly analogous, it doesn't mean the lookup has to be tied to anything specific out of the gate, and it can evolve separately if needed. In short, the "name" of a stream can be a blob; that blob gets (via allocation by one side) an ephemeral ID for efficient exchange, with a given lifetime for the connection.

Flow control is both understood and not understood at the same time. HTTP/2 is handling a similar, yet more encompassing, problem and making it quite complex. The needs here are much, much simpler and for the most part better understood, at least until implementation constraints invalidate them. In short: make request(n) mimic TCP flow control at the object level, and divorce the underlying transport buffering from the application constraints, so we can avoid issues like the TCP interaction between Nagle and Delayed ACKs without devolving to implementation-specific workarounds.

The only case where I think a pure transport protocol (akin to TCP, WebSocket, PGM, etc.) would need to be spec'd is reliable multicast.... but I don't think a custom transport protocol for reliable multicast is needed. There are already several reliable multicast protocols that can fit... and there could be more on the horizon as well. However, an application control protocol on top would be needed.

@danarmak

I think a way to send Reactive Streams on top of HTTP would be useful too. There are many places you can reach with HTTP but not with a custom TCP based protocol, because of firewalls blocking everything else, and because of server components running inside HTTP app servers.

To clarify, I'm certainly not saying that only HTTP should be supported. HTTP wouldn't be as efficient or nearly as convenient as a custom solution.

@tmontgomery
Collaborator

HTTP/1.1 is problematic for the semantics needed, specifically streaming from server to client without resorting to long polling. However, HTTP/1.1 with WebSocket can work and is mostly OK wrt app servers and load balancers; i.e. there are numerous solutions that support WebSocket fine, but not every old component works off the shelf as required. For an old Netgear sitting in someone's living room using HTTP/1.1 with WebSocket to connect out, you are fine, for example. Most 101 Switching Protocols upgrades, like WebSocket, work fine with firewall traversal. Most modern app servers support WebSocket as well, if not with full JSR 356 compliance -- even if app servers are a terrible model for deployment. Load balancers are a mixed bag, but good solutions exist and more are coming.

HTTP/2 (or SPDY) is another option, but brings a lot of mandatory baggage.

BTW, I am talking at QCon NY on this stuff and will be mentioning Reactive Streams quite a bit.

@viktorklang
Owner
@danarmak

Alternatives to HTTP/1.1 are better when available, but sadly, deployment options in many large organizations are limited to HTTP.

Here's a more general question: what transport assumptions should be made by the Reactive Streams remote protocol specification (which this conversation is about)?

Should it assume an opaque stream-type transport to be given (TCP, pipes, websockets, optional TLS, etc) or should it concern itself with lower level stuff like the underlying flow control?

Should it be couched in terms of sending messages over the underlying channel (which could be implemented on top of any message bus in theory), or in terms of managing the transmit-receive buffers (relying on having the stream to itself and trying to optimize from there)?

@tmontgomery
Collaborator

@viktorklang the HTTP/2 WG is going through a healthy debate right now. As proposed, h2 has a lot of mandatory pieces and a lot of really ugly stuff to straighten out. The WG is suffering right now from a push driven by the pain of poor design decisions in HTTP/1's past, a pain that is only getting more acute. I recommend the thread to anyone who is interested in the future of web protocols.

@viktorklang
Owner
@tmontgomery
Collaborator

@danarmak In my opinion and experience, I would not assume a stream-based abstraction. And it is dangerous to rely on buffer sizing (if I understand your suggestion). TCP has some dark corners to work around at high speed, for example.

Here's a more general question: what transport assumptions should be made by the Reactive Streams remote protocol specification (which this conversation is about)?

Should it assume an opaque stream-type transport to be given (TCP, pipes, websockets, optional TLS, etc) or should it concern itself with lower level stuff like the underlying flow control?

The assumptions that make the most sense would be transports that span stream-based, datagram-based, and (R)DMA*. All require framing and all can use the same framing.

We should not make assumptions based on transport flow and congestion control, as the huge advantage this entire work has is application-level flow control. Besides, it doesn't exist for UDP (congestion control, yes; flow control, no) or for (R)DMA (where flow is done differently).

Should it be couched in terms of sending messages over the underlying channel (which could be implemented on top of any message bus in theory), or in terms of managing the transmit-receive buffers (relying on having the stream to itself and trying to optimize from there)?

Sending framed messages over the underlying channel makes more sense.

Simply managing the transmit/receive buffers is attractive, but very tricky; see the HTTP/2 WG archive for some of the issues. It would be great to avoid this hairball. Think of it this way: double buffering is required to avoid certain behaviors at speed, and that introduces indirection into how flow is controlled -- a little like trying to push a door closed with a rope. That is loss of control, and a spiral into more and more buffering to compensate, and thus more indirection. For more on that, I suggest looking at the evolution of, and fight against, buffer bloat. It's a little different, but the results are the same.

BTW, I am using (R)DMA to illustrate various techniques that involve lock-free/wait-free mechanisms on top of shared memory, whether that be InfiniBand, RoCE, PCIe 3, or good ol' L2/L3 memory.

@danarmak

The assumptions that make the most sense would be transports that span stream-based, datagram-based, and (R)DMA*. All require framing and all can use the same framing.

You referred to problems with TCP. Do those problems go away if you handle flow control at the application level, while still running on top of TCP?

Does the application-level algorithm need to include TCP-specific behavior to perform well, and how many underlying transports would we end up supporting (in the spec, not in an implementation)? At a minimum, it would have to take into account whether the underlying transport does its own retrying or not, and maybe buffering related issues (TCP window scaling...), but I'm no expert on TCP. There are presumably issues with TCP routers along the way...

Would running on TCP be significantly less efficient than running the same algorithm on UDP? Would a naive algorithm on top of TCP be much worse than a TCP-optimized variant?

@tmontgomery
Collaborator

You referred to problems with TCP. Do those problems go away if you handle flow control at the application level, while still running on top of TCP?

No, they don't go away... unless you aren't using TCP. However, the problems can be handled in a standardized way instead of being implementation specific and often not interoperable in a consistent way between implementations. So, the big advantage is consistency for interoperability. And some simplification as a result if done well.

Does the application-level algorithm need to include TCP-specific behavior to perform well, and how many underlying transports would we end up supporting (in the spec, not in an implementation)? At a minimum, it would have to take into account whether the underlying transport does its own retrying or not, and maybe buffering related issues (TCP window scaling...), but I'm no expert on TCP. There are presumably issues with TCP routers along the way...

The application-level protocol doesn't need to include any TCP-specific behavior, nor should it. Three transport "classes" suffice. Reliable, best-effort, end-to-end delivery is a decent base assumption that allows for TCP, reliable multicast (like NORM), and shared memory options.

Would running on TCP be significantly less efficient than running the same algorithm on UDP? Would a naive algorithm on top of TCP be much worse than a TCP-optimized variant?

This is complex to answer; there are few absolutes. Flow control is different from congestion control, which makes this hard to answer in the general case comparing TCP and unicast UDP on arbitrary networks. Also, the big problem with TCP and multiple streams is going to be head-of-line blocking, the result of which is increased delay/latency. This is very noticeable for REST APIs, for example.

Multicast UDP opens up additional questions and tradeoffs wrt flow and congestion control that affect perceived performance. RFC 4654 is a place to look for more on congestion control. There really aren't viable flow control solutions for multicast, but the same issues exist as with congestion control; they just need different solutions.

A naive algorithm on top of TCP is almost assuredly going to be an order of magnitude (if not more) off from where it could be if appropriately optimized for TCP. It could be even worse if the naive solution reintroduces silly window syndrome on top of TCP, which is easy to do with the semantics of reactive streams.

BTW, window scaling is an artifact of TCP usage beyond the original design -- specifically, high bandwidth-delay-product networks, like satellite links, in an effort to make them more efficient at bandwidth utilization.

@drewhk
Collaborator

This is complex to answer; there are few absolutes. Flow control is different from congestion control, which makes this hard to answer in the general case comparing TCP and unicast UDP on arbitrary networks. Also, the big problem with TCP and multiple streams is going to be head-of-line blocking, the result of which is increased delay/latency. This is very noticeable for REST APIs, for example.

Head-of-line blocking will happen in any bounded case, even if you have separate demultiplexing buffers for the substreams. Once one of them fills up, you are still unable to dequeue the next frame from the TCP buffer, since it might belong in exactly the filled-up buffer, and you don't know where it will end up without dequeueing it. As for the general case: yes, comparing UDP, TCP or any transport protocol is impossible on arbitrary networks (just take mobile, where you have multipath fading, soft and hard handoffs, continuously changing bitrates because of adaptive modulation, and of course whatever form of IP mobility solution is in the way).

A naive algorithm on top of TCP is almost assuredly going to be an order of magnitude (if not more) off from where it could be if appropriately optimized for TCP. It could be even worse if the naive solution reintroduces silly window syndrome on top of TCP, which is easy to do with the semantics of reactive streams.

Do you have an example of this?

@drewhk
Collaborator

Correction of myself: head-of-line blocking, as in waiting for a missing piece (a gap) in a sequenced stream, is obviously an issue; it makes all substreams suffer the potential RTT of redelivering a missing piece from an unrelated substream. Solving this obviously needs proper substream channels at the transport level (i.e. it cannot be built on top of TCP).

@viktorklang
Owner
@tmontgomery
Collaborator

A naive algorithm on top of TCP is almost assuredly going to be an order of magnitude (if not more) off from where it could be if appropriately optimized for TCP. It could be even worse if the naive solution reintroduces silly window syndrome on top of TCP, which is easy to do with the semantics of reactive streams.

Do you have an example of this?

The interaction of Nagle and Delayed ACKs that I posted the link about is one. Just keep playing with send sizes in relation to buffer sizes, as well as larger round-trip times. It can get very bad.

@tmontgomery
Collaborator

@drewhk @viktorklang

Which means something like SCTP, QUIC, etc?

Something like it, yes.

@drewhk
Collaborator

The interaction of Nagle and Delayed ACKs that I posted the link about is one. Just keep playing with send sizes in relation to buffer sizes, as well as larger round-trip times. It can get very bad.

Ok, but I thought you were talking about onNext and request from the Subscriber side. In fact, this is exactly why I like to isolate the local backpressure schedule from the link's flow-control/batching/framing schedule.

I discussed things with @rkuhn over some beers, and he convinced me that substream request signals should actually be propagated through the channel (to avoid the scenario I sketched, where the demultiplexer is blocked because one of the substream output buffers is full). So I think the best option is for the local Publisher representing a substream to bridge the request/onNext schedule from its local consumers, buffering and progressing according to its own schedule that best fits the underlying transport/channel.

The bridging, for example, prevents a scenario where the local Subscriber operates strictly in a stop-and-wait fashion (always requesting exactly one element after each consumed element): the local bridge can shield the underlying channel from this schedule and work in a proper windowed mode while still serving the local Subscriber the way it wants. It still sends its own request signals through the channel, though, so that if its buffer ever becomes full, other substreams can still progress; the sender will not send frames for the blocked substream, so the demux will not block.
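A sketch of that bridge; Channel and sendRequestFrame are placeholders for however demand frames get written to the shared connection:

import java.util.ArrayDeque;
import java.util.Queue;

// The local endpoint of one substream: it keeps its own windowed request
// schedule on the channel, even if its local Subscriber runs stop-and-wait.
final class SubstreamBridge<T> {
    interface Channel { void sendRequestFrame(int streamId, long n); } // assumed

    private static final int WINDOW = 64;
    private final Channel channel;
    private final int streamId;
    private final Queue<T> buffer = new ArrayDeque<>(WINDOW);
    private long inFlight;                   // requested on the channel, not yet arrived

    SubstreamBridge(Channel channel, int streamId) {
        this.channel = channel;
        this.streamId = streamId;
        inFlight = WINDOW;
        channel.sendRequestFrame(streamId, WINDOW);  // open a full window up front
    }

    // Called when a data frame for this substream arrives off the wire.
    void onFrame(T element) {
        buffer.add(element);
        inFlight--;
        long free = WINDOW - buffer.size() - inFlight;
        if (free >= WINDOW / 2) {            // windowed re-request, independent of
            inFlight += free;                // the local Subscriber's request(1) pace
            channel.sendRequestFrame(streamId, free);
        }
    }

    // Served to the local Subscriber at whatever pace it requests.
    T poll() { return buffer.poll(); }
}

Because the bridge only re-requests while its buffer has room, a blocked local Subscriber simply stops generating request frames for its stream id, and the sender stops sending frames for that substream without stalling the others.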

@drewhk
Collaborator

Which means something like SCTP, QUIC, etc?

Well, SCTP is not as widely available, so that leaves QUIC, which uses UDP. Or we build something on top of UDP, taking the parts from QUIC that are relevant.

Btw. isn't the number of available substreams in SCTP or QUIC limited? It was long ago that I read the specs, but I recall a maximum of 64 subchannels in SCTP.

@tmontgomery
Collaborator

@drewhk SCTP is used with WebRTC now, so it's slightly more common; WebRTC runs it over DTLS over UDP. Normal uses of SCTP ran over IP directly (IP protocol number 132, IIRC), which presents some additional barriers to overcome.

The SCTP stream field is 16 bits, but implementations may limit this to fewer than the 64K available. Not sure about QUIC's limits offhand. The state management of substreams/channels doesn't come for free; there is always some limit beyond just the field size.

Some messaging systems can support substreams without head-of-line blocking. 0MQ and Ultra Messaging are the immediate ones that come to mind, but others might also.

Also, there is some other work being done in this area that will be public in a few months.

I don't think we will need to do our own transport protocol. We can ride on top of existing ones if we design it well enough.

@viktorklang
Owner

Is this something that will be pursued? (i.e. should we close it, or who wants to "drive" it?) @reactive-streams/contributors

@benjchristensen

I'd still like to see this. I'm focusing on getting the Java side of things figured out right now; then I will turn my attention to at least WebSockets.

@tmontgomery
Collaborator

Same as @benjchristensen: I'd like to see this, and I plan to work on a WebSocket-based protocol once I can clear a few things off my plate.

@viktorklang
Owner
@rozza

Regarding dotnet, I see in their design notes that they are looking at working on better solutions for async sequences / streams:

Async sequences: We introduced single-value asynchrony in C# 5, but do not yet 
have a satisfactory approach to asynchronous sequences or streams

Might be a good time to collaborate - see: dotnet/roslyn#98

@viktorklang
Owner

Hi @rozza, sorry for the belated reply. Thanks for the heads up!

@jeffsteinmetz

Is it safe to say Reactive Streams are currently meant to work on a single machine / single JVM?

My confusion is based on my understanding that Akka supports remote actors (in a cluster).
That said, is there an outline that describes the use case Reactive Streams are best suited for?

Specifically, can subscribers be distributed across a cluster?
Or am I missing the point?

@smaldini
Collaborator

Nope, it's not meant to work on just a single machine; it's a contract for how a resource A passes bounded demand to B. Naturally the contract fits Java threading, but it is more than that. If you write a driver for a database or a message broker, or a microservice client, you will need to deal with some issues RS takes care of:

  • Latency: that's why the contract uses specific callbacks for the subscribe, next, error and completion signals. They might happen eventually, rather than at the moment a (blocking) call to the protected resource is made.
  • Error isolation: the error signal carries the effective underlying errors instead of letting an implementation raise an exception.
  • Backpressure: the Subscriber implementation, which can be an HTTP client driver for instance, controls how much data should be read upstream by the Publisher (or even by a Processor chain, as in a data pipeline). Because it can signal this demand once it has effectively sent its data over IO, it knows how much data is in flight. The publisher won't send anything to subscribers who didn't ask for a given amount of data, naturally propagating the backpressure decision to the source.
  • Batching: since the demand is a long between 1 and Long.MAX_VALUE, an implementation can decide to group/aggregate events accordingly without introducing specific metadata.

Beyond that, the contract specifies additional control over when to start and stop consuming (the Subscription protocol). E.g., a database driver will close its connection if a Subscriber invoked Subscription.cancel.

Now all these classic software components (the drivers etc.) can implement the same contract to pass these signals around; this is where the win occurs for all of us, implementors and users. Because we have a single contract, database drivers, message broker drivers, reactive systems (including reactive extensions), clients, and servers can all be bound into a single processing chain (calling subscribe in a chain) to compose a fully asynchronous pipeline with flow control. Obviously adding an IO protocol to carry these signals would be a plus to the spec, as we have to implement this for every new transport, but a good chunk of the flow can end up being truly reactive right now.

Hope that clarifies a bit!
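As a minimal illustration of that contract, using the real org.reactivestreams interfaces: a subscriber that keeps demand bounded the way a driver might, where BATCH and process are illustrative:

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

// Never lets more than BATCH elements be in flight at once.
final class BoundedSubscriber<T> implements Subscriber<T> {
    private static final long BATCH = 32;
    private Subscription subscription;
    private long received;

    @Override public void onSubscribe(Subscription s) {
        subscription = s;
        s.request(BATCH);                    // bounded demand, never unbounded pull
    }

    @Override public void onNext(T element) {
        process(element);                    // e.g. write to the socket, the disk, ...
        if (++received % BATCH == 0) {
            subscription.request(BATCH);     // top the window back up
        }
    }

    @Override public void onError(Throwable t) { /* the signal carries the cause */ }

    @Override public void onComplete() { /* source is done; e.g. close a connection */ }

    private void process(T element) { /* elided */ }
}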

@jeffsteinmetz

Thank you for the quick response.

So the "current" status of Reactive Streams does allow passing messages to remote actors on remote servers (I want to make sure we are talking about the current implementation vs the discussion in this thread to create new protocols.)

To be specific, here is a proposed scenario using the "current" version of RS:

A REST front end, which is a Play application, receives several JSON posts (lots of streaming docs, or batches of docs) sent to it from many remote sources. Realizing this could also be an Akka HTTP endpoint, but in our case we use Play everywhere upstream.

A logical outline might be:
This Play application initiates the source of the stream.

Then it calls an actor (which may or may not be on the same machine), which is the Publisher of the stream.
Then several subscribers to this stream may live anywhere on the cluster.

One subscriber (actually 2 or more, for redundancy) might simply drop the JSON in an S3 bucket on Amazon,

another subscriber (again, actually 2 or more, for redundancy) on more servers might do some ETL / data cleaning and drop some useful summary data in Cassandra, and so on, and so on.

I am scoping out some ideas on when a simple load-balanced front end (a Play app that does everything, load balanced across several instances with some supporting app-tier microservices) might be enough, vs. building out Play -> Akka cluster, vs. building out Play -> Akka Streams cluster.

Getting my head around a specific use case.

@viktorklang
Owner

@jeffsteinmetz Hi Jeff,

some of your questions sound like they may be more suited for the Play or Akka User MLs.

The goal behind this Issue is to provide, as @smaldini says: "[…] adding IO protocol to carry these signals would be a plus to the spec as we have to implement this for every new transport, but a good chunk of the flow can end up being truly reactive right now."

Defining the "how" to transport Reactive Streams signals across the network would allow for seamless interoperability between different platforms (think JS, Python, JVM, .NET) over different networked transports.

I hope that clarifies it a bit :)

@jeffsteinmetz

Makes sense. I figured the question was a bit of a broad topic to be posting within an "issue" on GitHub.
So in summary, Akka-to-Akka reactive streams "do" support transport in a cluster (across a network boundary).

p.s.
After doing more research (and to really keep me on my toes), I see Akka has an EventBus for pub/sub and a related EventStream, which adds one more "way to do something" to my list.
EventBus + EventStream looks unrelated to Reactive Streams, but may solve a similar problem (without back pressure).

And then I found Akka Persistence, which ensures at-least-once delivery and feels very Kafka-like. Lots to try.
:)

@blesh

Up until this afternoon, I was under the impression this org/repository existed solely to support standardization of reactive streams on the JVM. Consequently, we created this organization this afternoon (https://github.com/Reactive-Streams-Open-Standard/reactive-streams-spec), which is JavaScript-specific.

Speaking with @benjchristensen this afternoon, I found out about this polyglot effort, and I was wondering if you all would like to roll the JavaScript effort in under your organization.

The focus of the JavaScript effort right now is as follows:

  • Identify a common, open standard for Reactive Streams (EventStreams/Observables) for use by consuming JS libraries
  • Possibly create a set of unit tests (ala Promises/A+) to help test compliance with the standard.

In the future, it would also be great to have the JavaScript effort collaborate on reactive networking standards (i.e. communication over HTTP/WebSockets/SSE/etc.).

@tmontgomery
Collaborator

I would love to see a JavaScript effort here, myself. And I still intend to get to some network protocols around this effort.

@benjchristensen

@blesh I definitely would like to see this happen as part of reactive-streams.org, and thanks @tmontgomery for jumping in to provide support of this.

@reactive-streams/contributors I propose the following and would like your feedback and/or +1 on moving forward:

  • create reactive-streams/reactive-streams-javascript repo for JavaScript spec, types and TCK
  • rename reactive-streams/reactive-streams to reactive-streams-java so Java does not own the root name
  • create reactive-streams/reactive-streams-io to start defining network protocols
  • recreate reactive-streams/reactive-streams as a root project for a generalized spec derived from the Java version that all polyglot specs should conform to.
  • create separate contributors groups for each repo (Java, JavaScript and IO for now)
  • update the home page at reactive-streams.org to match the Reactive Streams for Java 1.0 release and embrace the polyglot aspirations.
@blesh

I'm happy to help! This is easily the most exciting set of developments in programming right now, IMO.

reactive-streams/reactive-streams-js is preferable to me*, but other than that :+1:

* It might help with searching GitHub or tabbing through directories in terminal too... (e.g. "reactive-streams-j", TAB ...)

@benjchristensen

@blesh I'm fine with reactive-streams/reactive-streams-js.

@viktorklang in particular I'd like your input on this.

@rkuhn
Owner

@blesh Developing the JavaScript standard and TCK under this organization is definitely a good idea, we’d love to extend the family to more than just JVM languages. Your suggested repository name sounds good to me, it matches how things are called otherwise :-)

@benjchristensen I support your proposal of renaming and integrating repositories, maybe with the slight alteration of calling the JVM dialect reactive-streams-jvm (since it is meant to be used from all JVM languages).

Should we leave the artifact names as they are, based on the reasoning that Maven Central is only used for JAR files anyway?

Concerning the website: we will have to update it for 1.0 anyway (ASAP, where the 'P' was the problem so far), so folding the polyglot aspects into that rewrite should be easy, and it adds one more reason to not procrastinate ;-)

@benjchristensen

reactive-streams-jvm

I'm good with that name.

Should we leave the artifact names as they are, based on the reasoning that Maven Central is only used for JAR files anyway?

Yes, I think it's okay to leave the name.

<dependency>
    <groupId>org.reactivestreams</groupId>
    <artifactId>reactive-streams</artifactId>
    <version>1.0.0</version>
</dependency>

I'm not aware of any other types of artifacts outside the JVM community that end up on Maven Central, so I don't think we need to change the artifactId to reactive-streams-jvm.

and it adds one more reason to not procrastinate

Sounds good :-)

@reactive-streams/contributors any other comments on this before we move forward with creating the new reactive-streams-js repo?

@viktorklang or @rkuhn I don't have privileges to create new repos, so one of you will have to do it.

@rkuhn
Owner

@benjchristensen I added you to the Owners, created the reactive-streams-io and reactive-streams-js repositories and renamed reactive-streams-jvm. @blesh has been added to the js-admin team so that he should be able to set everything in motion on the JavaScript side of things.

I hope I didn’t overlook anything, it is getting a little late here ;-)

@blesh

:+1: thank you, @rkuhn and @benjchristensen for helping us put this together.

@blesh

@rkuhn: Just checked, I don't see any admin features on that repo... I think maybe I'm missing something.

@blesh

@rkuhn: Nevermind, I just got the org invite. Took a while to show up in my email I guess.

@benjchristensen

Thanks @rkuhn for the quick action on this!

@viktorklang
Owner
@purplefox
Collaborator

Very interesting proposal, but also very ambitious.

Defining a standard messaging protocol for asynchronously moving data with back pressure.Platform/ language agnostic. Scores of language specific clients will then spring up. It could even be embedded in routers.

Sounds very much like what AMQP is/was supposed to be. I've never been a big fan of AMQP (too complex imho), but any attempt to solve the problem in a simpler/better way gets my attention :)

@smaldini
Collaborator
@benjchristensen benjchristensen referenced this issue in reactive-streams/reactive-streams-io
Open

Goals and Motivation #1

@benjchristensen

I have opened an issue at reactive-streams/reactive-streams-io#1 to start this up. Please tell me if I'm embarrassing myself and should stop, or help me move forward with it if it makes sense. I am somewhat out of my league in this area and am really seeking help and expertise, so forgive what I'm sure is a naive and elementary attempt at describing what I'm seeking.

@benjchristensen

@purplefox Thanks for getting involved. I would love to have your guidance and involvement on this as it is honestly more aspirational for me than my core skill set.

@tmontgomery Todd, I rely heavily on your experience, skills and expressed interest as we've talked in the past to make this happen!

@tmontgomery
Collaborator

@benjchristensen I'm in! Will definitely help.

@viktorklang
Owner
@viktorklang
Owner

@reactive-streams/contributors Closing this issue since these efforts will be continued as reactive-streams subprojects!

@viktorklang viktorklang closed this
@briantopping

This issue might be worth reopening, as I think there's one more working group that would be helpful to polyglot components. As Reactive Streams offers "seamless interoperability between different platforms (think JS, Python, JVM, .NET) over different networked transports", there ought to be an intelligent discovery system for components. This WG would define the metadata used across all subprojects so that components can be found in a unified manner.

Two schemes I've seen are active registries that require being pinged with a trackback URL to a publicly accessible repo (http://bower.io/search) or passive registration such as Maven Central. Since different platforms are unlikely to ever converge on a single repository over time, it seems that an active registry is a better way to go. The ability to easily discover components could eventually segue into IDE support for graphical pipelines, so it may be good to think in that direction as well.

@viktorklang
Owner

Hi @briantopping,

the topics are very interesting, but they seem to go beyond the scope of reactive-streams-jvm, so perhaps they should be opened in a more general repo? Wdyt?

@briantopping

Hi @viktorklang, it did seem like a case for opening another repo, but I could have been missing something. I'm happy to start a discussion there to gauge interest and see what could be generated in the way of requirements.
