Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lib: implement various stream utils for webstreams #39517

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 132 additions & 0 deletions doc/api/webstreams.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ There are three primary types of objects
* `WritableStream` - Represents a destination for streaming data.
* `TransformStream` - Represents an algorithm for transforming streaming data.

Additionally, this module includes the utility functions
`pipeline()`, `finished()`, and `addAbortSignal()`.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm -1 on having this also exported on stream/web. Just have the stream exports support the additional types.


### Example `ReadableStream`

This example creates a simple `ReadableStream` that pushes the current
Expand Down Expand Up @@ -1267,6 +1270,135 @@ added: REPLACEME
-->

* Type: {WritableStream}
### `webstream.finished(stream, callback)`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
### `webstream.finished(stream, callback)`
### `webstream.finished(stream, callback)`


* `stream` {Stream} A ReadableStream, WritableStream, or TransformStream.
* `callback` {Function} A callback function that takes an optional error
argument.
* Returns: {Function} A cleanup function which removes all callbacks with
the given stream.

A function to get notified when a stream is no longer readable, writable
or has experienced an error or a premature close event.

```js
const { finished, ReadableStream } = require('stream/web');

const rs = new ReadableStream();

finished(rs, (err) => {
// This will be called when the stream finishes
if (err) {
console.error('Stream failed.', err);
} else {
console.log('Stream is done reading.');
}
});
```
The `finished` API also provides promise version:
Comment on lines +1297 to +1298
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
```
The `finished` API also provides promise version:
```
The `finished` API also provides promise version:


```js
const { finished, ReadableStream } = require('stream/web/promises');

const rs = new ReadableStream();

async function run() {
await finished(rs);
console.log('Stream is done reading.');
}

run().catch(console.error);
```

Unlike `stream.finished()`, `webstream.finished()` does not
leaves dangling event listeners after `callback` has been invoked.
However, to unify the behaviour of the two functions, a cleanup
function is returned by `webstream.finished()`. This cleanup
function removes **all** registered callbacks from the stream.

```js
const removeCallbacks = finished(rs, callback);
removeCallbacks(); // Callback will no longer call on finish
```

### `webstream.pipeline(source[, ...transforms], destination, callback)`
### `webstream.pipeline(streams, callback)`

* `streams` {Stream[]}
* `source` {Stream}
* `...transforms` {Stream}
* `destination` {Stream}
* `callback` {Function} Called when the pipeline is fully done.
* `err` {Error}
* `val` Resolved value of `Promise` returned by `destination`.

A module method to pipe between streams, forwarding errors and
properly cleaning up and provide a callback when the pipeline is complete.

```js
const { pipeline } = require('stream/web');

// Use the pipeline API to easily pipe a series of streams
// together and get notified when the pipeline is fully done.

pipeline(
new ReadableStream(),
new TransformStream(),
new WritableStream(),
(err) => {
if (err) {
console.error('Pipeline failed.', err);
} else {
console.log('Pipeline succeeded.');
}
}
);
```

The `pipeline` API also provides a promise version.

```js
const {
ReadableStream,
WritableStream,
TransformStream,
} = require('stream/web');
const { pipeline } = require('stream/web/promises');

async function run() {
await pipeline(
new ReadableStream(),
new TransformStream(),
new WritableStream(),
);
console.log('Pipeline succeeded.');
}

run().catch(console.error);
```

### `webstream.addAbortSignal(signal, stream)`
* `signal` {AbortSignal} A signal representing possible cancellation
* `stream` {Stream} a stream to attach a signal to

Attaches an AbortSignal to a readable or writeable stream. This lets code
control stream destruction using an `AbortController`.

Calling `abort` on the `AbortController` corresponding to the passed
`AbortSignal` will abort the stream using the most appropriate functionality
for the stream.

```js
const { ReadableStream, addAbortSignal } = require('stream/web');

const controller = new AbortController();
addAbortSignal(
controller.signal,
new ReadableStream(),
);
// Later, abort the operation closing the stream
controller.abort();
```

[Streams]: stream.md
[WHATWG Streams Standard]: https://streams.spec.whatwg.org/
55 changes: 55 additions & 0 deletions lib/internal/webstreams/add-abort-signal.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
'use strict';

const { validateAbortSignal } = require('internal/validators');
const { isWebstream } = require('internal/webstreams/util');
const { finished } = require('internal/webstreams/finished');
const { AbortError, codes } = require('internal/errors');
const { ERR_INVALID_ARG_TYPE } = codes;

function addAbortSignal(signal, stream) {
validateInputs(signal, stream);

const onAbort = () => {
stream.abort(new AbortError());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

abort doesn't exist on web streams? Do they?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only on WritableStream. On ReadableStream it needs to be cancel().

There is another fundamental problem with this, however... if the ReadableStream has a Reader, or the WritableStream has a Writer, their cancel() and abort() methods cannot be called directly... and there's no public API for knowing if either is locked. If they are, there's no public API for getting the Reader or the Writer and those have to be used for doing the canceling/aborting.

Instead, in order for us to do this, we need to rely on the internal functions and cannot rely on the public API. This also means that these will only work for the core implementations of web streams (userland implementations would not be supported).

image

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jasnell do you think this finished for webstreams is still a worthwhile development given that this is the case?

};

if (signalIsAborted(signal)) {
onAbort();
} else {
addAbortCallbackToSignal(signal, stream, onAbort);
}

return stream;
}

function validateInputs(signal, stream) {
validateAbortSignal(signal, 'signal');
throwErrorIfNotWebstream(stream);
}

function throwErrorIfNotWebstream(stream) {
if (isWebstream(stream)) {
return;
}

throw new ERR_INVALID_ARG_TYPE(
'stream',
'ReadableStream|WritableStream|TransformStream',
stream,
);
}

function signalIsAborted(signal) {
return signal.aborted;
}

function addAbortCallbackToSignal(signal, stream, callback) {
signal.addEventListener('abort', callback);
finished(stream, () => {
signal.removeEventListener('abort', callback);
});
}

module.exports = {
addAbortSignal
};
118 changes: 118 additions & 0 deletions lib/internal/webstreams/finished.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
'use strict';

const { validateFunction } = require('internal/validators');
const { isWebstream } = require('internal/webstreams/util');
const { isTransformStream } = require('internal/webstreams/transformstream');
const {
isWritableStreamClosed,
isWritableStreamErrored,
writableStreamAddFinishedCallback,
writableStreamRemoveAllFinishedCallbacks,
} = require('internal/webstreams/writablestream');
const {
isReadableStream,
isReadableStreamClosed,
isReadableStreamErrored,
readableStreamAddFinishedCallback,
readableStreamRemoveAllFinishedCallbacks,
} = require('internal/webstreams/readablestream');
const { once } = require('internal/util');
const { codes } = require('internal/errors');
const { ERR_INVALID_ARG_TYPE, ERR_MISSING_ARGS } = codes;

function finished(stream, callback) {
validateAndFixInputs(stream, callback);

if (isTransformStream(stream)) {
return finishedForTransformStream(stream, callback);
}

addCallbackToStream(stream, callback);
callCallbackIfStreamIsFinished(stream, callback);

return getCleanupFuncForStream(stream);
}

function validateAndFixInputs(stream, callback) {
throwErrorIfNullStream(stream);
callback = replaceWithNoOpIfNull(callback);
validateInputs(stream, callback);
return stream, once(callback);
}

function throwErrorIfNullStream(stream) {
if (!stream) {
throw new ERR_MISSING_ARGS('stream');
}
}

function replaceWithNoOpIfNull(callback) {
if (!callback) {
callback = () => {};
}
return callback;
}

function validateInputs(stream, callback) {
validateFunction(callback, 'callback');
throwErrorIfNotWebstream(stream);
}

function throwErrorIfNotWebstream(stream) {
if (isWebstream(stream)) {
return;
}

throw new ERR_INVALID_ARG_TYPE(
'stream',
'ReadableStream|WritableStream|TransformStream',
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
'ReadableStream|WritableStream|TransformStream',
['ReadableStream', 'WritableStream', 'TransformStream'],

stream
);
}

function finishedForTransformStream(transformStream, callback) {
return finished(transformStream.readable, callback);
}

function addCallbackToStream(stream, callback) {
if (isReadableStream(stream)) {
readableStreamAddFinishedCallback(stream, callback);
return;
}
writableStreamAddFinishedCallback(stream, callback);
}

function callCallbackIfStreamIsFinished(stream, callback) {
if (isWebstreamFinished(stream)) {
callback();
}
}

function isWebstreamFinished(stream) {
if (isReadableStream(stream)) {
return isReadableStreamFinished(stream);
}
return isWritableStreamFinished(stream);
}

function isReadableStreamFinished(rs) {
return isReadableStreamClosed(rs) || isReadableStreamErrored(rs);
}

function isWritableStreamFinished(ws) {
return isWritableStreamClosed(ws) || isWritableStreamErrored(ws);
}

function getCleanupFuncForStream(stream) {
if (isReadableStream(stream)) {
return () => {
readableStreamRemoveAllFinishedCallbacks(stream);
};
}

return () => {
writableStreamRemoveAllFinishedCallbacks(stream);
};
}

module.exports = { finished };
Loading