WHATWG streams for data channel messages #1732

Closed
lgrahl opened this issue Jan 11, 2018 · 41 comments

@lgrahl
Contributor

lgrahl commented Jan 11, 2018

There is a long-standing issue with large data channel messages in the existing RTCDataChannel API. If I want to send a single message containing 1 GiB of data (for example a large file), I have to have this gigantic message in memory at the time of sending. If I receive a 1 GiB message, it is slowly reassembled in memory and only then handed out to the application. That creates memory pressure and offers no way to handle backpressure.

My idea is to resolve this by extending the RTCDataChannel API in the following way:

Sending

Add a .createWritableStream method which returns a WritableStream instance. With the associated WritableStreamDefaultWriter instance, one can add chunks by calling .write on the writer. Once the writer is closed, the message is considered complete.

Receiving

If .binaryType is set to 'stream', the event raised for onmessage contains a ReadableStream instance that is created when the first chunk is received. Once the whole message has been read, the reader will return EOF on a .read call (as specified by the streams API).

Edit: What should happen when a string is being received will need to be discussed.
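
A rough sketch of how the two halves above could look from script, assuming the names proposed here (chunk1 and chunk2 are placeholders):

// Sending: acquire a writer from the proposed .createWritableStream()
const writer = dc.createWritableStream().getWriter();
await writer.ready;
writer.write(chunk1);
await writer.ready;
writer.write(chunk2);
await writer.close(); // closing the writer marks the message as complete

// Receiving: with the proposed binaryType, event.data is a ReadableStream
dc.binaryType = 'stream';
dc.onmessage = async (event) => {
    const reader = event.data.getReader();
    for (let r = await reader.read(); !r.done; r = await reader.read()) {
        // ... consume r.value (one chunk of the message)
    }
};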


In the meeting, I think there was a slight confusion about what streaming API I meant. Basically, I propose two streaming APIs that use WHATWG streams:

  • WHATWG streams for the QuicStream API is being discussed here: https://github.com/w3c/webrtc-quic/issues/2
  • WHATWG streams for the existing RTCDataChannel API. This is what this issue is for. And with the above description it should be clear that for RTCDataChannel there would have to be a stream for each message as data channels transfer datagrams and not a sequence of bytes. This is the point I wanted to make during the meeting. I hope this clarifies it a bit. :)
@martinthomson
Member

This is a good idea.

You could define a data channel as a stream of streams, but that might be a little more difficult than the simple addition suggested here. I would try to find a way to model this more on the fetch API if possible, but I'd need to look carefully at the API to decide whether that is feasible or just more disruptive.

You can use Blobs to avoid the memory commitment for big things, but that's not a great solution.

@martinthomson
Member

See the fetch Body mixin for a model that might be an improvement, at least for receiving.

@lgrahl
Contributor Author

lgrahl commented Jan 11, 2018

cc @annevk @ricea @domenic (as you might be interested in commenting on this)

@domenic
Contributor

domenic commented Jan 11, 2018

This would be super-great; I know developers who do both Node.js and WebRTC development, and their frustration at not having a streaming model for RTC data channels is part of what originally motivated the streams work.

The hardest part of designing this is the interaction with existing methods of reading/writing. That is, the most developer-friendly interface is (IMO) dataChannel.writable + dataChannel.readable, i.e. properties that always exist. See https://streams.spec.whatwg.org/#example-both. The issue then becomes figuring out what happens to your existing API when someone acquires a reader or writer from the stream. If possible, just making those APIs not function (e.g. never deliver events, or throw errors/reject promises) is the simplest way to do that.
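
For illustration, a hedged sketch of how such always-present properties might be used (dc.readable and dc.writable are hypothetical here, mirroring the linked example, and would represent the channel as a whole rather than a single message; someReadableStream is a placeholder):

const dc = pc.createDataChannel('files');

// Sending: pipe any ReadableStream into the (hypothetical) writable side.
await someReadableStream.pipeTo(dc.writable);

// Receiving: pull chunks from the (hypothetical) readable side instead of onmessage.
const reader = dc.readable.getReader();
const { value, done } = await reader.read();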

Let me know how I can help; this would be very exciting to get working. I don't know much about RTC data channels at the moment myself, but perhaps I should read up.

@lgrahl
Contributor Author

lgrahl commented Jan 11, 2018

That is, the most developer-friendly interface is (IMO) dataChannel.writable + dataChannel.readable, i.e. properties that always exist.

This sounds like you'd also like to wrap data channels, so handing out the messages (not the content) could also be done using the streams API? So, basically you'd have a stream that hands out messages where each message itself is a byte stream? I think this is what @martinthomson mentioned as well and it's a cool idea but I think we should start with using the streams API for the messages' content for now. (Wrapping data channels in such a way would make a neat JS lib, though.)

The issue then becomes figuring out what happens to your existing API when someone acquires a reader or writer from the stream.

Good point. This is what I think would be feasible:

  • Make .send and .createWritableStream raise an exception as long as there's an existing WritableStream instance that's not closed or aborted.
  • Either make .binaryType read-only once the first chunk has been received or make it read-only as long as a ReadableStream instance exists for an incoming message.

@domenic Maybe you can also comment on @martinthomson's idea regarding the use of fetch Body mixin as I'm not really familiar with it. If answering that would require understanding data channels a little more, send me a mail.

@lgrahl
Contributor Author

lgrahl commented Jan 11, 2018

Also, to clarify if it's not clear:

The example you've mentioned (https://streams.spec.whatwg.org/#example-both) sends a WebSocket message for each chunk, and when the stream reaches EOF the WS connection is closed. So, if I had one stream associated with a file, I could send that file and then the stream would be closed.

Whereas I'm proposing an API where an RTCDataChannel message remains just a message but each message is a stream. When the stream for a specific message reaches EOF, the message is complete. This makes it possible to send more than one message. So, if I had one stream associated with a file, I could send that file as one message.

@martinthomson
Member

@lgrahl, a stream of streams only really makes sense if we can resolve the issue of whether messages on data channels are strictly ordered.

@martinthomson
Member

Actually, belay that, lack of sleep. I assume that we would only be able to produce a stream of streams if the data channel were ordered. Though perhaps it's OK if there is no requirement for messages to be complete before the next message is delivered. There might be some caveats regarding back pressure in that case though.

If A is sent before B, but the first octets of B are delivered first, we would manifest a new message on the top-level stream. But A might be completed prior to completing B. For instance, the stream might deliver B2, B1, A1, A2, A3, B3. B1 and B2 would have to be reordered by the browser for delivery (messages themselves are always ordered internally), but it has no obligation to hold B until A arrives. In an ordered stream, the browser would have to hold all of B until A arrives.

(It seems like there's a memory exhaustion attack somewhere in there, but I'm sure our SCTP friends will point out the natural defense that applies.)

@lgrahl
Contributor Author

lgrahl commented Jan 12, 2018

I'm getting confused by the term stream of streams. Maybe we can call this a sequence of data channel messages where each message is a stream of bytes?

SCTP (with the ndata extension) can interleave messages on different streams but not on the same stream. That means messages on a data channel are strictly ordered (even though messages can be abandoned when using partially reliable delivery - this doesn't affect order).

@lgrahl lgrahl changed the title WHATWG streams for data channels WHATWG streams for data channel messages Jan 12, 2018
@lgrahl
Contributor Author

lgrahl commented Jan 13, 2018

I'm adding some examples here, so it's clearer what my idea would look like from a user's perspective:

Today's API

Peer A's code:

const dc = pc.createDataChannel(...);
dc.send(largeFile);

largeFile may be a Blob, in which case the data doesn't have to be in memory at the time of sending (IIRC, please correct me if I'm wrong), or an ArrayBuffer, in which case the data will need to be in memory.

Peer B's code:

const dc = pc.createDataChannel(...);
dc.binaryType = 'blob'; // ... or 'arraybuffer', it doesn't really matter here
dc.onmessage = (event) => {
    // Note: Ignoring the string case here.
    // ... write 'event.data' to disk
};

When the event has been raised, the large file's data has been transmitted entirely, which can take a very long time. Also, the data will be in memory at that point.

Streamed (assuming the file is not a stream)

Peer A's code:

const dc = pc.createDataChannel(...);
const writableStream = dc.createWritableStream();
const writer = writableStream.getWriter();

while (true) {
    await writer.ready;
    // Note: Once a WritableStreamBYOBWriter exists, we could reuse the same buffer
    // over and over again.
    const length = Math.min(
        myPreferredChunkSize, remainingLength, writer.desiredSize)
    const buffer = new Uint8Array(length);
    // ... copy the file's chunk here into 'buffer'
    //    (or create a view if you already have the chunk in memory)
    writer.write(buffer);
    // ... continue until all chunks have been written and then break
}
await writer.close();

(Edit: I fixed the example above to not await the call to .write(...) but rather await the .ready attribute on the writable stream writer.)

If something goes wrong, you'd call writer.abort() which would abort sending the message. (We would have to discuss what .abort triggers but it should probably close the channel.)

Peer B's code:

const dc = pc.createDataChannel(...);
dc.binaryType = 'stream';
// We only need one small buffer per channel
const buffer = new Uint8Array(myPreferredChunkSize);
let view = new DataView(buffer.buffer, 0);

// Note: The function is declared async!
dc.onmessage = async (event) => {
    // Note: Ignoring the string case here.
    const readableStream = event.data;
    // You could also use the default reader but then the reader would not
    // copy directly into our buffer.
    const reader = readableStream.getReader({ mode: 'byob' });
    while (true) {
        const chunk = await reader.read(view);
        if (chunk.done) {
            break;
        } else {
            view = chunk.value; // We reclaim the buffer here
            // ... write the view's data to disk
        }
    }
};

The receiver can call reader.cancel() to abort receiving data. (We would have to discuss what .cancel triggers but it should probably close the channel.)

This of course has the advantage that the file's data doesn't need to be in memory all at once on either peer and that backpressure is taken care of. Furthermore, writing to disk can start with the first chunk received.

Streamed (assuming the file is also a stream)

So, the last example looked a lot more complicated than what we have now. But if the file could be read from and written to in the form of a stream, it becomes pretty easy.

Peer A's code:

const file = ...;
const dc = pc.createDataChannel(...);
const writableStream = dc.createWritableStream();
await file.readableStream.pipeTo(writableStream);

Peer B's code:

const file = ...;
const dc = pc.createDataChannel(...);
dc.binaryType = 'stream';

// Note: The function is declared async!
dc.onmessage = async (event) => {
    // Note: Ignoring the string case here.
    const readableStream = event.data;
    await readableStream.pipeTo(file.writableStream);
};

Combination

Of course, peer A and B can just as well use the existing API on one side. It's entirely up to them.


There are several other ways to use WHATWG stream reader/writer instances. I've only shown two of them.

Comments? :)

@ricea

ricea commented Jan 17, 2018

@lgrahl This looks good. I think in general it's better to have reusable wrappers for things like file APIs so that the simpler and easier pipeTo() style can be used.
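
As an illustration of such a wrapper, a hedged sketch; writeChunkToDisk, closeFile and someFileHandle stand in for whatever file API is actually available:

// Hypothetical reusable wrapper: expose a file sink as a WritableStream so
// receivers can simply pipeTo() it.
function makeFileWritableStream(fileHandle) {
    return new WritableStream({
        write(chunk) {
            // Returning a promise here propagates backpressure to the sender.
            return writeChunkToDisk(fileHandle, chunk);
        },
        close() {
            return closeFile(fileHandle);
        }
    });
}

// Usage with the proposed API:
dc.onmessage = async (event) => {
    await event.data.pipeTo(makeFileWritableStream(someFileHandle));
};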

@aboba
Contributor

aboba commented Jan 17, 2018

@lgrahl The Streams API provides an example of creating readable streams by wrapping a WebSocket. Yet it appears that no text was required in the WebSockets API to enable this.

So is this something we actually need to change the WebRTC 1.0 document to enable?

@aboba aboba added the question label Jan 17, 2018
@lgrahl
Contributor Author

lgrahl commented Jan 17, 2018

The mentioned example for WebSocket has several requirements not applicable to data channels and some problems (which the authors of the streams API couldn't address because the WebSocket API is just how it is):

  • It requires a reliable and ordered transport and therefore would break partially reliable, unreliable and unordered data channels.
  • It violates the purpose of a message-oriented protocol and breaks it down to a pure byte stream (well, rather chunk stream) protocol. The point of message-oriented protocols is that no framing mechanism in the upper layer is required. To elaborate: If I use a message-oriented protocol to send a file, I just send that as one message. And then I send another file in another message. But as this has been broken down to a byte stream now, I need to add a framing protocol that recognises the end of the first file before I can send another one.
  • This is not a byte stream but rather a stream of chunks. Thus, if the sender sends me 64 KiB chunks, the receiver will get 64 KiB chunks. Whereas with byte streams this is up to what either side prefers (and also requires less copying with BYOB readers because the underlying implementation can copy directly into that buffer if the reader is fast enough).
  • (There's no backpressure signal for the sender. Adding this in parentheses because the data channel API has events for that.)

(And yeah, I think the API proposed here would also make sense for WebSocket if feasible.)

Edit: I forgot to answer your question.

So is this something we actually need to change the WebRTC 1.0 document to enable?

Yes, the first two points aren't fixable without changes to the spec. (One could argue about the third point.)

@domenic
Contributor

domenic commented Jan 17, 2018

I never understood the second point, but multiple people have brought it up, so I am probably the one in the wrong. But from what I can see, the wrapper creates a 1:1 mapping from WebSocket message to ReadableStream chunk, so there is no loss of expressiveness.

@lgrahl
Contributor Author

lgrahl commented Jan 17, 2018

I will try to explain why the second point is important (even for WebSocket) because I believe you're not alone with that perception. Consider the following scenario: I want to receive two (or more) files.

Let's take this example as a starting point...

const file = <some stream>;
const readableStream = makeReadableWebSocketStream('wss://example.com:443/', 'protocol');
readableStream.pipeTo(file.writableStream);

Let's assume the sender will send each file as a separate stream. So, this is going to store the first file but after that the WS connection is closed. What can we do? Well, we could persuade the sender to send both files inside the same stream...

const file = <some stream>;
const readableStream = makeReadableWebSocketStream('wss://example.com:443/', 'protocol');
readableStream.pipeTo(file.writableStream);

Meh, now we've stored both files as one file. Unfortunate. What can we do? Well, we could add a header and include the file's size... or we add a header (or footer) to each chunk that tells us whether the file is EOF or not. Usually, we will prefer the latter because the first idea requires us to know the file's size. Something we may not always know (at the latest when we want to pipe another stream that doesn't follow our framing protocol). So, let's go for the latter...

const readableStream = makeReadableWebSocketStream('wss://example.com:443/', 'protocol');
const reader = readableStream.getReader();
let file = <some stream>;
let writer = file.writableStream.getWriter();

while (true) {
    await writer.ready;
    const chunk = await reader.read()
    if (chunk.done) {
        break;
    }
    const header = new DataView(chunk.value.buffer);
    const eof = header.getUint8(0) !== 0;
    writer.write(new Uint8Array(chunk.value.buffer, 1));
    if (eof) {
        await writer.close();
        file = <some stream>;
        writer = file.writableStream.getWriter();
    }
}
await writer.close();

(Edit: I fixed the example above to not await the call to .write(...) but rather await the .ready attribute on the writable stream writer.)

To recap what we've done: we have added a proprietary framing protocol on the application layer which essentially encapsulates multiple streams (one for each file, plus some metadata), which is why we couldn't use .pipeTo anymore. This added complexity and required us to inspect the chunks because it is yet another protocol. It also required the sender to copy the header (or footer) into the buffer. It has quite an impact on throughput (ping me if you want to know more; I've written some tests a while ago), and it was also unnecessary because the underlying transport already provides such a framing mechanism we could have used for this exact purpose. It just wasn't accessible with the existing API.

(I have completely ignored that this of course wouldn't work for data channels that aren't reliable and ordered. For this, you will need to add a more complicated framing protocol such as this.)

With the approach suggested in this issue carried over to the WebSocket API, it would be as simple as this...

const ws = new WebSocket('wss://example.com:443/', 'protocol');
ws.binaryType = 'stream';
ws.onmessage = async (event) => {
    // Note: Ignoring the string case here once again.
    const file = <some stream>;
    const readableStream = event.data;
    await readableStream.pipeTo(file.writableStream);
};

Was that helpful? 😃

@aboba
Contributor

aboba commented Jan 24, 2018

@lgrahl What you are proposing is that a stream be encapsulated in an RTCDataChannel message. That would work for implementations that support large messages, but unless we were to require that all browsers support a large or infinite maxMessageSize, you'd still need to support mapping from an RTCDataChannel message to ReadableStream chunk.

@lgrahl
Contributor Author

lgrahl commented Jan 25, 2018

@aboba I can't see why this wouldn't work with a message size limitation. Once the stream reaches a total size that would go above that limit, the stream could be aborted (which will probably mean the data channel will be closed). Alternatively, an exception could be raised for the writer's .write call, so the user is theoretically able to choose what to do (close early or abort the stream, really depends on whether this would break .pipeTo - maybe @domenic can comment on that).
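
A hedged sketch of the second option (chunks is a placeholder; this assumes .write() would reject once the message would exceed a finite remote maxMessageSize):

const writer = dc.createWritableStream().getWriter();
try {
    for (const chunk of chunks) {
        await writer.ready;
        await writer.write(chunk); // assumed to reject if the size limit would be exceeded
    }
    await writer.close();
} catch (error) {
    // The message cannot be completed; the application decides whether to
    // close early, abort the stream or tear down the channel.
    console.error('message exceeded the maximum message size', error);
}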

So, I don't see maximum message size being a blocker here. But IIRC, the maximum message size limitation exists because people were concerned about stream monopolisation by large data channel messages. AFAIK, Randell's suggestion of using PPID-based fragmentation/reassembly has not been accepted by other parties, so we ended up with this limitation in the SDP and it bubbled up into this spec. With the SCTP ndata extension, stream monopolisation is not an issue anymore. With this API change, backpressure would not be a concern anymore. So, as far as I'm concerned, maxMessageSize can go (well, it should stay in the API for legacy reasons but just say infinite) as it was planned, see this IETF discussion. If there is another reason why we have this, let me know.

@alvestrand
Contributor

It seems that blobs provide the magic needed to connect to this API, so we don't have to change anything in the WebRTC API - we're already able to send blobs.
Now we need a demo.

@annevk
Member

annevk commented Jan 26, 2018

Well, that is not entirely true. A blob can only be allocated once you have received the entire message (since you might encounter an error at some point). With a stream you could start processing bytes right away and error the stream the moment you encounter an error in the message. So streams would be a lower-level primitive on which you could do blobs.

@lgrahl
Contributor Author

lgrahl commented Jan 26, 2018

@alvestrand What @annevk said.

Blob is too high level and will create memory pressure on the receiver side because the data is gathered in RAM until it's complete. One can argue about the sender side, but it very likely creates memory pressure there as well.

Streams on the other hand are able to handle backpressure, so the underlying transport can be paused and resumed. They are high-level when possible (using .pipeTo) and low-level when required (using reader/writer). Data can be handed out with the first bytes received. For example, it would be easy to take a data channel message and upload it efficiently as the body of an HTTP request via the fetch API, without having to hold all of the data in RAM at once.
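
A hedged sketch of that fetch example, assuming fetch() accepts a ReadableStream as the request body (the upload URL is a placeholder):

dc.binaryType = 'stream';
dc.onmessage = async (event) => {
    // The message's ReadableStream is handed to fetch without ever being
    // buffered completely in RAM. (Newer fetch specs additionally require
    // duplex: 'half' for streaming request bodies.)
    await fetch('https://example.com/upload', {
        method: 'POST',
        body: event.data
    });
};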

@alvestrand
Contributor

ack. So it seems that we need a send() call on DataChannel that takes a WritableStream argument, and a form of the message event (which sends a MessageEvent, where the data type is Any) that - probably via a new binaryType value - delivers a ReadableStream object when a message comes in.

These are new API surfaces. We can't add them by shim, although we can emulate them in a shim to some degree for small messages (by buffering the whole message before we hand it off to its recipient).
I think I understand what we need to do now.

@lgrahl
Contributor Author

lgrahl commented Jan 30, 2018

A little nit: The WritableStream instance needs to be created by RTCDataChannel, so using .send(myWritableStream) wouldn't work. But take a look at the initial posting - that API proposal should work.

Okay, cool. So, I interpret this as positive feedback and will happily create a PR/draft for it.

@ricea

ricea commented Jan 31, 2018

send() will actually need to take a ReadableStream argument, the same way Request does.

send() could return a WritableStream, but I assume that would break the API.

If your code wants a WritableStream you can use a TransformStream as an adaptor as this example demonstrates.
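
A hedged sketch of that adaptor, assuming a send() overload that accepts a ReadableStream for the message body (chunk1/chunk2 are placeholders):

const { readable, writable } = new TransformStream();
dc.send(readable); // hypothetical ReadableStream-accepting send()

const writer = writable.getWriter();
await writer.ready;
writer.write(chunk1); // chunks flow through the transform into the message
await writer.ready;
writer.write(chunk2);
await writer.close(); // closing the writable side completes the message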

@stefhak
Contributor

stefhak commented Jan 31, 2018

Regarding #1732 (comment): would we be able to support ReadableStreams without any API change (as this example does for WebSocket)?

@annevk
Member

annevk commented Jan 31, 2018

No, the idea is to support streams for individual messages. As such they're strictly lower-level than anything the API currently supports as per #1732 (comment) and also OP.

@lgrahl
Contributor Author

lgrahl commented Jan 31, 2018

@ricea Okay, if that works - even better because it's even more elegant. I'll join the WHATWG IRC so we can work out the details as you're the experts when it comes to streams.

@stefhak Sadly not. I explained why in #1732 (comment). (Edit: @annevk was faster 😄 )

@stefhak
Contributor

stefhak commented Jan 31, 2018

@lgrahl sorry, I'm having a bad day. What exactly can WebSocket do per #1732 (comment) that the DataChannel can't (you can configure a data channel to be reliable and ordered AFAICU)? There must be something, as this example shows a WebSocket-sourced ReadableStream, but that is not possible using a WebRTC DataChannel.

@lgrahl
Contributor Author

lgrahl commented Jan 31, 2018

@stefhak I'm sorry but that comment is confusing the hell out of me. 😅 Can you rephrase it?

@annevk
Member

annevk commented Jan 31, 2018

@stefhak you're focusing on representing a sequence of messages as a stream, whereas this proposal is about representing a single message as a stream.

@stefhak
Contributor

stefhak commented Jan 31, 2018

@lgrahl sure :)
This example shows a WebSocket sourcing a ReadableStream.
I thought you should be able to do the same with a DataChannel, but understand from #1732 (comment) that it's not possible.
Why?

@lgrahl
Contributor Author

lgrahl commented Jan 31, 2018

@stefhak Thanks! The main point is what @annevk just said. There are some other issues I've described in my answer towards Bernard (who brought up the same example), see #1732 (comment) (the links look all the same but point to different comments - maybe that's why you haven't seen it from my previous answer).

@aboba
Contributor

aboba commented Jan 31, 2018

@annevk @lgrahl Is the goal to represent a single message as a stream or to allow a (large) stream to be carried in a single message? The latter would require browsers to support a much larger maxMessageSize than they do today.

@lgrahl
Contributor Author

lgrahl commented Jan 31, 2018

@aboba The latter - represent a single message as a stream.

Edit: Actually, I don't see the difference between

represent a single message as a stream

and

allow a (large) stream to be carried in a single message

Can you elaborate what would be the distinction for you?

I've commented on maxMessageSize earlier: #1732 (comment) (2nd paragraph).

@alvestrand
Contributor

Reminding everyone of the -rtcweb-transports draft:

For data transport over the WebRTC data channel
[I-D.ietf-rtcweb-data-channel], WebRTC endpoints MUST support SCTP
over DTLS over ICE. This encapsulation is specified in
[I-D.ietf-tsvwg-sctp-dtls-encaps]. Negotiation of this transport in
SDP is defined in [I-D.ietf-mmusic-sctp-sdp]. The SCTP extension for
NDATA, [I-D.ietf-tsvwg-sctp-ndata], MUST be supported.

@lgrahl
Contributor Author

lgrahl commented Mar 13, 2018

FYI I'm planning to create a PR for this but it will take some time. :)

@jan-ivar
Member

jan-ivar commented Oct 9, 2018

I've also been meaning to put together an extension spec for this, but have been occupied with other matters thus far. Hope to get to it at some point.

@murillo128

How would this API be applicable to unreliable DCs?

My wild guess is that when the SCTP transport reaches the maxPacketLifeTime or maxRetransmits limit, an exception will be raised on the sender in the writable stream's ready/write, and also on the receiver's readable stream once the abandonment is signalled? Would it be possible to abort/abandon an ongoing message?

Also, I assume that write/receive chunk boundaries are not preserved (which is fine), right?

@lgrahl
Contributor Author

lgrahl commented Jan 28, 2019

Your wild guess is correct. See the class definition for WritableStreamDefaultWriter.

Also, I assume that write/receive chunk boundaries are not preserved (which is fine), right?

Yep.
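
For illustration, a hedged sketch of how a sender could observe such an abandonment, assuming the writable stream errors once the maxRetransmits/maxPacketLifeTime limit is hit:

const writer = dc.createWritableStream().getWriter();
writer.closed.then(
    () => {
        // The whole message was handed over to the transport.
    },
    (error) => {
        // The message was abandoned (e.g. maxRetransmits reached); the chunks
        // written so far will not be delivered as a complete message.
    }
);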

@murillo128

Async iteration has landed in the streams spec:
https://streams.spec.whatwg.org/#rs-asynciterator

So it could be possible to write the following code:

const dc = pc.createDataChannel(...);
dc.binaryType = 'stream';

// Note: The function is declared async!
dc.onmessage = async (event) => {
    // Each iteration yields a chunk directly (no .value/.done needed).
    for await (const chunk of event.data) {
        // ... write the chunk's data to disk
    }
};

@aboba
Contributor

aboba commented Aug 13, 2019

Here is a recent proposal along these lines for WebSockets: https://groups.google.com/a/chromium.org/forum/#!topic/blink-dev/X7rWpAkMCyg

@aboba
Contributor

aboba commented Aug 13, 2019

Moved to the WebRTC-NV Use Case repo: w3c/webrtc-nv-use-cases#44
