Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream ReadableStream instances #51

Open
wants to merge 5 commits into
base: main
Choose a base branch
from

Conversation

phryneas
Copy link

@phryneas phryneas commented Nov 8, 2024

This allows for ReadableStream instances to be passed through the stream.

@@ -238,22 +271,24 @@ export function encode(
if (Array.isArray(id)) {
controller.enqueue(
textEncoder.encode(
`${TYPE_ERROR}${deferredId}:[["${TYPE_PREVIOUS_RESOLVED}",${id[0]}]]\n`
`${TYPE_PROMISE}${TYPE_ERROR}${deferredId}:[["${TYPE_PREVIOUS_RESOLVED}",${id[0]}]]\n`
Copy link
Author

@phryneas phryneas Nov 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did change the existing transport format a bit here - this is not strictly necessary, but this way, we have less of an explosion of types & type cases, and a bit more consistency.

It went from
E5 to PE5 to signal that Promise 5 Errored.

Further down I introduced
TE5 to signal that sTream 5 Errored.

This "combination of characters" is kinda necessary as otherwise I would have to introduce another new signal for "stream done" as well as "stream errored" and re-use a few characters in ways that do not necessarily make sense

So instead, further down I introduce

  • T5 for "new value in sTream 5"
  • TO5 for "sTream 5 is dOne"
  • TE5 for "sTream 5 Errored"

if (done) {
controller.enqueue(
textEncoder.encode(
`${TYPE_STREAM}${TYPE_DONE}${streamId}:[]\n`
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
`${TYPE_STREAM}${TYPE_DONE}${streamId}:[]\n`
`${TYPE_STREAM}${TYPE_DONE}${streamId}\n`

This could be modified like this to save a few characters, but it seemed more consistent that way - I'm very open for input there :)

Comment on lines +146 to +155
const [left, right] = input.tee();
input.getReader = left.getReader.bind(left);
input.cancel = left.cancel.bind(left);
input.pipeThrough = left.pipeThrough.bind(left);
input.pipeTo = left.pipeTo.bind(left);
input.tee = left.tee.bind(left);
Object.defineProperty(input, "locked", {
get: () => left.locked,
});
streams[index] = right;
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could also be omitted if there were a consensus that encoding is a destructive action - React does so - but I honestly felt a bit more comfortable with this tee approach.

Suggested change
const [left, right] = input.tee();
input.getReader = left.getReader.bind(left);
input.cancel = left.cancel.bind(left);
input.pipeThrough = left.pipeThrough.bind(left);
input.pipeTo = left.pipeTo.bind(left);
input.tee = left.tee.bind(left);
Object.defineProperty(input, "locked", {
get: () => left.locked,
});
streams[index] = right;
streams[index] = input;

Comment on lines +655 to +656
const inputValue = await inputReader.read();
const decodedValue = await decodedReader.read();
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These tests currently test the "non-destructive" approach with tee, which makes it possible to still consume the original input stream.

@@ -88,8 +91,10 @@ async function decodeDeferred(
const line = read.value;
switch (line[0]) {
case TYPE_PROMISE: {
const isError = line[1] === TYPE_ERROR;
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The case TYPE_ERROR: was extremely similar, and falls into the TYPE_PROMISE case now that I slightly changed the format, so I could delete that second case with a few minor adjustments.

await decoded.done;
});

test("streams in a stream", async () => {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this works now 🤣

@phryneas
Copy link
Author

phryneas commented Nov 8, 2024

Okay, I think I'm happy with this. Ready for review/discussion :)

@phryneas phryneas marked this pull request as ready for review November 8, 2024 14:49
@remorses
Copy link

remorses commented Dec 3, 2024

I would love to use this in Remix to use streaming loaders and actions, my use case is running a very slow loader while showing feedback to the user (like percentage completed)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants