Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lib: implement various stream utils for webstreams #39517

Closed
wants to merge 2 commits into from
Closed

lib: implement various stream utils for webstreams #39517

wants to merge 2 commits into from

Conversation

ghost
Copy link

@ghost ghost commented Jul 25, 2021

Implements the finished, addAbortSignal, and pipeline functions for webstreams, and adds the documentation and tests for each function. The promisified versions of finished and pipeline are also added.

Refs: #39316

Implements the finished, addAbortSignal, and pipeline functions for
webstreams, and adds the documentation and tests for each function.
The promisified versions of finished and pipeline are also added.

Refs: #39316
@nodejs-github-bot nodejs-github-bot added lib / src Issues and PRs related to general changes in the lib or src directory. needs-ci PRs that need a full CI run. labels Jul 25, 2021

throw new ERR_INVALID_ARG_TYPE(
'stream',
'ReadableStream|WritableStream|TransformStream',
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
'ReadableStream|WritableStream|TransformStream',
['ReadableStream', 'WritableStream', 'TransformStream'],

if (streams.some(cannotWriteToStream)) {
throw new ERR_INVALID_ARG_TYPE(
'streams',
'WritableStream|TransformStream',
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
'WritableStream|TransformStream',
['WritableStream', 'TransformStream'],

if (streams.some(cannotReadFromStream)) {
throw new ERR_INVALID_ARG_TYPE(
'streams',
'ReadableStream|TransformStream',
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
'ReadableStream|TransformStream',
['ReadableStream', 'TransformStream'],

@VoltrexKeyva
Copy link
Member

A lot of unrelated style changes in lib/internal/webstreams/readablestream.js?


function readableStreamAddFinishedCallback(stream, callback) {
assert(isReadableStream(stream));
stream[kState].finishedCallbacks.push(callback);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
stream[kState].finishedCallbacks.push(callback);
ArrayPrototypePush(stream[kState].finishedCallbacks, callback);

controller[kState].started = true;
writableStreamDealWithRejection(stream, error);
});
}

function writableStreamAddFinishedCallback(stream, callback) {
assert(isWritableStream(stream));
stream[kState].finishedCallbacks.push(callback);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
stream[kState].finishedCallbacks.push(callback);
ArrayPrototypePush(stream[kState].finishedCallbacks, callback);

Copy link
Contributor

@aduh95 aduh95 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code contains utility functions that are only used once (such as signalIsAborted, validateInputs, throwErrorIfNotWebstream, addAbortCallbackToSignal, etc.), which makes the code hard to follow. Could you try to remove them? Instead you can use comments to explain what the code is doing.

@@ -1267,6 +1270,135 @@ added: REPLACEME
-->

* Type: {WritableStream}
### `webstream.finished(stream, callback)`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
### `webstream.finished(stream, callback)`
### `webstream.finished(stream, callback)`

Comment on lines +1297 to +1298
```
The `finished` API also provides promise version:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
```
The `finished` API also provides promise version:
```
The `finished` API also provides promise version:

Copy link
Member

@ronag ronag left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should just add support for web streams in existing stream functions. Not implement new ones.

@ronag ronag added stream Issues and PRs related to the stream subsystem. web streams and removed lib / src Issues and PRs related to general changes in the lib or src directory. labels Jul 26, 2021
@ghost
Copy link
Author

ghost commented Jul 26, 2021

A lot of unrelated style changes in lib/internal/webstreams/readablestream.js?

@VoltrexMaster The only formatting tool I used was eslint with the given config file in the repo, so I'm unsure as to why this has happened. Let me know if I'm missing something, but otherwise I'll see if I can sort this out.

@ghost
Copy link
Author

ghost commented Jul 26, 2021

We should just add support for web streams in existing stream functions. Not implement new ones.

@ronag Understood. Is it satisfactory to add a conditional flow in the existing functions which calls the functions added in this request?

validateInputs(signal, stream);

const onAbort = () => {
stream.abort(new AbortError());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

abort doesn't exist on web streams? Do they?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only on WritableStream. On ReadableStream it needs to be cancel().

There is another fundamental problem with this, however... if the ReadableStream has a Reader, or the WritableStream has a Writer, their cancel() and abort() methods cannot be called directly... and there's no public API for knowing if either is locked. If they are, there's no public API for getting the Reader or the Writer and those have to be used for doing the canceling/aborting.

Instead, in order for us to do this, we need to rely on the internal functions and cannot rely on the public API. This also means that these will only work for the core implementations of web streams (userland implementations would not be supported).

image

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jasnell do you think this finished for webstreams is still a worthwhile development given that this is the case?

@ronag
Copy link
Member

ronag commented Jul 26, 2021

We should just add support for web streams in existing stream functions. Not implement new ones.

@ronag Understood. Is it satisfactory to add a conditional flow in the existing functions which calls the functions added in this request?

I think for addAbortSignal it's ok.

For pipeline I prefer #39519.

For finished I'm unsure. You are just converting it into a stream.Readable. Isn't there a better way? If that's all you do then you might as well do this as a one liner (if val is ReadableStream then val = val.readable) in stream.finished.

@@ -33,6 +33,9 @@ There are three primary types of objects
* `WritableStream` - Represents a destination for streaming data.
* `TransformStream` - Represents an algorithm for transforming streaming data.

Additionally, this module includes the utility functions
`pipeline()`, `finished()`, and `addAbortSignal()`.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm -1 on having this also exported on stream/web. Just have the stream exports support the additional types.

const {
DOMException,
} = internalBinding('messaging');
const { DOMException } = internalBinding('messaging');
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelated style changes. Please leave the require statements formatted as they are.

@@ -216,7 +207,8 @@ class ReadableStream {
setupReadableByteStreamControllerFromSource(
this,
source,
extractHighWaterMark(highWaterMark, 0));
extractHighWaterMark(highWaterMark, 0)
);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All of the unrelated style changes in here make this difficult to review.

*/
abort(reason = undefined) {
this.cancel(reason);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We cannot add methods to the standard class.

*/
abort(reason = undefined) {
this?.readable?.abort?.(reason);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We cannot add methods to the standard API

stream[kState].finishedCallbacks?.forEach((cb) => {
cb(stream[kState].storedError);
});
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is the wrong approach. the WritableStream and ReadableStream APIs already have mechanisms for monitoring when each are closed via the closed promises on the Writer and Reader.

Copy link
Member

@jasnell jasnell left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I appreciate the work here but this has quite a few reasons why it's the wrong approach.

First, there are way too many unrelated style changes that make it impossible to effectively review. All of the purely style changes should be removed.

Second, this adds non-standard methods to the ReadableStream and TransportStream APIs that we just can't add.

Third, the abort controller implementation fails to account for locked streams.

Fourth, this should modify the existing require('stream').pipeline|finished|addAbortSignal() methods in place rather than creating new versions of them, creating new exports off stream/web, or introducing the new stream/web/promises.

Fifth, the way this adds the finish callbacks is problematic and definitely not ideal.

Copy link
Member

@mcollina mcollina left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm -1 for landing this. Support for webstreams should be added to the utilities we already have.

Please always tag @nodejs/streams when discussing streams PRs.

@ghost
Copy link
Author

ghost commented Jul 26, 2021

Thanks for all the feedback and sorry for it being a bit of a mess! Looks like I have some work to do...

Seeing as though it'll be a good few changes, I'm going to close this PR for now.

@ghost ghost closed this Jul 29, 2021
@zwhitchcox
Copy link

zwhitchcox commented Aug 25, 2021

Any updates on this @sam-stanford ? Is this still in progress?

@ghost
Copy link
Author

ghost commented Aug 25, 2021

Any updates on this @sam-stanford ? Is this still in progress?

@zwhitchcox

Yep! Been away since I last took a look at this, so a little behind where is expected, but I'm making progress.

I'm still learning the ropes for node and open source as a whole, so if this is something which needs doing quickly, feel free to take the reigns :)

@zwhitchcox
Copy link

All right, I'll take a stab at it... Great work so far though!

This pull request was closed.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
needs-ci PRs that need a full CI run. stream Issues and PRs related to the stream subsystem. web streams
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

8 participants