Add a realtime mode to ReadableStream.tee #1186

Open
youennf opened this issue Nov 16, 2021 · 5 comments

@youennf
Contributor

youennf commented Nov 16, 2021

For realtime applications dealing with ReadableStreams of VideoFrames, it is very useful to be able to limit buffering between the teed branches. This also ensures that both branches receive the most recent video frame.
As discussed during today's WebRTC/WHATWG streams call, we could envision something like tee({ structuredClone: true, noBuffer: true }).
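
For illustration, a minimal sketch of how the proposed options might be used from page script. The option names are taken from this comment and are not part of any shipped API; videoFrameStream is just an assumed ReadableStream of VideoFrames:

```js
// Hypothetical usage of the proposed option bag (illustrative only):
const [previewBranch, encoderBranch] = videoFrameStream.tee({
  structuredClone: true, // each branch would receive its own clone of every VideoFrame
  noBuffer: true         // overwrite stale frames instead of queuing them for a slow branch
});
```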

@MattiasBuelens
Collaborator

In #1157 (comment), we discussed and prototyped a "synchronized" variant of tee(). That variant would only pull from the original stream if both branches are pulling, to make sure that both branches see the same chunks at the same time (by slowing down the faster reader).

If I understand correctly, this "realtime" variant is slightly different. It would pull from the original stream if at least one of the two branches is pulling, and only enqueue the received chunk to branches that are actively pulling. That way, we avoid overfilling the queue of the slower branch by "skipping" chunks.

What should happen if one branch is only slightly slower than the other? That is:

  1. Branch 1 starts pulling. tee() starts pulling from the original stream.
  2. We receive a chunk from the original stream. Only branch 1 is pulling, so we only enqueue it to that branch.
  3. Branch 2 starts pulling, but only slightly after we did the previous step.

I see two options:

  • Treat this as a new pull. Thus, tee() starts pulling the next chunk from the original stream, which will definitely go to branch 2. (If branch 1 manages to catch up in the meantime and also starts pulling, we'll also enqueue it to branch 1.)
  • Store the previous chunk, and enqueue it when branch 2 (eventually) starts pulling. This chunk can be overwritten when branch 1 pulls in more chunks in the meantime. However, if it doesn't get overwritten, then this previous chunk may be very old by the time branch 2 pulls it.

Or perhaps some combination of these two, with some sort of "max age" to decide between enqueuing the previous chunk or pulling a new chunk? 🤷
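
For concreteness, here is a rough userland sketch of the second option (store only the most recent chunk and overwrite it when a newer one arrives). The realtimeTee name is made up, and cancellation, error propagation, cloning, and chunk closing are deliberately ignored; this only illustrates the pulling behaviour being discussed:

```js
// Sketch of the "store the previous chunk" option, as a userland helper.
function realtimeTee(readable) {
  const reader = readable.getReader();
  let latest;                  // most recent chunk read from the source
  let seen = [true, true];     // has branch i already received `latest`?
  let reading = null;          // in-flight read, shared by both branches

  function readNext() {
    if (!reading) {
      reading = reader.read().then(({ done, value }) => {
        reading = null;
        if (!done) {
          latest = value;      // overwrites any chunk a slow branch missed
          seen = [false, false];
        }
        return done;
      });
    }
    return reading;
  }

  const makeBranch = (i) =>
    new ReadableStream({
      async pull(controller) {
        if (!seen[i]) {
          // This branch missed the stored chunk: deliver it now,
          // even though it may already be somewhat old.
          seen[i] = true;
          controller.enqueue(latest);
          return;
        }
        // Otherwise pull a fresh chunk from the source (or join an
        // in-flight read started by the other branch).
        const done = await readNext();
        if (done) {
          controller.close();
        } else {
          seen[i] = true;
          controller.enqueue(latest);
        }
      }
    }, { highWaterMark: 0 });  // don't read ahead of the consumer

  return [makeBranch(0), makeBranch(1)];
}
```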

@youennf
Contributor Author

youennf commented Nov 18, 2021

  • Store the previous chunk, and enqueue it when branch 2 (eventually) starts pulling. This chunk can be overwritten when branch 1 pulls in more chunks in the meantime.

My understanding was to go with this approach.

@ricea
Collaborator

ricea commented Nov 18, 2021

  • Store the previous chunk, and enqueue it when branch 2 (eventually) starts pulling. This chunk can be overwritten when branch 1 pulls in more chunks in the meantime.

My understanding was to go with this approach.

This would imply that if structuredClone is true, we always need to clone the chunk. Which is a problem if explicit closure is needed.

@jan-ivar
Contributor

  • Treat this as a new pull ... (If branch 1 manages to catch up in the meantime and also starts pulling, we'll also enqueue it to branch 1.)

Is "in the meantime" = anytime before a chunk newer than "new pull" is delivered to branch 2? (The pull promise is unreliable here imho)

If the underlying source's internal queue is not empty, this would seem to devolve into delivering different chunks to each branch, pulling at a rate faster than either of the branches would have individually. I don't think we ever want to steal chunks from the fast branch, even if upstream has backed up.

  • Store the previous chunk, and enqueue it when branch 2 (eventually) starts pulling. This chunk can be overwritten when branch 1 pulls in more chunks in the meantime.

This would appear to keep the pull rate = max(pull rate A, pull rate B), and expose mostly the same (cloned) frames, which seems desirable or at least easier to reason about.

This would imply that if structuredClone is true, we always need to clone the chunk. Which is a problem if explicit closure is needed.

Yes, though we need to solve that somehow anyway for when a stream is errored.

@MattiasBuelens
Collaborator

  • Treat this as a new pull ... (If branch 1 manages to catch up in the meantime and also starts pulling, we'll also enqueue it to branch 1.)

Is "in the meantime" = anytime before a chunk newer than "new pull" is delivered to branch 2? (The pull promise is unreliable here imho)

Yes, that's what I meant.

If the underlying source's internal queue is not empty, this would seem to devolve into delivering different chunks to each branch, pulling at a rate faster than either of the branches would have individually. I don't think we ever want to steal chunks from the fast branch, even if upstream has backed up.

Agreed. We also definitely don't want to pull faster than the fastest branch, because that could get very bad if you do multiple tee()s in a "chain".

I probably should have thought a bit more about this option first, because now it sounds like an obviously bad idea. 😛

  • Store the previous chunk, and enqueue it when branch 2 (eventually) starts pulling. This chunk can be overwritten when branch 1 pulls in more chunks in the meantime.

This would appear to keep the pull rate = max(pull rate A, pull rate B), and expose mostly the same (cloned) frames, which seems desirable or at least easier to reason about.

Yes, this seems much better. 👍

This would imply that if structuredClone is true, we always need to clone the chunk. Which is a problem if explicit closure is needed.

Yes, though we need to solve that somehow anyway for when a stream is errored.

Yes, whatever solution we end up choosing for #1185 will also need to be integrated in tee(). We need to close the cloned chunk when it gets replaced by a newer one, or when the branch that would have received the clone gets cancelled.

We do have to be careful with how we design explicit closure. If we go with [Transferable, Closeable] as in #1187 (comment), it should be fine: we can use the same "close a chunk" algorithm inside ReadableStreamTee. If we go with something like UnderlyingSink.closeChunk(chunk), then we have to copy that algorithm over to the two branches. (Otherwise, we might run into problems if the original stream clears its algorithms before both branches are done.)
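
As a rough illustration of that bookkeeping, using VideoFrame's existing clone()/close() methods as a stand-in for whatever generic "clone a chunk" / "close a chunk" algorithms end up being specified:

```js
// Hypothetical bookkeeping for the stored clone destined for the slow branch.
let storedClone = null;

function storeForSlowBranch(chunk) {
  if (storedClone !== null) {
    storedClone.close();        // replaced by a newer chunk before being pulled
  }
  storedClone = chunk.clone();  // keep our own clone for the slow branch
}

function onSlowBranchCancelled() {
  if (storedClone !== null) {
    storedClone.close();        // the branch will never consume it
    storedClone = null;
  }
}
```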
