From 520712b370c1e183a95b64d181c073143f6c5664 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Mon, 8 May 2023 11:47:29 +0200 Subject: [PATCH 1/6] Update to Node v18.16.0 Signed-off-by: Matteo Collina --- README.md | 4 +- build/files.mjs | 4 +- lib/internal/streams/add-abort-signal.js | 30 +- lib/internal/streams/compose.js | 143 +++++++--- lib/internal/streams/destroy.js | 19 +- lib/internal/streams/duplexify.js | 2 - lib/internal/streams/end-of-stream.js | 81 +++++- lib/internal/streams/operators.js | 25 +- lib/internal/streams/pipeline.js | 113 +++++++- lib/internal/streams/utils.js | 30 +- lib/internal/validators.js | 119 +++++++- lib/ours/primordials.js | 1 + lib/stream/promises.js | 11 +- test/common/fixtures.mjs | 18 +- test/common/index.js | 9 +- test/common/index.mjs | 17 +- test/fixtures/tz-version.txt | 1 + test/parallel/test-stream-asIndexedPairs.mjs | 100 +++---- test/parallel/test-stream-compose-operator.js | 133 +++++++++ test/parallel/test-stream-compose.js | 40 ++- test/parallel/test-stream-duplex-from.js | 32 ++- test/parallel/test-stream-finished.js | 17 ++ ...-stream-iterator-helpers-test262-tests.mjs | 159 +++++------ ...t-stream-pipeline-queued-end-in-destroy.js | 2 +- .../test-stream-pipeline-with-empty-string.js | 4 +- test/parallel/test-stream-promises.js | 67 +++++ test/parallel/test-stream-readable-unshift.js | 5 +- test/parallel/test-stream-some-find-every.mjs | 258 ++++++++---------- .../test-stream-writable-samecb-singletick.js | 2 +- test/parallel/test-stream2-transform.js | 21 ++ .../test-stream3-pipeline-async-iterator.js | 38 +++ 31 files changed, 1079 insertions(+), 426 deletions(-) create mode 100644 test/fixtures/tz-version.txt create mode 100644 test/parallel/test-stream-compose-operator.js create mode 100644 test/parallel/test-stream3-pipeline-async-iterator.js diff --git a/README.md b/README.md index 8f76a7729..b82a6f125 100644 --- a/README.md +++ b/README.md @@ -11,9 +11,9 @@ npm install readable-stream ``` -This package is a mirror of the streams implementations in Node.js 18.9.0. +This package is a mirror of the streams implementations in Node.js 18.16.0. -Full documentation may be found on the [Node.js website](https://nodejs.org/dist/v18.9.0/docs/api/stream.html). +Full documentation may be found on the [Node.js website](https://nodejs.org/dist/v18.16.0/docs/api/stream.html). If you want to guarantee a stable streams base, regardless of what version of Node you, or the users of your libraries are using, use **readable-stream** _only_ and avoid the _"stream"_ module in Node-core, for background see [this blogpost](http://r.va.gg/2014/06/why-i-dont-use-nodes-core-stream-module.html). diff --git a/build/files.mjs b/build/files.mjs index f7d12c9d0..64fd2841d 100644 --- a/build/files.mjs +++ b/build/files.mjs @@ -25,7 +25,9 @@ export const skippedSources = [ 'test/parallel/test-stream-readable-async-iterators.js', 'test/parallel/test-stream-wrap-drain.js', 'test/parallel/test-stream-wrap-encoding.js', - 'test/parallel/test-stream-wrap.js' + 'test/parallel/test-stream-wrap.js', + 'test/parallel/test-stream-toWeb-allows-server-response.js', + 'test/parallel/test-readable-from-web-enqueue-then-close.js' ] export const aliases = {} diff --git a/lib/internal/streams/add-abort-signal.js b/lib/internal/streams/add-abort-signal.js index c6ba8b9c2..3a26a1d3e 100644 --- a/lib/internal/streams/add-abort-signal.js +++ b/lib/internal/streams/add-abort-signal.js @@ -1,6 +1,7 @@ 'use strict' const { AbortError, codes } = require('../../ours/errors') +const { isNodeStream, isWebStream, kControllerErrorFunction } = require('./utils') const eos = require('./end-of-stream') const { ERR_INVALID_ARG_TYPE } = codes @@ -12,13 +13,10 @@ const validateAbortSignal = (signal, name) => { throw new ERR_INVALID_ARG_TYPE(name, 'AbortSignal', signal) } } -function isNodeStream(obj) { - return !!(obj && typeof obj.pipe === 'function') -} module.exports.addAbortSignal = function addAbortSignal(signal, stream) { validateAbortSignal(signal, 'signal') - if (!isNodeStream(stream)) { - throw new ERR_INVALID_ARG_TYPE('stream', 'stream.Stream', stream) + if (!isNodeStream(stream) && !isWebStream(stream)) { + throw new ERR_INVALID_ARG_TYPE('stream', ['ReadableStream', 'WritableStream', 'Stream'], stream) } return module.exports.addAbortSignalNoValidate(signal, stream) } @@ -26,13 +24,21 @@ module.exports.addAbortSignalNoValidate = function (signal, stream) { if (typeof signal !== 'object' || !('aborted' in signal)) { return stream } - const onAbort = () => { - stream.destroy( - new AbortError(undefined, { - cause: signal.reason - }) - ) - } + const onAbort = isNodeStream(stream) + ? () => { + stream.destroy( + new AbortError(undefined, { + cause: signal.reason + }) + ) + } + : () => { + stream[kControllerErrorFunction]( + new AbortError(undefined, { + cause: signal.reason + }) + ) + } if (signal.aborted) { onAbort() } else { diff --git a/lib/internal/streams/compose.js b/lib/internal/streams/compose.js index 4a00aead8..f565c12ef 100644 --- a/lib/internal/streams/compose.js +++ b/lib/internal/streams/compose.js @@ -3,11 +3,20 @@ const { pipeline } = require('./pipeline') const Duplex = require('./duplex') const { destroyer } = require('./destroy') -const { isNodeStream, isReadable, isWritable } = require('./utils') +const { + isNodeStream, + isReadable, + isWritable, + isWebStream, + isTransformStream, + isWritableStream, + isReadableStream +} = require('./utils') const { AbortError, codes: { ERR_INVALID_ARG_VALUE, ERR_MISSING_ARGS } } = require('../../ours/errors') +const eos = require('./end-of-stream') module.exports = function compose(...streams) { if (streams.length === 0) { throw new ERR_MISSING_ARGS('streams') @@ -24,14 +33,17 @@ module.exports = function compose(...streams) { streams[idx] = Duplex.from(streams[idx]) } for (let n = 0; n < streams.length; ++n) { - if (!isNodeStream(streams[n])) { + if (!isNodeStream(streams[n]) && !isWebStream(streams[n])) { // TODO(ronag): Add checks for non streams. continue } - if (n < streams.length - 1 && !isReadable(streams[n])) { + if ( + n < streams.length - 1 && + !(isReadable(streams[n]) || isReadableStream(streams[n]) || isTransformStream(streams[n])) + ) { throw new ERR_INVALID_ARG_VALUE(`streams[${n}]`, orgStreams[n], 'must be readable') } - if (n > 0 && !isWritable(streams[n])) { + if (n > 0 && !(isWritable(streams[n]) || isWritableStream(streams[n]) || isTransformStream(streams[n]))) { throw new ERR_INVALID_ARG_VALUE(`streams[${n}]`, orgStreams[n], 'must be writable') } } @@ -53,8 +65,8 @@ module.exports = function compose(...streams) { } const head = streams[0] const tail = pipeline(streams, onfinished) - const writable = !!isWritable(head) - const readable = !!isReadable(tail) + const writable = !!(isWritable(head) || isWritableStream(head) || isTransformStream(head)) + const readable = !!(isReadable(tail) || isReadableStream(tail) || isTransformStream(tail)) // TODO(ronag): Avoid double buffering. // Implement Writable/Readable/Duplex traits. @@ -67,25 +79,49 @@ module.exports = function compose(...streams) { readable }) if (writable) { - d._write = function (chunk, encoding, callback) { - if (head.write(chunk, encoding)) { - callback() - } else { - ondrain = callback + if (isNodeStream(head)) { + d._write = function (chunk, encoding, callback) { + if (head.write(chunk, encoding)) { + callback() + } else { + ondrain = callback + } } - } - d._final = function (callback) { - head.end() - onfinish = callback - } - head.on('drain', function () { - if (ondrain) { - const cb = ondrain - ondrain = null - cb() + d._final = function (callback) { + head.end() + onfinish = callback } - }) - tail.on('finish', function () { + head.on('drain', function () { + if (ondrain) { + const cb = ondrain + ondrain = null + cb() + } + }) + } else if (isWebStream(head)) { + const writable = isTransformStream(head) ? head.writable : head + const writer = writable.getWriter() + d._write = async function (chunk, encoding, callback) { + try { + await writer.ready + writer.write(chunk).catch(() => {}) + callback() + } catch (err) { + callback(err) + } + } + d._final = async function (callback) { + try { + await writer.ready + writer.close().catch(() => {}) + onfinish = callback + } catch (err) { + callback(err) + } + } + } + const toRead = isTransformStream(tail) ? tail.readable : tail + eos(toRead, () => { if (onfinish) { const cb = onfinish onfinish = null @@ -94,25 +130,46 @@ module.exports = function compose(...streams) { }) } if (readable) { - tail.on('readable', function () { - if (onreadable) { - const cb = onreadable - onreadable = null - cb() - } - }) - tail.on('end', function () { - d.push(null) - }) - d._read = function () { - while (true) { - const buf = tail.read() - if (buf === null) { - onreadable = d._read - return + if (isNodeStream(tail)) { + tail.on('readable', function () { + if (onreadable) { + const cb = onreadable + onreadable = null + cb() + } + }) + tail.on('end', function () { + d.push(null) + }) + d._read = function () { + while (true) { + const buf = tail.read() + if (buf === null) { + onreadable = d._read + return + } + if (!d.push(buf)) { + return + } } - if (!d.push(buf)) { - return + } + } else if (isWebStream(tail)) { + const readable = isTransformStream(tail) ? tail.readable : tail + const reader = readable.getReader() + d._read = async function () { + while (true) { + try { + const { value, done } = await reader.read() + if (!d.push(value)) { + return + } + if (done) { + d.push(null) + return + } + } catch { + return + } } } } @@ -128,7 +185,9 @@ module.exports = function compose(...streams) { callback(err) } else { onclose = callback - destroyer(tail, err) + if (isNodeStream(tail)) { + destroyer(tail, err) + } } } return d diff --git a/lib/internal/streams/destroy.js b/lib/internal/streams/destroy.js index 768f2d79d..db76c29f9 100644 --- a/lib/internal/streams/destroy.js +++ b/lib/internal/streams/destroy.js @@ -36,7 +36,7 @@ function destroy(err, cb) { const w = this._writableState // With duplex streams we use the writable side for state. const s = w || r - if ((w && w.destroyed) || (r && r.destroyed)) { + if ((w !== null && w !== undefined && w.destroyed) || (r !== null && r !== undefined && r.destroyed)) { if (typeof cb === 'function') { cb() } @@ -107,14 +107,14 @@ function emitCloseNT(self) { if (r) { r.closeEmitted = true } - if ((w && w.emitClose) || (r && r.emitClose)) { + if ((w !== null && w !== undefined && w.emitClose) || (r !== null && r !== undefined && r.emitClose)) { self.emit('close') } } function emitErrorNT(self, err) { const r = self._readableState const w = self._writableState - if ((w && w.errorEmitted) || (r && r.errorEmitted)) { + if ((w !== null && w !== undefined && w.errorEmitted) || (r !== null && r !== undefined && r.errorEmitted)) { return } if (w) { @@ -162,10 +162,11 @@ function errorOrDestroy(stream, err, sync) { const r = stream._readableState const w = stream._writableState - if ((w && w.destroyed) || (r && r.destroyed)) { + if ((w !== null && w !== undefined && w.destroyed) || (r !== null && r !== undefined && r.destroyed)) { return this } - if ((r && r.autoDestroy) || (w && w.autoDestroy)) stream.destroy(err) + if ((r !== null && r !== undefined && r.autoDestroy) || (w !== null && w !== undefined && w.autoDestroy)) + stream.destroy(err) else if (err) { // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 err.stack // eslint-disable-line no-unused-expressions @@ -228,16 +229,18 @@ function constructNT(stream) { } } try { - stream._construct(onConstruct) + stream._construct((err) => { + process.nextTick(onConstruct, err) + }) } catch (err) { - onConstruct(err) + process.nextTick(onConstruct, err) } } function emitConstructNT(stream) { stream.emit(kConstruct) } function isRequest(stream) { - return stream && stream.setHeader && typeof stream.abort === 'function' + return (stream === null || stream === undefined ? undefined : stream.setHeader) && typeof stream.abort === 'function' } function emitCloseLegacy(stream) { stream.emit('close') diff --git a/lib/internal/streams/duplexify.js b/lib/internal/streams/duplexify.js index 43300ddc8..599fb47ab 100644 --- a/lib/internal/streams/duplexify.js +++ b/lib/internal/streams/duplexify.js @@ -282,8 +282,6 @@ function _duplexify(pair) { cb(err) } else if (err) { d.destroy(err) - } else if (!readable && !writable) { - d.destroy() } } diff --git a/lib/internal/streams/end-of-stream.js b/lib/internal/streams/end-of-stream.js index 57dbaa48a..043c9c4bd 100644 --- a/lib/internal/streams/end-of-stream.js +++ b/lib/internal/streams/end-of-stream.js @@ -10,20 +10,23 @@ const process = require('process/') const { AbortError, codes } = require('../../ours/errors') const { ERR_INVALID_ARG_TYPE, ERR_STREAM_PREMATURE_CLOSE } = codes const { kEmptyObject, once } = require('../../ours/util') -const { validateAbortSignal, validateFunction, validateObject } = require('../validators') -const { Promise } = require('../../ours/primordials') +const { validateAbortSignal, validateFunction, validateObject, validateBoolean } = require('../validators') +const { Promise, PromisePrototypeThen } = require('../../ours/primordials') const { isClosed, isReadable, isReadableNodeStream, + isReadableStream, isReadableFinished, isReadableErrored, isWritable, isWritableNodeStream, + isWritableStream, isWritableFinished, isWritableErrored, isNodeStream, - willEmitClose: _willEmitClose + willEmitClose: _willEmitClose, + kIsClosedPromise } = require('./utils') function isRequest(stream) { return stream.setHeader && typeof stream.abort === 'function' @@ -42,6 +45,12 @@ function eos(stream, options, callback) { validateFunction(callback, 'callback') validateAbortSignal(options.signal, 'options.signal') callback = once(callback) + if (isReadableStream(stream) || isWritableStream(stream)) { + return eosWeb(stream, options, callback) + } + if (!isNodeStream(stream)) { + throw new ERR_INVALID_ARG_TYPE('stream', ['ReadableStream', 'WritableStream', 'Stream'], stream) + } const readable = (_options$readable = options.readable) !== null && _options$readable !== undefined ? _options$readable @@ -50,10 +59,6 @@ function eos(stream, options, callback) { (_options$writable = options.writable) !== null && _options$writable !== undefined ? _options$writable : isWritableNodeStream(stream) - if (!isNodeStream(stream)) { - // TODO: Webstreams. - throw new ERR_INVALID_ARG_TYPE('stream', 'Stream', stream) - } const wState = stream._writableState const rState = stream._readableState const onlegacyfinish = () => { @@ -117,6 +122,14 @@ function eos(stream, options, callback) { } callback.call(stream) } + const onclosed = () => { + closed = true + const errored = isWritableErrored(stream) || isReadableErrored(stream) + if (errored && typeof errored !== 'boolean') { + return callback.call(stream, errored) + } + callback.call(stream) + } const onrequest = () => { stream.req.on('finish', onfinish) } @@ -153,22 +166,22 @@ function eos(stream, options, callback) { (rState !== null && rState !== undefined && rState.errorEmitted) ) { if (!willEmitClose) { - process.nextTick(onclose) + process.nextTick(onclosed) } } else if ( !readable && (!willEmitClose || isReadable(stream)) && (writableFinished || isWritable(stream) === false) ) { - process.nextTick(onclose) + process.nextTick(onclosed) } else if ( !writable && (!willEmitClose || isWritable(stream)) && (readableFinished || isReadable(stream) === false) ) { - process.nextTick(onclose) + process.nextTick(onclosed) } else if (rState && stream.req && stream.aborted) { - process.nextTick(onclose) + process.nextTick(onclosed) } const cleanup = () => { callback = nop @@ -209,9 +222,53 @@ function eos(stream, options, callback) { } return cleanup } +function eosWeb(stream, options, callback) { + let isAborted = false + let abort = nop + if (options.signal) { + abort = () => { + isAborted = true + callback.call( + stream, + new AbortError(undefined, { + cause: options.signal.reason + }) + ) + } + if (options.signal.aborted) { + process.nextTick(abort) + } else { + const originalCallback = callback + callback = once((...args) => { + options.signal.removeEventListener('abort', abort) + originalCallback.apply(stream, args) + }) + options.signal.addEventListener('abort', abort) + } + } + const resolverFn = (...args) => { + if (!isAborted) { + process.nextTick(() => callback.apply(stream, args)) + } + } + PromisePrototypeThen(stream[kIsClosedPromise].promise, resolverFn, resolverFn) + return nop +} function finished(stream, opts) { + var _opts + let autoCleanup = false + if (opts === null) { + opts = kEmptyObject + } + if ((_opts = opts) !== null && _opts !== undefined && _opts.cleanup) { + validateBoolean(opts.cleanup, 'cleanup') + autoCleanup = opts.cleanup + } return new Promise((resolve, reject) => { - eos(stream, opts, (err) => { + const cleanup = eos(stream, opts, (err) => { + if (autoCleanup) { + cleanup() + } if (err) { reject(err) } else { diff --git a/lib/internal/streams/operators.js b/lib/internal/streams/operators.js index 323a74a17..869cacb39 100644 --- a/lib/internal/streams/operators.js +++ b/lib/internal/streams/operators.js @@ -2,12 +2,15 @@ const AbortController = globalThis.AbortController || require('abort-controller').AbortController const { - codes: { ERR_INVALID_ARG_TYPE, ERR_MISSING_ARGS, ERR_OUT_OF_RANGE }, + codes: { ERR_INVALID_ARG_VALUE, ERR_INVALID_ARG_TYPE, ERR_MISSING_ARGS, ERR_OUT_OF_RANGE }, AbortError } = require('../../ours/errors') const { validateAbortSignal, validateInteger, validateObject } = require('../validators') const kWeakHandler = require('../../ours/primordials').Symbol('kWeak') const { finished } = require('./end-of-stream') +const staticCompose = require('./compose') +const { addAbortSignalNoValidate } = require('./add-abort-signal') +const { isWritable, isNodeStream } = require('./utils') const { ArrayPrototypePush, MathFloor, @@ -20,6 +23,23 @@ const { } = require('../../ours/primordials') const kEmpty = Symbol('kEmpty') const kEof = Symbol('kEof') +function compose(stream, options) { + if (options != null) { + validateObject(options, 'options') + } + if ((options === null || options === undefined ? undefined : options.signal) != null) { + validateAbortSignal(options.signal, 'options.signal') + } + if (isNodeStream(stream) && !isWritable(stream)) { + throw new ERR_INVALID_ARG_VALUE('stream', stream, 'must be writable') + } + const composedStream = staticCompose(this, stream) + if (options !== null && options !== undefined && options.signal) { + // Not validating as we already validated before + addAbortSignalNoValidate(options.signal, composedStream) + } + return composedStream +} function map(fn, options) { if (typeof fn !== 'function') { throw new ERR_INVALID_ARG_TYPE('fn', ['Function', 'AsyncFunction'], fn) @@ -424,7 +444,8 @@ module.exports.streamReturningOperators = { filter, flatMap, map, - take + take, + compose } module.exports.promiseReturningOperators = { every, diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 016e96ee6..8393ba514 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -24,7 +24,16 @@ const { AbortError } = require('../../ours/errors') const { validateFunction, validateAbortSignal } = require('../validators') -const { isIterable, isReadable, isReadableNodeStream, isNodeStream } = require('./utils') +const { + isIterable, + isReadable, + isReadableNodeStream, + isNodeStream, + isTransformStream, + isWebStream, + isReadableStream, + isReadableEnded +} = require('./utils') const AbortController = globalThis.AbortController || require('abort-controller').AbortController let PassThrough let Readable @@ -74,7 +83,7 @@ async function* fromReadable(val) { } yield* Readable.prototype[SymbolAsyncIterator].call(val) } -async function pump(iterable, writable, finish, { end }) { +async function pumpToNode(iterable, writable, finish, { end }) { let error let onresolve = null const resume = (err) => { @@ -130,6 +139,31 @@ async function pump(iterable, writable, finish, { end }) { writable.off('drain', resume) } } +async function pumpToWeb(readable, writable, finish, { end }) { + if (isTransformStream(writable)) { + writable = writable.writable + } + // https://streams.spec.whatwg.org/#example-manual-write-with-backpressure + const writer = writable.getWriter() + try { + for await (const chunk of readable) { + await writer.ready + writer.write(chunk).catch(() => {}) + } + await writer.ready + if (end) { + await writer.close() + } + finish() + } catch (err) { + try { + await writer.abort(err) + finish(err) + } catch (err) { + finish(err) + } + } +} function pipeline(...streams) { return pipelineImpl(streams, once(popCallback(streams))) } @@ -215,13 +249,18 @@ function pipelineImpl(streams, callback, opts) { if (!isIterable(ret)) { throw new ERR_INVALID_RETURN_VALUE('Iterable, AsyncIterable or Stream', 'source', ret) } - } else if (isIterable(stream) || isReadableNodeStream(stream)) { + } else if (isIterable(stream) || isReadableNodeStream(stream) || isTransformStream(stream)) { ret = stream } else { ret = Duplex.from(stream) } } else if (typeof stream === 'function') { - ret = makeAsyncIterable(ret) + if (isTransformStream(ret)) { + var _ret + ret = makeAsyncIterable((_ret = ret) === null || _ret === undefined ? undefined : _ret.readable) + } else { + ret = makeAsyncIterable(ret) + } ret = stream(ret, { signal }) @@ -230,7 +269,7 @@ function pipelineImpl(streams, callback, opts) { throw new ERR_INVALID_RETURN_VALUE('AsyncIterable', `transform[${i - 1}]`, ret) } } else { - var _ret + var _ret2 if (!PassThrough) { PassThrough = require('./passthrough') } @@ -246,7 +285,7 @@ function pipelineImpl(streams, callback, opts) { // Handle Promises/A+ spec, `then` could be a getter that throws on // second use. - const then = (_ret = ret) === null || _ret === undefined ? undefined : _ret.then + const then = (_ret2 = ret) === null || _ret2 === undefined ? undefined : _ret2.then if (typeof then === 'function') { finishCount++ then.call( @@ -268,7 +307,13 @@ function pipelineImpl(streams, callback, opts) { ) } else if (isIterable(ret, true)) { finishCount++ - pump(ret, pt, finish, { + pumpToNode(ret, pt, finish, { + end + }) + } else if (isReadableStream(ret) || isTransformStream(ret)) { + const toRead = ret.readable || ret + finishCount++ + pumpToNode(toRead, pt, finish, { end }) } else { @@ -290,13 +335,47 @@ function pipelineImpl(streams, callback, opts) { if (isReadable(stream) && isLastStream) { lastStreamCleanup.push(cleanup) } + } else if (isTransformStream(ret) || isReadableStream(ret)) { + const toRead = ret.readable || ret + finishCount++ + pumpToNode(toRead, stream, finish, { + end + }) } else if (isIterable(ret)) { finishCount++ - pump(ret, stream, finish, { + pumpToNode(ret, stream, finish, { end }) } else { - throw new ERR_INVALID_ARG_TYPE('val', ['Readable', 'Iterable', 'AsyncIterable'], ret) + throw new ERR_INVALID_ARG_TYPE( + 'val', + ['Readable', 'Iterable', 'AsyncIterable', 'ReadableStream', 'TransformStream'], + ret + ) + } + ret = stream + } else if (isWebStream(stream)) { + if (isReadableNodeStream(ret)) { + finishCount++ + pumpToWeb(makeAsyncIterable(ret), stream, finish, { + end + }) + } else if (isReadableStream(ret) || isIterable(ret)) { + finishCount++ + pumpToWeb(ret, stream, finish, { + end + }) + } else if (isTransformStream(ret)) { + finishCount++ + pumpToWeb(ret.readable, stream, finish, { + end + }) + } else { + throw new ERR_INVALID_ARG_TYPE( + 'val', + ['Readable', 'Iterable', 'AsyncIterable', 'ReadableStream', 'TransformStream'], + ret + ) } ret = stream } else { @@ -320,16 +399,24 @@ function pipe(src, dst, finish, { end }) { } }) src.pipe(dst, { - end - }) + end: false + }) // If end is true we already will have a listener to end dst. + if (end) { // Compat. Before node v10.12.0 stdio used to throw an error so // pipe() did/does not end() stdio destinations. // Now they allow it but "secretly" don't close the underlying fd. - src.once('end', () => { + + function endFn() { ended = true dst.end() - }) + } + if (isReadableEnded(src)) { + // End the destination if the source has already ended. + process.nextTick(endFn) + } else { + src.once('end', endFn) + } } else { finish() } diff --git a/lib/internal/streams/utils.js b/lib/internal/streams/utils.js index f87e9fe68..e589ad96c 100644 --- a/lib/internal/streams/utils.js +++ b/lib/internal/streams/utils.js @@ -1,10 +1,12 @@ 'use strict' -const { Symbol, SymbolAsyncIterator, SymbolIterator } = require('../../ours/primordials') +const { Symbol, SymbolAsyncIterator, SymbolIterator, SymbolFor } = require('../../ours/primordials') const kDestroyed = Symbol('kDestroyed') const kIsErrored = Symbol('kIsErrored') const kIsReadable = Symbol('kIsReadable') const kIsDisturbed = Symbol('kIsDisturbed') +const kIsClosedPromise = SymbolFor('nodejs.webstream.isClosedPromise') +const kControllerErrorFunction = SymbolFor('nodejs.webstream.controllerErrorFunction') function isReadableNodeStream(obj, strict = false) { var _obj$_readableState return !!( @@ -56,6 +58,24 @@ function isNodeStream(obj) { (typeof obj.pipe === 'function' && typeof obj.on === 'function')) ) } +function isReadableStream(obj) { + return !!( + obj && + !isNodeStream(obj) && + typeof obj.pipeThrough === 'function' && + typeof obj.getReader === 'function' && + typeof obj.cancel === 'function' + ) +} +function isWritableStream(obj) { + return !!(obj && !isNodeStream(obj) && typeof obj.getWriter === 'function' && typeof obj.abort === 'function') +} +function isTransformStream(obj) { + return !!(obj && !isNodeStream(obj) && typeof obj.readable === 'object' && typeof obj.writable === 'object') +} +function isWebStream(obj) { + return isReadableStream(obj) || isWritableStream(obj) || isTransformStream(obj) +} function isIterable(obj, isAsync) { if (obj == null) return false if (isAsync === true) return typeof obj[SymbolAsyncIterator] === 'function' @@ -274,22 +294,28 @@ module.exports = { kIsErrored, isReadable, kIsReadable, + kIsClosedPromise, + kControllerErrorFunction, isClosed, isDestroyed, isDuplexNodeStream, isFinished, isIterable, isReadableNodeStream, + isReadableStream, isReadableEnded, isReadableFinished, isReadableErrored, isNodeStream, + isWebStream, isWritable, isWritableNodeStream, + isWritableStream, isWritableEnded, isWritableFinished, isWritableErrored, isServerRequest, isServerResponse, - willEmitClose + willEmitClose, + isTransformStream } diff --git a/lib/internal/validators.js b/lib/internal/validators.js index f9e6e5559..85b2e9cd5 100644 --- a/lib/internal/validators.js +++ b/lib/internal/validators.js @@ -1,3 +1,5 @@ +/* eslint jsdoc/require-jsdoc: "error" */ + 'use strict' const { @@ -199,6 +201,13 @@ const validateOneOf = hideStackFrames((value, name, oneOf) => { function validateBoolean(value, name) { if (typeof value !== 'boolean') throw new ERR_INVALID_ARG_TYPE(name, 'boolean', value) } + +/** + * @param {any} options + * @param {string} key + * @param {boolean} defaultValue + * @returns {boolean} + */ function getOwnPropertyValueOrDefault(options, key, defaultValue) { return options == null || !ObjectPrototypeHasOwnProperty(options, key) ? defaultValue : options[key] } @@ -228,6 +237,24 @@ const validateObject = hideStackFrames((value, name, options = null) => { } }) +/** + * @callback validateDictionary - We are using the Web IDL Standard definition + * of "dictionary" here, which means any value + * whose Type is either Undefined, Null, or + * Object (which includes functions). + * @param {*} value + * @param {string} name + * @see https://webidl.spec.whatwg.org/#es-dictionary + * @see https://tc39.es/ecma262/#table-typeof-operator-results + */ + +/** @type {validateDictionary} */ +const validateDictionary = hideStackFrames((value, name) => { + if (value != null && typeof value !== 'object' && typeof value !== 'function') { + throw new ERR_INVALID_ARG_TYPE(name, 'a dictionary', value) + } +}) + /** * @callback validateArray * @param {*} value @@ -247,7 +274,36 @@ const validateArray = hideStackFrames((value, name, minLength = 0) => { } }) -// eslint-disable-next-line jsdoc/require-returns-check +/** + * @callback validateStringArray + * @param {*} value + * @param {string} name + * @returns {asserts value is string[]} + */ + +/** @type {validateStringArray} */ +function validateStringArray(value, name) { + validateArray(value, name) + for (let i = 0; i < value.length; i++) { + validateString(value[i], `${name}[${i}]`) + } +} + +/** + * @callback validateBooleanArray + * @param {*} value + * @param {string} name + * @returns {asserts value is boolean[]} + */ + +/** @type {validateBooleanArray} */ +function validateBooleanArray(value, name) { + validateArray(value, name) + for (let i = 0; i < value.length; i++) { + validateBoolean(value[i], `${name}[${i}]`) + } +} + /** * @param {*} signal * @param {string} [name='signal'] @@ -370,13 +426,71 @@ function validateUnion(value, name, union) { throw new ERR_INVALID_ARG_TYPE(name, `('${ArrayPrototypeJoin(union, '|')}')`, value) } } + +/* + The rules for the Link header field are described here: + https://www.rfc-editor.org/rfc/rfc8288.html#section-3 + + This regex validates any string surrounded by angle brackets + (not necessarily a valid URI reference) followed by zero or more + link-params separated by semicolons. +*/ +const linkValueRegExp = /^(?:<[^>]*>)(?:\s*;\s*[^;"\s]+(?:=(")?[^;"\s]*\1)?)*$/ + +/** + * @param {any} value + * @param {string} name + */ +function validateLinkHeaderFormat(value, name) { + if (typeof value === 'undefined' || !RegExpPrototypeExec(linkValueRegExp, value)) { + throw new ERR_INVALID_ARG_VALUE( + name, + value, + 'must be an array or string of format "; rel=preload; as=style"' + ) + } +} + +/** + * @param {any} hints + * @return {string} + */ +function validateLinkHeaderValue(hints) { + if (typeof hints === 'string') { + validateLinkHeaderFormat(hints, 'hints') + return hints + } else if (ArrayIsArray(hints)) { + const hintsLength = hints.length + let result = '' + if (hintsLength === 0) { + return result + } + for (let i = 0; i < hintsLength; i++) { + const link = hints[i] + validateLinkHeaderFormat(link, 'hints') + result += link + if (i !== hintsLength - 1) { + result += ', ' + } + } + return result + } + throw new ERR_INVALID_ARG_VALUE( + 'hints', + hints, + 'must be an array or string of format "; rel=preload; as=style"' + ) +} module.exports = { isInt32, isUint32, parseFileMode, validateArray, + validateStringArray, + validateBooleanArray, validateBoolean, validateBuffer, + validateDictionary, validateEncoding, validateFunction, validateInt32, @@ -391,5 +505,6 @@ module.exports = { validateUint32, validateUndefined, validateUnion, - validateAbortSignal + validateAbortSignal, + validateLinkHeaderValue } diff --git a/lib/ours/primordials.js b/lib/ours/primordials.js index 6a98b0168..9464cc7fe 100644 --- a/lib/ours/primordials.js +++ b/lib/ours/primordials.js @@ -90,6 +90,7 @@ module.exports = { return self.trim() }, Symbol, + SymbolFor: Symbol.for, SymbolAsyncIterator: Symbol.asyncIterator, SymbolHasInstance: Symbol.hasInstance, SymbolIterator: Symbol.iterator, diff --git a/lib/stream/promises.js b/lib/stream/promises.js index d44dd8ad0..b85c51f47 100644 --- a/lib/stream/promises.js +++ b/lib/stream/promises.js @@ -1,15 +1,22 @@ 'use strict' const { ArrayPrototypePop, Promise } = require('../ours/primordials') -const { isIterable, isNodeStream } = require('../internal/streams/utils') +const { isIterable, isNodeStream, isWebStream } = require('../internal/streams/utils') const { pipelineImpl: pl } = require('../internal/streams/pipeline') const { finished } = require('../internal/streams/end-of-stream') +require('stream') function pipeline(...streams) { return new Promise((resolve, reject) => { let signal let end const lastArg = streams[streams.length - 1] - if (lastArg && typeof lastArg === 'object' && !isNodeStream(lastArg) && !isIterable(lastArg)) { + if ( + lastArg && + typeof lastArg === 'object' && + !isNodeStream(lastArg) && + !isIterable(lastArg) && + !isWebStream(lastArg) + ) { const options = ArrayPrototypePop(streams) signal = options.signal end = options.end diff --git a/test/common/fixtures.mjs b/test/common/fixtures.mjs index 372fabf88..d6f7f6c09 100644 --- a/test/common/fixtures.mjs +++ b/test/common/fixtures.mjs @@ -1,5 +1,17 @@ -import fixtures from './fixtures.js' +import fixtures from './fixtures.js'; -const { fixturesDir, path, fileURL, readSync, readKey } = fixtures +const { + fixturesDir, + path, + fileURL, + readSync, + readKey, +} = fixtures; -export { fixturesDir, path, fileURL, readSync, readKey } +export { + fixturesDir, + path, + fileURL, + readSync, + readKey, +}; diff --git a/test/common/index.js b/test/common/index.js index 5491b5666..9c949a171 100644 --- a/test/common/index.js +++ b/test/common/index.js @@ -33,7 +33,7 @@ const path = require('path') const { inspect } = require('util') const { isMainThread } = require('worker_threads') const tmpdir = require('./tmpdir') -const bits = ['arm64', 'mips', 'mipsel', 'ppc64', 'riscv64', 's390x', 'x64'].includes(process.arch) ? 64 : 32 +const bits = ['arm64', 'loong64', 'mips', 'mipsel', 'ppc64', 'riscv64', 's390x', 'x64'].includes(process.arch) ? 64 : 32 const hasIntl = !!process.config.variables.v8_enable_i18n_support const { atob, btoa } = require('buffer') @@ -110,6 +110,7 @@ const isFreeBSD = process.platform === 'freebsd' const isOpenBSD = process.platform === 'openbsd' const isLinux = process.platform === 'linux' const isOSX = process.platform === 'darwin' +const isAsan = process.env.ASAN !== undefined const isPi = (() => { try { var _exec @@ -119,10 +120,11 @@ const isPi = (() => { const cpuinfo = fs.readFileSync('/proc/cpuinfo', { encoding: 'utf8' }) - return ( + const ok = ((_exec = /^Hardware\s*:\s*(.*)$/im.exec(cpuinfo)) === null || _exec === undefined ? undefined : _exec[1]) === 'BCM2835' - ) + ;/^/.test('') // Clear RegExp.$_, some tests expect it to be empty. + return ok } catch { return false } @@ -826,6 +828,7 @@ const common = { invalidArgTypeHelper, isAIX, isAlive, + isAsan, isDumbTerminal, isFreeBSD, isLinux, diff --git a/test/common/index.mjs b/test/common/index.mjs index 41a63bce2..e77b1b298 100644 --- a/test/common/index.mjs +++ b/test/common/index.mjs @@ -1,7 +1,7 @@ -import { createRequire } from 'module' +import { createRequire } from 'module'; -const require = createRequire(import.meta.url) -const common = require('./index.js') +const require = createRequire(import.meta.url); +const common = require('./index.js'); const { isMainThread, @@ -49,8 +49,10 @@ const { getBufferSources, getTTYfd, runWithInvalidFD, - spawnPromisified -} = common + spawnPromisified, +} = common; + +const getPort = () => common.PORT; export { isMainThread, @@ -99,5 +101,6 @@ export { getTTYfd, runWithInvalidFD, createRequire, - spawnPromisified -} + spawnPromisified, + getPort, +}; diff --git a/test/fixtures/tz-version.txt b/test/fixtures/tz-version.txt new file mode 100644 index 000000000..b74fa117a --- /dev/null +++ b/test/fixtures/tz-version.txt @@ -0,0 +1 @@ +2022g diff --git a/test/parallel/test-stream-asIndexedPairs.mjs b/test/parallel/test-stream-asIndexedPairs.mjs index 35919114a..a103920ee 100644 --- a/test/parallel/test-stream-asIndexedPairs.mjs +++ b/test/parallel/test-stream-asIndexedPairs.mjs @@ -1,82 +1,64 @@ -import '../common/index.mjs' -import { Readable } from '../../lib/ours/index.js' -import { deepStrictEqual, rejects, throws } from 'assert' -import tap from 'tap' +import '../common/index.mjs'; +import { Readable }from '../../lib/ours/index.js'; +import { deepStrictEqual, rejects, throws } from 'assert'; +import tap from 'tap'; { // asIndexedPairs with a synchronous stream - const pairs = await Readable.from([1, 2, 3]).asIndexedPairs().toArray() - deepStrictEqual(pairs, [ - [0, 1], - [1, 2], - [2, 3] - ]) - const empty = await Readable.from([]).asIndexedPairs().toArray() - deepStrictEqual(empty, []) + const pairs = await Readable.from([1, 2, 3]).asIndexedPairs().toArray(); + deepStrictEqual(pairs, [[0, 1], [1, 2], [2, 3]]); + const empty = await Readable.from([]).asIndexedPairs().toArray(); + deepStrictEqual(empty, []); } { // asIndexedPairs works an asynchronous streams - const asyncFrom = (...args) => Readable.from(...args).map(async (x) => x) - const pairs = await asyncFrom([1, 2, 3]).asIndexedPairs().toArray() - deepStrictEqual(pairs, [ - [0, 1], - [1, 2], - [2, 3] - ]) - const empty = await asyncFrom([]).asIndexedPairs().toArray() - deepStrictEqual(empty, []) + const asyncFrom = (...args) => Readable.from(...args).map(async (x) => x); + const pairs = await asyncFrom([1, 2, 3]).asIndexedPairs().toArray(); + deepStrictEqual(pairs, [[0, 1], [1, 2], [2, 3]]); + const empty = await asyncFrom([]).asIndexedPairs().toArray(); + deepStrictEqual(empty, []); } { // Does not enumerate an infinite stream - const infinite = () => - Readable.from( - (async function* () { - while (true) yield 1 - })() - ) - const pairs = await infinite().asIndexedPairs().take(3).toArray() - deepStrictEqual(pairs, [ - [0, 1], - [1, 1], - [2, 1] - ]) - const empty = await infinite().asIndexedPairs().take(0).toArray() - deepStrictEqual(empty, []) + const infinite = () => Readable.from(async function* () { + while (true) yield 1; + }()); + const pairs = await infinite().asIndexedPairs().take(3).toArray(); + deepStrictEqual(pairs, [[0, 1], [1, 1], [2, 1]]); + const empty = await infinite().asIndexedPairs().take(0).toArray(); + deepStrictEqual(empty, []); } { // AbortSignal - await rejects( - async () => { - const ac = new AbortController() - const { signal } = ac - const p = Readable.from([1, 2, 3]).asIndexedPairs({ signal }).toArray() - ac.abort() - await p - }, - { name: 'AbortError' } - ) + await rejects(async () => { + const ac = new AbortController(); + const { signal } = ac; + const p = Readable.from([1, 2, 3]).asIndexedPairs({ signal }).toArray(); + ac.abort(); + await p; + }, { name: 'AbortError' }); await rejects(async () => { - const signal = AbortSignal.abort() - await Readable.from([1, 2, 3]).asIndexedPairs({ signal }).toArray() - }, /AbortError/) + const signal = AbortSignal.abort(); + await Readable.from([1, 2, 3]).asIndexedPairs({ signal }).toArray(); + }, /AbortError/); } { // Error cases - throws(() => Readable.from([1]).asIndexedPairs(1), /ERR_INVALID_ARG_TYPE/) - throws(() => Readable.from([1]).asIndexedPairs({ signal: true }), /ERR_INVALID_ARG_TYPE/) + throws(() => Readable.from([1]).asIndexedPairs(1), /ERR_INVALID_ARG_TYPE/); + throws(() => Readable.from([1]).asIndexedPairs({ signal: true }), /ERR_INVALID_ARG_TYPE/); } -/* replacement start */ -process.on('beforeExit', (code) => { - if (code === 0) { - tap.pass('test succeeded') - } else { - tap.fail(`test failed - exited code ${code}`) - } -}) -/* replacement end */ + /* replacement start */ + process.on('beforeExit', (code) => { + if(code === 0) { + tap.pass('test succeeded'); + } else { + tap.fail(`test failed - exited code ${code}`); + } + }); + /* replacement end */ diff --git a/test/parallel/test-stream-compose-operator.js b/test/parallel/test-stream-compose-operator.js new file mode 100644 index 000000000..d5def4583 --- /dev/null +++ b/test/parallel/test-stream-compose-operator.js @@ -0,0 +1,133 @@ +'use strict' + +const tap = require('tap') +const silentConsole = { + log() {}, + error() {} +} +const common = require('../common') +const { Readable, Transform } = require('../../lib/ours/index') +const assert = require('assert') +{ + // with async generator + const stream = Readable.from(['a', 'b', 'c', 'd']).compose(async function* (stream) { + let str = '' + for await (const chunk of stream) { + str += chunk + if (str.length === 2) { + yield str + str = '' + } + } + }) + const result = ['ab', 'cd'] + ;(async () => { + for await (const item of stream) { + assert.strictEqual(item, result.shift()) + } + })().then(common.mustCall()) +} +{ + // With Transformer + const stream = Readable.from(['a', 'b', 'c', 'd']).compose( + new Transform({ + objectMode: true, + transform: common.mustCall((chunk, encoding, callback) => { + callback(null, chunk) + }, 4) + }) + ) + const result = ['a', 'b', 'c', 'd'] + ;(async () => { + for await (const item of stream) { + assert.strictEqual(item, result.shift()) + } + })().then(common.mustCall()) +} +{ + // Throwing an error during `compose` (before waiting for data) + const stream = Readable.from([1, 2, 3, 4, 5]).compose(async function* (stream) { + // eslint-disable-line require-yield + + throw new Error('boom') + }) + assert + .rejects(async () => { + for await (const item of stream) { + assert.fail('should not reach here, got ' + item) + } + }, /boom/) + .then(common.mustCall()) +} +{ + // Throwing an error during `compose` (when waiting for data) + const stream = Readable.from([1, 2, 3, 4, 5]).compose(async function* (stream) { + for await (const chunk of stream) { + if (chunk === 3) { + throw new Error('boom') + } + yield chunk + } + }) + assert.rejects(stream.toArray(), /boom/).then(common.mustCall()) +} +{ + // Throwing an error during `compose` (after finishing all readable data) + const stream = Readable.from([1, 2, 3, 4, 5]).compose(async function* (stream) { + // eslint-disable-line require-yield + + // eslint-disable-next-line no-unused-vars,no-empty + for await (const chunk of stream) { + } + throw new Error('boom') + }) + assert.rejects(stream.toArray(), /boom/).then(common.mustCall()) +} +{ + // AbortSignal + const ac = new AbortController() + const stream = Readable.from([1, 2, 3, 4, 5]).compose( + async function* (source) { + // Should not reach here + for await (const chunk of source) { + yield chunk + } + }, + { + signal: ac.signal + } + ) + ac.abort() + assert + .rejects( + async () => { + for await (const item of stream) { + assert.fail('should not reach here, got ' + item) + } + }, + { + name: 'AbortError' + } + ) + .then(common.mustCall()) +} +{ + assert.throws(() => Readable.from(['a']).compose(Readable.from(['b'])), { + code: 'ERR_INVALID_ARG_VALUE' + }) +} +{ + assert.throws(() => Readable.from(['a']).compose(), { + code: 'ERR_INVALID_ARG_TYPE' + }) +} + +/* replacement start */ +process.on('beforeExit', (code) => { + if (code === 0) { + tap.pass('test succeeded') + } else { + tap.fail(`test failed - exited code ${code}`) + } +}) +/* replacement end */ diff --git a/test/parallel/test-stream-compose.js b/test/parallel/test-stream-compose.js index b6b739ac7..85ec5134e 100644 --- a/test/parallel/test-stream-compose.js +++ b/test/parallel/test-stream-compose.js @@ -407,31 +407,29 @@ const assert = require('assert') ) } { - try { - compose() - } catch (err) { - assert.strictEqual(err.code, 'ERR_MISSING_ARGS') - } + assert.throws(() => compose(), { + code: 'ERR_MISSING_ARGS' + }) } { - try { - compose(new Writable(), new PassThrough()) - } catch (err) { - assert.strictEqual(err.code, 'ERR_INVALID_ARG_VALUE') - } + assert.throws(() => compose(new Writable(), new PassThrough()), { + code: 'ERR_INVALID_ARG_VALUE' + }) } { - try { - compose( - new PassThrough(), - new Readable({ - read() {} - }), - new PassThrough() - ) - } catch (err) { - assert.strictEqual(err.code, 'ERR_INVALID_ARG_VALUE') - } + assert.throws( + () => + compose( + new PassThrough(), + new Readable({ + read() {} + }), + new PassThrough() + ), + { + code: 'ERR_INVALID_ARG_VALUE' + } + ) } { let buf = '' diff --git a/test/parallel/test-stream-duplex-from.js b/test/parallel/test-stream-duplex-from.js index b527bd9c0..83ce0e82e 100644 --- a/test/parallel/test-stream-duplex-from.js +++ b/test/parallel/test-stream-duplex-from.js @@ -7,7 +7,7 @@ const silentConsole = { } const common = require('../common') const assert = require('assert') -const { Duplex, Readable, Writable, pipeline } = require('../../lib/ours/index') +const { Duplex, Readable, Writable, pipeline, PassThrough } = require('../../lib/ours/index') const Blob = globalThis.Blob || require('buffer').Blob { const d = Duplex.from({ @@ -178,7 +178,7 @@ const Blob = globalThis.Blob || require('buffer').Blob } assert.strictEqual(ret, 'abcdefghi') }, - common.mustCall(() => {}) + common.mustSucceed() ) } @@ -331,6 +331,34 @@ if (typeof Blob !== 'undefined') { ) duplex.write('test') } +{ + const through = new PassThrough({ + objectMode: true + }) + let res = '' + const d = Readable.from(['foo', 'bar'], { + objectMode: true + }).pipe( + Duplex.from({ + writable: through, + readable: through + }) + ) + d.on('data', (data) => { + d.pause() + setImmediate(() => { + d.resume() + }) + res += data + }) + .on( + 'end', + common.mustCall(() => { + assert.strictEqual(res, 'foobar') + }) + ) + .on('close', common.mustCall()) +} /* replacement start */ process.on('beforeExit', (code) => { diff --git a/test/parallel/test-stream-finished.js b/test/parallel/test-stream-finished.js index 3ee5d328c..26f286af5 100644 --- a/test/parallel/test-stream-finished.js +++ b/test/parallel/test-stream-finished.js @@ -744,6 +744,23 @@ testClosed( .end() }) } +{ + const stream = new Duplex({ + write(chunk, enc, cb) { + setImmediate(cb) + } + }) + stream.end('foo') + finished( + stream, + { + readable: false + }, + common.mustCall((err) => { + assert(!err) + }) + ) +} /* replacement start */ process.on('beforeExit', (code) => { diff --git a/test/parallel/test-stream-iterator-helpers-test262-tests.mjs b/test/parallel/test-stream-iterator-helpers-test262-tests.mjs index 8231f80ce..9f09abeab 100644 --- a/test/parallel/test-stream-iterator-helpers-test262-tests.mjs +++ b/test/parallel/test-stream-iterator-helpers-test262-tests.mjs @@ -1,7 +1,7 @@ -import { mustCall } from '../common/index.mjs' -import { Readable } from '../../lib/ours/index.js' -import assert from 'assert' -import tap from 'tap' +import { mustCall } from '../common/index.mjs'; +import { Readable }from '../../lib/ours/index.js'; +import assert from 'assert'; +import tap from 'tap'; // These tests are manually ported from the draft PR for the test262 test suite // Authored by Rick Waldron in https://github.com/tc39/test262/pull/2818/files @@ -46,131 +46,134 @@ import tap from 'tap' // * Ecma International Standards hereafter means Ecma International Standards // as well as Ecma Technical Reports + // Note all the tests that check AsyncIterator's prototype itself and things // that happen before stream conversion were not ported. { // asIndexedPairs/is-function - assert.strictEqual(typeof Readable.prototype.asIndexedPairs, 'function') + assert.strictEqual(typeof Readable.prototype.asIndexedPairs, 'function'); // asIndexedPairs/indexed-pairs.js - const iterator = Readable.from([0, 1]) - const indexedPairs = iterator.asIndexedPairs() + const iterator = Readable.from([0, 1]); + const indexedPairs = iterator.asIndexedPairs(); for await (const [i, v] of indexedPairs) { - assert.strictEqual(i, v) + assert.strictEqual(i, v); } // asIndexedPairs/length.js - assert.strictEqual(Readable.prototype.asIndexedPairs.length, 0) + assert.strictEqual(Readable.prototype.asIndexedPairs.length, 0); // asIndexedPairs/name.js - assert.strictEqual(Readable.prototype.asIndexedPairs.name, 'asIndexedPairs') - const descriptor = Object.getOwnPropertyDescriptor(Readable.prototype, 'asIndexedPairs') - assert.strictEqual(descriptor.enumerable, false) - assert.strictEqual(descriptor.configurable, true) - assert.strictEqual(descriptor.writable, true) + assert.strictEqual(Readable.prototype.asIndexedPairs.name, 'asIndexedPairs'); + const descriptor = Object.getOwnPropertyDescriptor( + Readable.prototype, + 'asIndexedPairs' + ); + assert.strictEqual(descriptor.enumerable, false); + assert.strictEqual(descriptor.configurable, true); + assert.strictEqual(descriptor.writable, true); } { // drop/length - assert.strictEqual(Readable.prototype.drop.length, 1) - const descriptor = Object.getOwnPropertyDescriptor(Readable.prototype, 'drop') - assert.strictEqual(descriptor.enumerable, false) - assert.strictEqual(descriptor.configurable, true) - assert.strictEqual(descriptor.writable, true) + assert.strictEqual(Readable.prototype.drop.length, 1); + const descriptor = Object.getOwnPropertyDescriptor( + Readable.prototype, + 'drop' + ); + assert.strictEqual(descriptor.enumerable, false); + assert.strictEqual(descriptor.configurable, true); + assert.strictEqual(descriptor.writable, true); // drop/limit-equals-total - const iterator = Readable.from([1, 2]).drop(2) - const result = await iterator[Symbol.asyncIterator]().next() - assert.deepStrictEqual(result, { done: true, value: undefined }) + const iterator = Readable.from([1, 2]).drop(2); + const result = await iterator[Symbol.asyncIterator]().next(); + assert.deepStrictEqual(result, { done: true, value: undefined }); // drop/limit-greater-than-total.js - const iterator2 = Readable.from([1, 2]).drop(3) - const result2 = await iterator2[Symbol.asyncIterator]().next() - assert.deepStrictEqual(result2, { done: true, value: undefined }) + const iterator2 = Readable.from([1, 2]).drop(3); + const result2 = await iterator2[Symbol.asyncIterator]().next(); + assert.deepStrictEqual(result2, { done: true, value: undefined }); // drop/limit-less-than-total.js - const iterator3 = Readable.from([1, 2]).drop(1) - const result3 = await iterator3[Symbol.asyncIterator]().next() - assert.deepStrictEqual(result3, { done: false, value: 2 }) + const iterator3 = Readable.from([1, 2]).drop(1); + const result3 = await iterator3[Symbol.asyncIterator]().next(); + assert.deepStrictEqual(result3, { done: false, value: 2 }); // drop/limit-rangeerror - assert.throws(() => Readable.from([1]).drop(-1), RangeError) + assert.throws(() => Readable.from([1]).drop(-1), RangeError); assert.throws(() => { Readable.from([1]).drop({ valueOf() { - throw new Error('boom') + throw new Error('boom'); } - }) - }, /boom/) + }); + }, /boom/); // drop/limit-tointeger - const two = await Readable.from([1, 2]) - .drop({ valueOf: () => 1 }) - .toArray() - assert.deepStrictEqual(two, [2]) + const two = await Readable.from([1, 2]).drop({ valueOf: () => 1 }).toArray(); + assert.deepStrictEqual(two, [2]); // drop/name - assert.strictEqual(Readable.prototype.drop.name, 'drop') + assert.strictEqual(Readable.prototype.drop.name, 'drop'); // drop/non-constructible - assert.throws(() => new Readable.prototype.drop(1), TypeError) + assert.throws(() => new Readable.prototype.drop(1), TypeError); // drop/proto - const proto = Object.getPrototypeOf(Readable.prototype.drop) - assert.strictEqual(proto, Function.prototype) + const proto = Object.getPrototypeOf(Readable.prototype.drop); + assert.strictEqual(proto, Function.prototype); } { // every/abrupt-iterator-close - const stream = Readable.from([1, 2, 3]) - const e = new Error() - await assert.rejects( - stream.every( - mustCall(() => { - throw e - }, 1) - ), - e - ) + const stream = Readable.from([1, 2, 3]); + const e = new Error(); + await assert.rejects(stream.every(mustCall(() => { + throw e; + }, 1)), e); } { // every/callable-fn - await assert.rejects(Readable.from([1, 2]).every({}), TypeError) + await assert.rejects(Readable.from([1, 2]).every({}), TypeError); } { // every/callable - Readable.prototype.every.call(Readable.from([]), () => {}) + Readable.prototype.every.call(Readable.from([]), () => {}); // eslint-disable-next-line array-callback-return - Readable.from([]).every(() => {}) + Readable.from([]).every(() => {}); assert.throws(() => { - const r = Readable.from([]) - new r.every(() => {}) - }, TypeError) + const r = Readable.from([]); + new r.every(() => {}); + }, TypeError); } { // every/false - const iterator = Readable.from([1, 2, 3]) - const result = await iterator.every((v) => v === 1) - assert.strictEqual(result, false) + const iterator = Readable.from([1, 2, 3]); + const result = await iterator.every((v) => v === 1); + assert.strictEqual(result, false); } { // every/every - const iterator = Readable.from([1, 2, 3]) - const result = await iterator.every((v) => true) - assert.strictEqual(result, true) + const iterator = Readable.from([1, 2, 3]); + const result = await iterator.every((v) => true); + assert.strictEqual(result, true); } { // every/is-function - assert.strictEqual(typeof Readable.prototype.every, 'function') + assert.strictEqual(typeof Readable.prototype.every, 'function'); } { // every/length - assert.strictEqual(Readable.prototype.every.length, 1) + assert.strictEqual(Readable.prototype.every.length, 1); // every/name - assert.strictEqual(Readable.prototype.every.name, 'every') + assert.strictEqual(Readable.prototype.every.name, 'every'); // every/propdesc - const descriptor = Object.getOwnPropertyDescriptor(Readable.prototype, 'every') - assert.strictEqual(descriptor.enumerable, false) - assert.strictEqual(descriptor.configurable, true) - assert.strictEqual(descriptor.writable, true) + const descriptor = Object.getOwnPropertyDescriptor( + Readable.prototype, + 'every' + ); + assert.strictEqual(descriptor.enumerable, false); + assert.strictEqual(descriptor.configurable, true); + assert.strictEqual(descriptor.writable, true); } -/* replacement start */ -process.on('beforeExit', (code) => { - if (code === 0) { - tap.pass('test succeeded') - } else { - tap.fail(`test failed - exited code ${code}`) - } -}) -/* replacement end */ + /* replacement start */ + process.on('beforeExit', (code) => { + if(code === 0) { + tap.pass('test succeeded'); + } else { + tap.fail(`test failed - exited code ${code}`); + } + }); + /* replacement end */ diff --git a/test/parallel/test-stream-pipeline-queued-end-in-destroy.js b/test/parallel/test-stream-pipeline-queued-end-in-destroy.js index 8305a75b6..1d5b71baf 100644 --- a/test/parallel/test-stream-pipeline-queued-end-in-destroy.js +++ b/test/parallel/test-stream-pipeline-queued-end-in-destroy.js @@ -15,7 +15,7 @@ const { Readable, Duplex, pipeline } = require('../../lib/ours/index') // Refs: https://github.com/nodejs/node/issues/24456 const readable = new Readable({ - read: common.mustCall(() => {}) + read: common.mustCall() }) const duplex = new Duplex({ write(chunk, enc, cb) { diff --git a/test/parallel/test-stream-pipeline-with-empty-string.js b/test/parallel/test-stream-pipeline-with-empty-string.js index 74cd6dfbd..2e650b324 100644 --- a/test/parallel/test-stream-pipeline-with-empty-string.js +++ b/test/parallel/test-stream-pipeline-with-empty-string.js @@ -13,10 +13,10 @@ async function runTest() { new PassThrough({ objectMode: true }), - common.mustCall(() => {}) + common.mustCall() ) } -runTest().then(common.mustCall(() => {})) +runTest().then(common.mustCall()) /* replacement start */ process.on('beforeExit', (code) => { diff --git a/test/parallel/test-stream-promises.js b/test/parallel/test-stream-promises.js index 240f931a3..1dc23f91e 100644 --- a/test/parallel/test-stream-promises.js +++ b/test/parallel/test-stream-promises.js @@ -89,6 +89,73 @@ assert.strictEqual(finished, promisify(stream.finished)) }) .then(common.mustCall()) } +{ + const streamObj = new Readable() + assert.throws( + () => { + // Passing cleanup option not as boolean + // should throw error + finished(streamObj, { + cleanup: 2 + }) + }, + { + code: 'ERR_INVALID_ARG_TYPE' + } + ) +} + +// Below code should not throw any errors as the +// streamObj is `Stream` and cleanup is boolean +{ + const streamObj = new Readable() + finished(streamObj, { + cleanup: true + }) +} + +// Cleanup function should not be called when cleanup is set to false +// listenerCount should be 1 after calling finish +{ + const streamObj = new Writable() + assert.strictEqual(streamObj.listenerCount('end'), 0) + finished(streamObj, { + cleanup: false + }).then( + common.mustCall(() => { + assert.strictEqual(streamObj.listenerCount('end'), 1) + }) + ) + streamObj.end() +} + +// Cleanup function should be called when cleanup is set to true +// listenerCount should be 0 after calling finish +{ + const streamObj = new Writable() + assert.strictEqual(streamObj.listenerCount('end'), 0) + finished(streamObj, { + cleanup: true + }).then( + common.mustCall(() => { + assert.strictEqual(streamObj.listenerCount('end'), 0) + }) + ) + streamObj.end() +} + +// Cleanup function should not be called when cleanup has not been set +// listenerCount should be 1 after calling finish +{ + const streamObj = new Writable() + assert.strictEqual(streamObj.listenerCount('end'), 0) + finished(streamObj).then( + common.mustCall(() => { + assert.strictEqual(streamObj.listenerCount('end'), 1) + }) + ) + streamObj.end() +} /* replacement start */ process.on('beforeExit', (code) => { diff --git a/test/parallel/test-stream-readable-unshift.js b/test/parallel/test-stream-readable-unshift.js index 2a554747f..624de8bb1 100644 --- a/test/parallel/test-stream-readable-unshift.js +++ b/test/parallel/test-stream-readable-unshift.js @@ -171,10 +171,7 @@ const { Readable } = require('../../lib/ours/index') } const stream = new ArrayReader() stream.once('readable', common.mustCall(onRead)) - stream.on( - 'end', - common.mustCall(() => {}) - ) + stream.on('end', common.mustCall()) } /* replacement start */ diff --git a/test/parallel/test-stream-some-find-every.mjs b/test/parallel/test-stream-some-find-every.mjs index 30298d0d0..34c8e2a8a 100644 --- a/test/parallel/test-stream-some-find-every.mjs +++ b/test/parallel/test-stream-some-find-every.mjs @@ -1,215 +1,183 @@ -import * as common from '../common/index.mjs' -import { setTimeout } from 'timers/promises' -import { Readable } from '../../lib/ours/index.js' -import assert from 'assert' -import tap from 'tap' +import * as common from '../common/index.mjs'; +import { setTimeout } from 'timers/promises'; +import { Readable }from '../../lib/ours/index.js'; +import assert from 'assert'; +import tap from 'tap'; + function oneTo5() { - return Readable.from([1, 2, 3, 4, 5]) + return Readable.from([1, 2, 3, 4, 5]); } function oneTo5Async() { return oneTo5().map(async (x) => { - await Promise.resolve() - return x - }) + await Promise.resolve(); + return x; + }); } { // Some, find, and every work with a synchronous stream and predicate - assert.strictEqual(await oneTo5().some((x) => x > 3), true) - assert.strictEqual(await oneTo5().every((x) => x > 3), false) - assert.strictEqual(await oneTo5().find((x) => x > 3), 4) - assert.strictEqual(await oneTo5().some((x) => x > 6), false) - assert.strictEqual(await oneTo5().every((x) => x < 6), true) - assert.strictEqual(await oneTo5().find((x) => x > 6), undefined) - assert.strictEqual(await Readable.from([]).some(() => true), false) - assert.strictEqual(await Readable.from([]).every(() => true), true) - assert.strictEqual(await Readable.from([]).find(() => true), undefined) + assert.strictEqual(await oneTo5().some((x) => x > 3), true); + assert.strictEqual(await oneTo5().every((x) => x > 3), false); + assert.strictEqual(await oneTo5().find((x) => x > 3), 4); + assert.strictEqual(await oneTo5().some((x) => x > 6), false); + assert.strictEqual(await oneTo5().every((x) => x < 6), true); + assert.strictEqual(await oneTo5().find((x) => x > 6), undefined); + assert.strictEqual(await Readable.from([]).some(() => true), false); + assert.strictEqual(await Readable.from([]).every(() => true), true); + assert.strictEqual(await Readable.from([]).find(() => true), undefined); } { // Some, find, and every work with an asynchronous stream and synchronous predicate - assert.strictEqual(await oneTo5Async().some((x) => x > 3), true) - assert.strictEqual(await oneTo5Async().every((x) => x > 3), false) - assert.strictEqual(await oneTo5Async().find((x) => x > 3), 4) - assert.strictEqual(await oneTo5Async().some((x) => x > 6), false) - assert.strictEqual(await oneTo5Async().every((x) => x < 6), true) - assert.strictEqual(await oneTo5Async().find((x) => x > 6), undefined) + assert.strictEqual(await oneTo5Async().some((x) => x > 3), true); + assert.strictEqual(await oneTo5Async().every((x) => x > 3), false); + assert.strictEqual(await oneTo5Async().find((x) => x > 3), 4); + assert.strictEqual(await oneTo5Async().some((x) => x > 6), false); + assert.strictEqual(await oneTo5Async().every((x) => x < 6), true); + assert.strictEqual(await oneTo5Async().find((x) => x > 6), undefined); } { // Some, find, and every work on synchronous streams with an asynchronous predicate - assert.strictEqual(await oneTo5().some(async (x) => x > 3), true) - assert.strictEqual(await oneTo5().every(async (x) => x > 3), false) - assert.strictEqual(await oneTo5().find(async (x) => x > 3), 4) - assert.strictEqual(await oneTo5().some(async (x) => x > 6), false) - assert.strictEqual(await oneTo5().every(async (x) => x < 6), true) - assert.strictEqual(await oneTo5().find(async (x) => x > 6), undefined) + assert.strictEqual(await oneTo5().some(async (x) => x > 3), true); + assert.strictEqual(await oneTo5().every(async (x) => x > 3), false); + assert.strictEqual(await oneTo5().find(async (x) => x > 3), 4); + assert.strictEqual(await oneTo5().some(async (x) => x > 6), false); + assert.strictEqual(await oneTo5().every(async (x) => x < 6), true); + assert.strictEqual(await oneTo5().find(async (x) => x > 6), undefined); } { // Some, find, and every work on asynchronous streams with an asynchronous predicate - assert.strictEqual(await oneTo5Async().some(async (x) => x > 3), true) - assert.strictEqual(await oneTo5Async().every(async (x) => x > 3), false) - assert.strictEqual(await oneTo5Async().find(async (x) => x > 3), 4) - assert.strictEqual(await oneTo5Async().some(async (x) => x > 6), false) - assert.strictEqual(await oneTo5Async().every(async (x) => x < 6), true) - assert.strictEqual(await oneTo5Async().find(async (x) => x > 6), undefined) + assert.strictEqual(await oneTo5Async().some(async (x) => x > 3), true); + assert.strictEqual(await oneTo5Async().every(async (x) => x > 3), false); + assert.strictEqual(await oneTo5Async().find(async (x) => x > 3), 4); + assert.strictEqual(await oneTo5Async().some(async (x) => x > 6), false); + assert.strictEqual(await oneTo5Async().every(async (x) => x < 6), true); + assert.strictEqual(await oneTo5Async().find(async (x) => x > 6), undefined); } { async function checkDestroyed(stream) { - await setTimeout() - assert.strictEqual(stream.destroyed, true) + await setTimeout(); + assert.strictEqual(stream.destroyed, true); } { // Some, find, and every short circuit - const someStream = oneTo5() - await someStream.some(common.mustCall((x) => x > 2, 3)) - await checkDestroyed(someStream) + const someStream = oneTo5(); + await someStream.some(common.mustCall((x) => x > 2, 3)); + await checkDestroyed(someStream); - const everyStream = oneTo5() - await everyStream.every(common.mustCall((x) => x < 3, 3)) - await checkDestroyed(everyStream) + const everyStream = oneTo5(); + await everyStream.every(common.mustCall((x) => x < 3, 3)); + await checkDestroyed(everyStream); - const findStream = oneTo5() - await findStream.find(common.mustCall((x) => x > 1, 2)) - await checkDestroyed(findStream) + const findStream = oneTo5(); + await findStream.find(common.mustCall((x) => x > 1, 2)); + await checkDestroyed(findStream); // When short circuit isn't possible the whole stream is iterated - await oneTo5().some(common.mustCall(() => false, 5)) - await oneTo5().every(common.mustCall(() => true, 5)) - await oneTo5().find(common.mustCall(() => false, 5)) + await oneTo5().some(common.mustCall(() => false, 5)); + await oneTo5().every(common.mustCall(() => true, 5)); + await oneTo5().find(common.mustCall(() => false, 5)); } { // Some, find, and every short circuit async stream/predicate - const someStream = oneTo5Async() - await someStream.some(common.mustCall(async (x) => x > 2, 3)) - await checkDestroyed(someStream) + const someStream = oneTo5Async(); + await someStream.some(common.mustCall(async (x) => x > 2, 3)); + await checkDestroyed(someStream); - const everyStream = oneTo5Async() - await everyStream.every(common.mustCall(async (x) => x < 3, 3)) - await checkDestroyed(everyStream) + const everyStream = oneTo5Async(); + await everyStream.every(common.mustCall(async (x) => x < 3, 3)); + await checkDestroyed(everyStream); - const findStream = oneTo5Async() - await findStream.find(common.mustCall(async (x) => x > 1, 2)) - await checkDestroyed(findStream) + const findStream = oneTo5Async(); + await findStream.find(common.mustCall(async (x) => x > 1, 2)); + await checkDestroyed(findStream); // When short circuit isn't possible the whole stream is iterated - await oneTo5Async().some(common.mustCall(async () => false, 5)) - await oneTo5Async().every(common.mustCall(async () => true, 5)) - await oneTo5Async().find(common.mustCall(async () => false, 5)) + await oneTo5Async().some(common.mustCall(async () => false, 5)); + await oneTo5Async().every(common.mustCall(async () => true, 5)); + await oneTo5Async().find(common.mustCall(async () => false, 5)); } } { // Concurrency doesn't affect which value is found. - const found = await Readable.from([1, 2]).find( - async (val) => { - if (val === 1) { - await setTimeout(100) - } - return true - }, - { concurrency: 2 } - ) - assert.strictEqual(found, 1) + const found = await Readable.from([1, 2]).find(async (val) => { + if (val === 1) { + await setTimeout(100); + } + return true; + }, { concurrency: 2 }); + assert.strictEqual(found, 1); } { // Support for AbortSignal for (const op of ['some', 'every', 'find']) { { - const ac = new AbortController() - assert - .rejects( - Readable.from([1, 2, 3])[op](() => new Promise(() => {}), { signal: ac.signal }), - { - name: 'AbortError' - }, - `${op} should abort correctly with sync abort` - ) - .then(common.mustCall()) - ac.abort() + const ac = new AbortController(); + assert.rejects(Readable.from([1, 2, 3])[op]( + () => new Promise(() => { }), + { signal: ac.signal } + ), { + name: 'AbortError', + }, `${op} should abort correctly with sync abort`).then(common.mustCall()); + ac.abort(); } { // Support for pre-aborted AbortSignal - assert - .rejects( - Readable.from([1, 2, 3])[op](() => new Promise(() => {}), { signal: AbortSignal.abort() }), - { - name: 'AbortError' - }, - `${op} should abort with pre-aborted abort controller` - ) - .then(common.mustCall()) + assert.rejects(Readable.from([1, 2, 3])[op]( + () => new Promise(() => { }), + { signal: AbortSignal.abort() } + ), { + name: 'AbortError', + }, `${op} should abort with pre-aborted abort controller`).then(common.mustCall()); } } } { // Error cases for (const op of ['some', 'every', 'find']) { - assert - .rejects( - async () => { - await Readable.from([1])[op](1) - }, - /ERR_INVALID_ARG_TYPE/, - `${op} should throw for invalid function` - ) - .then(common.mustCall()) - assert - .rejects( - async () => { - await Readable.from([1])[op]((x) => x, { - concurrency: 'Foo' - }) - }, - /ERR_OUT_OF_RANGE/, - `${op} should throw for invalid concurrency` - ) - .then(common.mustCall()) - assert - .rejects( - async () => { - await Readable.from([1])[op]((x) => x, 1) - }, - /ERR_INVALID_ARG_TYPE/, - `${op} should throw for invalid concurrency` - ) - .then(common.mustCall()) - assert - .rejects( - async () => { - await Readable.from([1])[op]((x) => x, { - signal: true - }) - }, - /ERR_INVALID_ARG_TYPE/, - `${op} should throw for invalid signal` - ) - .then(common.mustCall()) + assert.rejects(async () => { + await Readable.from([1])[op](1); + }, /ERR_INVALID_ARG_TYPE/, `${op} should throw for invalid function`).then(common.mustCall()); + assert.rejects(async () => { + await Readable.from([1])[op]((x) => x, { + concurrency: 'Foo' + }); + }, /ERR_OUT_OF_RANGE/, `${op} should throw for invalid concurrency`).then(common.mustCall()); + assert.rejects(async () => { + await Readable.from([1])[op]((x) => x, 1); + }, /ERR_INVALID_ARG_TYPE/, `${op} should throw for invalid concurrency`).then(common.mustCall()); + assert.rejects(async () => { + await Readable.from([1])[op]((x) => x, { + signal: true + }); + }, /ERR_INVALID_ARG_TYPE/, `${op} should throw for invalid signal`).then(common.mustCall()); } } { for (const op of ['some', 'every', 'find']) { - const stream = oneTo5() + const stream = oneTo5(); Object.defineProperty(stream, 'map', { - value: common.mustNotCall(() => {}) - }) + value: common.mustNotCall(() => {}), + }); // Check that map isn't getting called. - stream[op](() => {}) + stream[op](() => {}); } } -/* replacement start */ -process.on('beforeExit', (code) => { - if (code === 0) { - tap.pass('test succeeded') - } else { - tap.fail(`test failed - exited code ${code}`) - } -}) -/* replacement end */ + /* replacement start */ + process.on('beforeExit', (code) => { + if(code === 0) { + tap.pass('test succeeded'); + } else { + tap.fail(`test failed - exited code ${code}`); + } + }); + /* replacement end */ diff --git a/test/parallel/test-stream-writable-samecb-singletick.js b/test/parallel/test-stream-writable-samecb-singletick.js index e6b0f162d..e462dba85 100644 --- a/test/parallel/test-stream-writable-samecb-singletick.js +++ b/test/parallel/test-stream-writable-samecb-singletick.js @@ -21,7 +21,7 @@ const async_hooks = require('async_hooks') const checkTickCreated = common.mustCall() const hook = async_hooks .createHook({ - init(id, type, triggerId, resoure) { + init(id, type, triggerId, resource) { if (type === 'TickObject') checkTickCreated() } }) diff --git a/test/parallel/test-stream2-transform.js b/test/parallel/test-stream2-transform.js index cba8b939b..742ff01a2 100644 --- a/test/parallel/test-stream2-transform.js +++ b/test/parallel/test-stream2-transform.js @@ -483,6 +483,27 @@ const { PassThrough, Transform } = require('../../lib/ours/index') }) ) } +{ + const s = new Transform({ + objectMode: true, + construct(callback) { + this.push('header from constructor') + callback() + }, + transform: (row, encoding, callback) => { + callback(null, row) + } + }) + const expected = ['header from constructor', 'firstLine', 'secondLine'] + s.on( + 'data', + common.mustCall((data) => { + assert.strictEqual(data.toString(), expected.shift()) + }, 3) + ) + s.write('firstLine') + process.nextTick(() => s.write('secondLine')) +} /* replacement start */ process.on('beforeExit', (code) => { diff --git a/test/parallel/test-stream3-pipeline-async-iterator.js b/test/parallel/test-stream3-pipeline-async-iterator.js new file mode 100644 index 000000000..2179b4bf6 --- /dev/null +++ b/test/parallel/test-stream3-pipeline-async-iterator.js @@ -0,0 +1,38 @@ +/* eslint-disable node-core/require-common-first, require-yield */ + +'use strict' + +const tap = require('tap') +const silentConsole = { + log() {}, + error() {} +} +const { pipeline } = require('node:stream/promises') +{ + // Ensure that async iterators can act as readable and writable streams + async function* myCustomReadable() { + yield 'Hello' + yield 'World' + } + const messages = [] + async function* myCustomWritable(stream) { + for await (const chunk of stream) { + messages.push(chunk) + } + } + ;(async () => { + await pipeline(myCustomReadable, myCustomWritable) + // Importing here to avoid initializing streams + require('assert').deepStrictEqual(messages, ['Hello', 'World']) + })().then(require('../common').mustCall()) +} + +/* replacement start */ +process.on('beforeExit', (code) => { + if (code === 0) { + tap.pass('test succeeded') + } else { + tap.fail(`test failed - exited code ${code}`) + } +}) +/* replacement end */ From e41f84cf1f550a3b0fbcc810a359239aff716b73 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Mon, 8 May 2023 11:53:10 +0200 Subject: [PATCH 2/6] fixup Signed-off-by: Matteo Collina --- build/replacements.mjs | 2 ++ lib/ours/primordials.js | 1 - 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/build/replacements.mjs b/build/replacements.mjs index aea2876ea..bb64d74ca 100644 --- a/build/replacements.mjs +++ b/build/replacements.mjs @@ -301,6 +301,8 @@ export const replacements = { testParallelTicksReenableConsoleLog, testParallelTickSaveHook ], + 'test/parallel/test-stream3-pipeline-async-iterator.js': [internalStreamsNoRequireAbortController], + 'test/parallel/test-stream-compose-operator.js': [internalStreamsNoRequireAbortController], 'test/parallel/test-stream2-readable-from-list.js': [testParallelReadableBufferListInspect], 'README.md': [readmeInfo, readmeLink] } diff --git a/lib/ours/primordials.js b/lib/ours/primordials.js index 9464cc7fe..6a98b0168 100644 --- a/lib/ours/primordials.js +++ b/lib/ours/primordials.js @@ -90,7 +90,6 @@ module.exports = { return self.trim() }, Symbol, - SymbolFor: Symbol.for, SymbolAsyncIterator: Symbol.asyncIterator, SymbolHasInstance: Symbol.hasInstance, SymbolIterator: Symbol.iterator, From fbb1f4c6cc8b73df36cbe3ddb9c582c1868aec63 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Mon, 8 May 2023 11:54:05 +0200 Subject: [PATCH 3/6] fixup Signed-off-by: Matteo Collina --- lib/ours/primordials.js | 1 + src/primordials.js | 1 + 2 files changed, 2 insertions(+) diff --git a/lib/ours/primordials.js b/lib/ours/primordials.js index 6a98b0168..9464cc7fe 100644 --- a/lib/ours/primordials.js +++ b/lib/ours/primordials.js @@ -90,6 +90,7 @@ module.exports = { return self.trim() }, Symbol, + SymbolFor: Symbol.for, SymbolAsyncIterator: Symbol.asyncIterator, SymbolHasInstance: Symbol.hasInstance, SymbolIterator: Symbol.iterator, diff --git a/src/primordials.js b/src/primordials.js index 14e2680bb..04f432256 100644 --- a/src/primordials.js +++ b/src/primordials.js @@ -91,6 +91,7 @@ module.exports = { return self.trim() }, Symbol, + SymbolFor: Symbol.for, SymbolAsyncIterator: Symbol.asyncIterator, SymbolHasInstance: Symbol.hasInstance, SymbolIterator: Symbol.iterator, From 55787233a13f682e98a0224a4910e3f267727a3e Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Mon, 8 May 2023 12:03:34 +0200 Subject: [PATCH 4/6] fixupo Signed-off-by: Matteo Collina --- build/replacements.mjs | 7 ++++++- test/parallel/test-stream3-pipeline-async-iterator.js | 2 +- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/build/replacements.mjs b/build/replacements.mjs index bb64d74ca..8ba86f1e1 100644 --- a/build/replacements.mjs +++ b/build/replacements.mjs @@ -51,6 +51,8 @@ const internalStreamsRequireStream = ["require\\('stream'\\)", "require('../../s const internalStreamsRequireStreams = ["require\\('internal/streams/([^']+)'\\)", "require('./$1')"] +const streamSlashPromisesToStreamDotPromises= ["require\\('(node:)?stream/promises'\\)", "require('../../lib/stream').promises"] + const internalStreamsRequireUtil = [ "require\\('internal/util(?:/(?:debuglog|inspect))?'\\)", "require('../../ours/util')" @@ -301,7 +303,10 @@ export const replacements = { testParallelTicksReenableConsoleLog, testParallelTickSaveHook ], - 'test/parallel/test-stream3-pipeline-async-iterator.js': [internalStreamsNoRequireAbortController], + 'test/parallel/test-stream3-pipeline-async-iterator.js': [ + internalStreamsNoRequireAbortController, + streamSlashPromisesToStreamDotPromises + ], 'test/parallel/test-stream-compose-operator.js': [internalStreamsNoRequireAbortController], 'test/parallel/test-stream2-readable-from-list.js': [testParallelReadableBufferListInspect], 'README.md': [readmeInfo, readmeLink] diff --git a/test/parallel/test-stream3-pipeline-async-iterator.js b/test/parallel/test-stream3-pipeline-async-iterator.js index 2179b4bf6..77bec7f08 100644 --- a/test/parallel/test-stream3-pipeline-async-iterator.js +++ b/test/parallel/test-stream3-pipeline-async-iterator.js @@ -7,7 +7,7 @@ const silentConsole = { log() {}, error() {} } -const { pipeline } = require('node:stream/promises') +const { pipeline } = require('../../lib/stream').promises { // Ensure that async iterators can act as readable and writable streams async function* myCustomReadable() { From ba9a5f531d2eaf8dc4f5811ecb80458f2724652c Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Mon, 8 May 2023 12:10:37 +0200 Subject: [PATCH 5/6] fixup Signed-off-by: Matteo Collina --- build/replacements.mjs | 16 ++++++++++++++-- test/parallel/test-stream-compose-operator.js | 1 + .../test-stream3-pipeline-async-iterator.js | 1 + 3 files changed, 16 insertions(+), 2 deletions(-) diff --git a/build/replacements.mjs b/build/replacements.mjs index 8ba86f1e1..4a01dd127 100644 --- a/build/replacements.mjs +++ b/build/replacements.mjs @@ -17,6 +17,16 @@ const internalStreamsAbortControllerPolyfill = [ ` ] +const internalStreamsAbortControllerPolyfill2 = [ + "'use strict'", + ` + 'use strict' + + const AbortController = globalThis.AbortController || require(\'abort-controller\').AbortController; + + ` +] + const internalStreamsNoRequireBlob = [ "const \\{\\n isBlob,\\n\\} = require\\('internal/blob'\\);", ` @@ -304,10 +314,12 @@ export const replacements = { testParallelTickSaveHook ], 'test/parallel/test-stream3-pipeline-async-iterator.js': [ - internalStreamsNoRequireAbortController, + internalStreamsAbortControllerPolyfill2, streamSlashPromisesToStreamDotPromises ], - 'test/parallel/test-stream-compose-operator.js': [internalStreamsNoRequireAbortController], + 'test/parallel/test-stream-compose-operator.js': [ + internalStreamsAbortControllerPolyfill2 + ], 'test/parallel/test-stream2-readable-from-list.js': [testParallelReadableBufferListInspect], 'README.md': [readmeInfo, readmeLink] } diff --git a/test/parallel/test-stream-compose-operator.js b/test/parallel/test-stream-compose-operator.js index d5def4583..45cac28d6 100644 --- a/test/parallel/test-stream-compose-operator.js +++ b/test/parallel/test-stream-compose-operator.js @@ -1,5 +1,6 @@ 'use strict' +const AbortController = globalThis.AbortController || require('abort-controller').AbortController const tap = require('tap') const silentConsole = { log() {}, diff --git a/test/parallel/test-stream3-pipeline-async-iterator.js b/test/parallel/test-stream3-pipeline-async-iterator.js index 77bec7f08..981cfc919 100644 --- a/test/parallel/test-stream3-pipeline-async-iterator.js +++ b/test/parallel/test-stream3-pipeline-async-iterator.js @@ -2,6 +2,7 @@ 'use strict' +const AbortController = globalThis.AbortController || require('abort-controller').AbortController const tap = require('tap') const silentConsole = { log() {}, From 15b71e97186da14def339bc457bb0bf24205efaf Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Mon, 8 May 2023 13:02:50 +0200 Subject: [PATCH 6/6] format Signed-off-by: Matteo Collina --- src/errors.js | 12 +- test/common/fixtures.mjs | 18 +- test/common/index.mjs | 16 +- test/parallel/test-stream-asIndexedPairs.mjs | 100 ++++--- ...-stream-iterator-helpers-test262-tests.mjs | 159 ++++++----- test/parallel/test-stream-some-find-every.mjs | 258 ++++++++++-------- 6 files changed, 300 insertions(+), 263 deletions(-) diff --git a/src/errors.js b/src/errors.js index 030467266..c999d49d2 100644 --- a/src/errors.js +++ b/src/errors.js @@ -91,15 +91,17 @@ function E(code, message, Base) { value: Base.name, writable: true, enumerable: false, - configurable: true, + configurable: true }, toString: { - value() { return `${this.name} [${code}]: ${this.message}`; }, + value() { + return `${this.name} [${code}]: ${this.message}` + }, writable: true, enumerable: false, - configurable: true, - } - }); + configurable: true + } + }) NodeError.prototype.code = code NodeError.prototype[kIsNodeError] = true diff --git a/test/common/fixtures.mjs b/test/common/fixtures.mjs index d6f7f6c09..372fabf88 100644 --- a/test/common/fixtures.mjs +++ b/test/common/fixtures.mjs @@ -1,17 +1,5 @@ -import fixtures from './fixtures.js'; +import fixtures from './fixtures.js' -const { - fixturesDir, - path, - fileURL, - readSync, - readKey, -} = fixtures; +const { fixturesDir, path, fileURL, readSync, readKey } = fixtures -export { - fixturesDir, - path, - fileURL, - readSync, - readKey, -}; +export { fixturesDir, path, fileURL, readSync, readKey } diff --git a/test/common/index.mjs b/test/common/index.mjs index e77b1b298..d524b2ba5 100644 --- a/test/common/index.mjs +++ b/test/common/index.mjs @@ -1,7 +1,7 @@ -import { createRequire } from 'module'; +import { createRequire } from 'module' -const require = createRequire(import.meta.url); -const common = require('./index.js'); +const require = createRequire(import.meta.url) +const common = require('./index.js') const { isMainThread, @@ -49,10 +49,10 @@ const { getBufferSources, getTTYfd, runWithInvalidFD, - spawnPromisified, -} = common; + spawnPromisified +} = common -const getPort = () => common.PORT; +const getPort = () => common.PORT export { isMainThread, @@ -102,5 +102,5 @@ export { runWithInvalidFD, createRequire, spawnPromisified, - getPort, -}; + getPort +} diff --git a/test/parallel/test-stream-asIndexedPairs.mjs b/test/parallel/test-stream-asIndexedPairs.mjs index a103920ee..35919114a 100644 --- a/test/parallel/test-stream-asIndexedPairs.mjs +++ b/test/parallel/test-stream-asIndexedPairs.mjs @@ -1,64 +1,82 @@ -import '../common/index.mjs'; -import { Readable }from '../../lib/ours/index.js'; -import { deepStrictEqual, rejects, throws } from 'assert'; -import tap from 'tap'; +import '../common/index.mjs' +import { Readable } from '../../lib/ours/index.js' +import { deepStrictEqual, rejects, throws } from 'assert' +import tap from 'tap' { // asIndexedPairs with a synchronous stream - const pairs = await Readable.from([1, 2, 3]).asIndexedPairs().toArray(); - deepStrictEqual(pairs, [[0, 1], [1, 2], [2, 3]]); - const empty = await Readable.from([]).asIndexedPairs().toArray(); - deepStrictEqual(empty, []); + const pairs = await Readable.from([1, 2, 3]).asIndexedPairs().toArray() + deepStrictEqual(pairs, [ + [0, 1], + [1, 2], + [2, 3] + ]) + const empty = await Readable.from([]).asIndexedPairs().toArray() + deepStrictEqual(empty, []) } { // asIndexedPairs works an asynchronous streams - const asyncFrom = (...args) => Readable.from(...args).map(async (x) => x); - const pairs = await asyncFrom([1, 2, 3]).asIndexedPairs().toArray(); - deepStrictEqual(pairs, [[0, 1], [1, 2], [2, 3]]); - const empty = await asyncFrom([]).asIndexedPairs().toArray(); - deepStrictEqual(empty, []); + const asyncFrom = (...args) => Readable.from(...args).map(async (x) => x) + const pairs = await asyncFrom([1, 2, 3]).asIndexedPairs().toArray() + deepStrictEqual(pairs, [ + [0, 1], + [1, 2], + [2, 3] + ]) + const empty = await asyncFrom([]).asIndexedPairs().toArray() + deepStrictEqual(empty, []) } { // Does not enumerate an infinite stream - const infinite = () => Readable.from(async function* () { - while (true) yield 1; - }()); - const pairs = await infinite().asIndexedPairs().take(3).toArray(); - deepStrictEqual(pairs, [[0, 1], [1, 1], [2, 1]]); - const empty = await infinite().asIndexedPairs().take(0).toArray(); - deepStrictEqual(empty, []); + const infinite = () => + Readable.from( + (async function* () { + while (true) yield 1 + })() + ) + const pairs = await infinite().asIndexedPairs().take(3).toArray() + deepStrictEqual(pairs, [ + [0, 1], + [1, 1], + [2, 1] + ]) + const empty = await infinite().asIndexedPairs().take(0).toArray() + deepStrictEqual(empty, []) } { // AbortSignal - await rejects(async () => { - const ac = new AbortController(); - const { signal } = ac; - const p = Readable.from([1, 2, 3]).asIndexedPairs({ signal }).toArray(); - ac.abort(); - await p; - }, { name: 'AbortError' }); + await rejects( + async () => { + const ac = new AbortController() + const { signal } = ac + const p = Readable.from([1, 2, 3]).asIndexedPairs({ signal }).toArray() + ac.abort() + await p + }, + { name: 'AbortError' } + ) await rejects(async () => { - const signal = AbortSignal.abort(); - await Readable.from([1, 2, 3]).asIndexedPairs({ signal }).toArray(); - }, /AbortError/); + const signal = AbortSignal.abort() + await Readable.from([1, 2, 3]).asIndexedPairs({ signal }).toArray() + }, /AbortError/) } { // Error cases - throws(() => Readable.from([1]).asIndexedPairs(1), /ERR_INVALID_ARG_TYPE/); - throws(() => Readable.from([1]).asIndexedPairs({ signal: true }), /ERR_INVALID_ARG_TYPE/); + throws(() => Readable.from([1]).asIndexedPairs(1), /ERR_INVALID_ARG_TYPE/) + throws(() => Readable.from([1]).asIndexedPairs({ signal: true }), /ERR_INVALID_ARG_TYPE/) } - /* replacement start */ - process.on('beforeExit', (code) => { - if(code === 0) { - tap.pass('test succeeded'); - } else { - tap.fail(`test failed - exited code ${code}`); - } - }); - /* replacement end */ +/* replacement start */ +process.on('beforeExit', (code) => { + if (code === 0) { + tap.pass('test succeeded') + } else { + tap.fail(`test failed - exited code ${code}`) + } +}) +/* replacement end */ diff --git a/test/parallel/test-stream-iterator-helpers-test262-tests.mjs b/test/parallel/test-stream-iterator-helpers-test262-tests.mjs index 9f09abeab..8231f80ce 100644 --- a/test/parallel/test-stream-iterator-helpers-test262-tests.mjs +++ b/test/parallel/test-stream-iterator-helpers-test262-tests.mjs @@ -1,7 +1,7 @@ -import { mustCall } from '../common/index.mjs'; -import { Readable }from '../../lib/ours/index.js'; -import assert from 'assert'; -import tap from 'tap'; +import { mustCall } from '../common/index.mjs' +import { Readable } from '../../lib/ours/index.js' +import assert from 'assert' +import tap from 'tap' // These tests are manually ported from the draft PR for the test262 test suite // Authored by Rick Waldron in https://github.com/tc39/test262/pull/2818/files @@ -46,134 +46,131 @@ import tap from 'tap'; // * Ecma International Standards hereafter means Ecma International Standards // as well as Ecma Technical Reports - // Note all the tests that check AsyncIterator's prototype itself and things // that happen before stream conversion were not ported. { // asIndexedPairs/is-function - assert.strictEqual(typeof Readable.prototype.asIndexedPairs, 'function'); + assert.strictEqual(typeof Readable.prototype.asIndexedPairs, 'function') // asIndexedPairs/indexed-pairs.js - const iterator = Readable.from([0, 1]); - const indexedPairs = iterator.asIndexedPairs(); + const iterator = Readable.from([0, 1]) + const indexedPairs = iterator.asIndexedPairs() for await (const [i, v] of indexedPairs) { - assert.strictEqual(i, v); + assert.strictEqual(i, v) } // asIndexedPairs/length.js - assert.strictEqual(Readable.prototype.asIndexedPairs.length, 0); + assert.strictEqual(Readable.prototype.asIndexedPairs.length, 0) // asIndexedPairs/name.js - assert.strictEqual(Readable.prototype.asIndexedPairs.name, 'asIndexedPairs'); - const descriptor = Object.getOwnPropertyDescriptor( - Readable.prototype, - 'asIndexedPairs' - ); - assert.strictEqual(descriptor.enumerable, false); - assert.strictEqual(descriptor.configurable, true); - assert.strictEqual(descriptor.writable, true); + assert.strictEqual(Readable.prototype.asIndexedPairs.name, 'asIndexedPairs') + const descriptor = Object.getOwnPropertyDescriptor(Readable.prototype, 'asIndexedPairs') + assert.strictEqual(descriptor.enumerable, false) + assert.strictEqual(descriptor.configurable, true) + assert.strictEqual(descriptor.writable, true) } { // drop/length - assert.strictEqual(Readable.prototype.drop.length, 1); - const descriptor = Object.getOwnPropertyDescriptor( - Readable.prototype, - 'drop' - ); - assert.strictEqual(descriptor.enumerable, false); - assert.strictEqual(descriptor.configurable, true); - assert.strictEqual(descriptor.writable, true); + assert.strictEqual(Readable.prototype.drop.length, 1) + const descriptor = Object.getOwnPropertyDescriptor(Readable.prototype, 'drop') + assert.strictEqual(descriptor.enumerable, false) + assert.strictEqual(descriptor.configurable, true) + assert.strictEqual(descriptor.writable, true) // drop/limit-equals-total - const iterator = Readable.from([1, 2]).drop(2); - const result = await iterator[Symbol.asyncIterator]().next(); - assert.deepStrictEqual(result, { done: true, value: undefined }); + const iterator = Readable.from([1, 2]).drop(2) + const result = await iterator[Symbol.asyncIterator]().next() + assert.deepStrictEqual(result, { done: true, value: undefined }) // drop/limit-greater-than-total.js - const iterator2 = Readable.from([1, 2]).drop(3); - const result2 = await iterator2[Symbol.asyncIterator]().next(); - assert.deepStrictEqual(result2, { done: true, value: undefined }); + const iterator2 = Readable.from([1, 2]).drop(3) + const result2 = await iterator2[Symbol.asyncIterator]().next() + assert.deepStrictEqual(result2, { done: true, value: undefined }) // drop/limit-less-than-total.js - const iterator3 = Readable.from([1, 2]).drop(1); - const result3 = await iterator3[Symbol.asyncIterator]().next(); - assert.deepStrictEqual(result3, { done: false, value: 2 }); + const iterator3 = Readable.from([1, 2]).drop(1) + const result3 = await iterator3[Symbol.asyncIterator]().next() + assert.deepStrictEqual(result3, { done: false, value: 2 }) // drop/limit-rangeerror - assert.throws(() => Readable.from([1]).drop(-1), RangeError); + assert.throws(() => Readable.from([1]).drop(-1), RangeError) assert.throws(() => { Readable.from([1]).drop({ valueOf() { - throw new Error('boom'); + throw new Error('boom') } - }); - }, /boom/); + }) + }, /boom/) // drop/limit-tointeger - const two = await Readable.from([1, 2]).drop({ valueOf: () => 1 }).toArray(); - assert.deepStrictEqual(two, [2]); + const two = await Readable.from([1, 2]) + .drop({ valueOf: () => 1 }) + .toArray() + assert.deepStrictEqual(two, [2]) // drop/name - assert.strictEqual(Readable.prototype.drop.name, 'drop'); + assert.strictEqual(Readable.prototype.drop.name, 'drop') // drop/non-constructible - assert.throws(() => new Readable.prototype.drop(1), TypeError); + assert.throws(() => new Readable.prototype.drop(1), TypeError) // drop/proto - const proto = Object.getPrototypeOf(Readable.prototype.drop); - assert.strictEqual(proto, Function.prototype); + const proto = Object.getPrototypeOf(Readable.prototype.drop) + assert.strictEqual(proto, Function.prototype) } { // every/abrupt-iterator-close - const stream = Readable.from([1, 2, 3]); - const e = new Error(); - await assert.rejects(stream.every(mustCall(() => { - throw e; - }, 1)), e); + const stream = Readable.from([1, 2, 3]) + const e = new Error() + await assert.rejects( + stream.every( + mustCall(() => { + throw e + }, 1) + ), + e + ) } { // every/callable-fn - await assert.rejects(Readable.from([1, 2]).every({}), TypeError); + await assert.rejects(Readable.from([1, 2]).every({}), TypeError) } { // every/callable - Readable.prototype.every.call(Readable.from([]), () => {}); + Readable.prototype.every.call(Readable.from([]), () => {}) // eslint-disable-next-line array-callback-return - Readable.from([]).every(() => {}); + Readable.from([]).every(() => {}) assert.throws(() => { - const r = Readable.from([]); - new r.every(() => {}); - }, TypeError); + const r = Readable.from([]) + new r.every(() => {}) + }, TypeError) } { // every/false - const iterator = Readable.from([1, 2, 3]); - const result = await iterator.every((v) => v === 1); - assert.strictEqual(result, false); + const iterator = Readable.from([1, 2, 3]) + const result = await iterator.every((v) => v === 1) + assert.strictEqual(result, false) } { // every/every - const iterator = Readable.from([1, 2, 3]); - const result = await iterator.every((v) => true); - assert.strictEqual(result, true); + const iterator = Readable.from([1, 2, 3]) + const result = await iterator.every((v) => true) + assert.strictEqual(result, true) } { // every/is-function - assert.strictEqual(typeof Readable.prototype.every, 'function'); + assert.strictEqual(typeof Readable.prototype.every, 'function') } { // every/length - assert.strictEqual(Readable.prototype.every.length, 1); + assert.strictEqual(Readable.prototype.every.length, 1) // every/name - assert.strictEqual(Readable.prototype.every.name, 'every'); + assert.strictEqual(Readable.prototype.every.name, 'every') // every/propdesc - const descriptor = Object.getOwnPropertyDescriptor( - Readable.prototype, - 'every' - ); - assert.strictEqual(descriptor.enumerable, false); - assert.strictEqual(descriptor.configurable, true); - assert.strictEqual(descriptor.writable, true); + const descriptor = Object.getOwnPropertyDescriptor(Readable.prototype, 'every') + assert.strictEqual(descriptor.enumerable, false) + assert.strictEqual(descriptor.configurable, true) + assert.strictEqual(descriptor.writable, true) } - /* replacement start */ - process.on('beforeExit', (code) => { - if(code === 0) { - tap.pass('test succeeded'); - } else { - tap.fail(`test failed - exited code ${code}`); - } - }); - /* replacement end */ +/* replacement start */ +process.on('beforeExit', (code) => { + if (code === 0) { + tap.pass('test succeeded') + } else { + tap.fail(`test failed - exited code ${code}`) + } +}) +/* replacement end */ diff --git a/test/parallel/test-stream-some-find-every.mjs b/test/parallel/test-stream-some-find-every.mjs index 34c8e2a8a..30298d0d0 100644 --- a/test/parallel/test-stream-some-find-every.mjs +++ b/test/parallel/test-stream-some-find-every.mjs @@ -1,183 +1,215 @@ -import * as common from '../common/index.mjs'; -import { setTimeout } from 'timers/promises'; -import { Readable }from '../../lib/ours/index.js'; -import assert from 'assert'; -import tap from 'tap'; - +import * as common from '../common/index.mjs' +import { setTimeout } from 'timers/promises' +import { Readable } from '../../lib/ours/index.js' +import assert from 'assert' +import tap from 'tap' function oneTo5() { - return Readable.from([1, 2, 3, 4, 5]); + return Readable.from([1, 2, 3, 4, 5]) } function oneTo5Async() { return oneTo5().map(async (x) => { - await Promise.resolve(); - return x; - }); + await Promise.resolve() + return x + }) } { // Some, find, and every work with a synchronous stream and predicate - assert.strictEqual(await oneTo5().some((x) => x > 3), true); - assert.strictEqual(await oneTo5().every((x) => x > 3), false); - assert.strictEqual(await oneTo5().find((x) => x > 3), 4); - assert.strictEqual(await oneTo5().some((x) => x > 6), false); - assert.strictEqual(await oneTo5().every((x) => x < 6), true); - assert.strictEqual(await oneTo5().find((x) => x > 6), undefined); - assert.strictEqual(await Readable.from([]).some(() => true), false); - assert.strictEqual(await Readable.from([]).every(() => true), true); - assert.strictEqual(await Readable.from([]).find(() => true), undefined); + assert.strictEqual(await oneTo5().some((x) => x > 3), true) + assert.strictEqual(await oneTo5().every((x) => x > 3), false) + assert.strictEqual(await oneTo5().find((x) => x > 3), 4) + assert.strictEqual(await oneTo5().some((x) => x > 6), false) + assert.strictEqual(await oneTo5().every((x) => x < 6), true) + assert.strictEqual(await oneTo5().find((x) => x > 6), undefined) + assert.strictEqual(await Readable.from([]).some(() => true), false) + assert.strictEqual(await Readable.from([]).every(() => true), true) + assert.strictEqual(await Readable.from([]).find(() => true), undefined) } { // Some, find, and every work with an asynchronous stream and synchronous predicate - assert.strictEqual(await oneTo5Async().some((x) => x > 3), true); - assert.strictEqual(await oneTo5Async().every((x) => x > 3), false); - assert.strictEqual(await oneTo5Async().find((x) => x > 3), 4); - assert.strictEqual(await oneTo5Async().some((x) => x > 6), false); - assert.strictEqual(await oneTo5Async().every((x) => x < 6), true); - assert.strictEqual(await oneTo5Async().find((x) => x > 6), undefined); + assert.strictEqual(await oneTo5Async().some((x) => x > 3), true) + assert.strictEqual(await oneTo5Async().every((x) => x > 3), false) + assert.strictEqual(await oneTo5Async().find((x) => x > 3), 4) + assert.strictEqual(await oneTo5Async().some((x) => x > 6), false) + assert.strictEqual(await oneTo5Async().every((x) => x < 6), true) + assert.strictEqual(await oneTo5Async().find((x) => x > 6), undefined) } { // Some, find, and every work on synchronous streams with an asynchronous predicate - assert.strictEqual(await oneTo5().some(async (x) => x > 3), true); - assert.strictEqual(await oneTo5().every(async (x) => x > 3), false); - assert.strictEqual(await oneTo5().find(async (x) => x > 3), 4); - assert.strictEqual(await oneTo5().some(async (x) => x > 6), false); - assert.strictEqual(await oneTo5().every(async (x) => x < 6), true); - assert.strictEqual(await oneTo5().find(async (x) => x > 6), undefined); + assert.strictEqual(await oneTo5().some(async (x) => x > 3), true) + assert.strictEqual(await oneTo5().every(async (x) => x > 3), false) + assert.strictEqual(await oneTo5().find(async (x) => x > 3), 4) + assert.strictEqual(await oneTo5().some(async (x) => x > 6), false) + assert.strictEqual(await oneTo5().every(async (x) => x < 6), true) + assert.strictEqual(await oneTo5().find(async (x) => x > 6), undefined) } { // Some, find, and every work on asynchronous streams with an asynchronous predicate - assert.strictEqual(await oneTo5Async().some(async (x) => x > 3), true); - assert.strictEqual(await oneTo5Async().every(async (x) => x > 3), false); - assert.strictEqual(await oneTo5Async().find(async (x) => x > 3), 4); - assert.strictEqual(await oneTo5Async().some(async (x) => x > 6), false); - assert.strictEqual(await oneTo5Async().every(async (x) => x < 6), true); - assert.strictEqual(await oneTo5Async().find(async (x) => x > 6), undefined); + assert.strictEqual(await oneTo5Async().some(async (x) => x > 3), true) + assert.strictEqual(await oneTo5Async().every(async (x) => x > 3), false) + assert.strictEqual(await oneTo5Async().find(async (x) => x > 3), 4) + assert.strictEqual(await oneTo5Async().some(async (x) => x > 6), false) + assert.strictEqual(await oneTo5Async().every(async (x) => x < 6), true) + assert.strictEqual(await oneTo5Async().find(async (x) => x > 6), undefined) } { async function checkDestroyed(stream) { - await setTimeout(); - assert.strictEqual(stream.destroyed, true); + await setTimeout() + assert.strictEqual(stream.destroyed, true) } { // Some, find, and every short circuit - const someStream = oneTo5(); - await someStream.some(common.mustCall((x) => x > 2, 3)); - await checkDestroyed(someStream); + const someStream = oneTo5() + await someStream.some(common.mustCall((x) => x > 2, 3)) + await checkDestroyed(someStream) - const everyStream = oneTo5(); - await everyStream.every(common.mustCall((x) => x < 3, 3)); - await checkDestroyed(everyStream); + const everyStream = oneTo5() + await everyStream.every(common.mustCall((x) => x < 3, 3)) + await checkDestroyed(everyStream) - const findStream = oneTo5(); - await findStream.find(common.mustCall((x) => x > 1, 2)); - await checkDestroyed(findStream); + const findStream = oneTo5() + await findStream.find(common.mustCall((x) => x > 1, 2)) + await checkDestroyed(findStream) // When short circuit isn't possible the whole stream is iterated - await oneTo5().some(common.mustCall(() => false, 5)); - await oneTo5().every(common.mustCall(() => true, 5)); - await oneTo5().find(common.mustCall(() => false, 5)); + await oneTo5().some(common.mustCall(() => false, 5)) + await oneTo5().every(common.mustCall(() => true, 5)) + await oneTo5().find(common.mustCall(() => false, 5)) } { // Some, find, and every short circuit async stream/predicate - const someStream = oneTo5Async(); - await someStream.some(common.mustCall(async (x) => x > 2, 3)); - await checkDestroyed(someStream); + const someStream = oneTo5Async() + await someStream.some(common.mustCall(async (x) => x > 2, 3)) + await checkDestroyed(someStream) - const everyStream = oneTo5Async(); - await everyStream.every(common.mustCall(async (x) => x < 3, 3)); - await checkDestroyed(everyStream); + const everyStream = oneTo5Async() + await everyStream.every(common.mustCall(async (x) => x < 3, 3)) + await checkDestroyed(everyStream) - const findStream = oneTo5Async(); - await findStream.find(common.mustCall(async (x) => x > 1, 2)); - await checkDestroyed(findStream); + const findStream = oneTo5Async() + await findStream.find(common.mustCall(async (x) => x > 1, 2)) + await checkDestroyed(findStream) // When short circuit isn't possible the whole stream is iterated - await oneTo5Async().some(common.mustCall(async () => false, 5)); - await oneTo5Async().every(common.mustCall(async () => true, 5)); - await oneTo5Async().find(common.mustCall(async () => false, 5)); + await oneTo5Async().some(common.mustCall(async () => false, 5)) + await oneTo5Async().every(common.mustCall(async () => true, 5)) + await oneTo5Async().find(common.mustCall(async () => false, 5)) } } { // Concurrency doesn't affect which value is found. - const found = await Readable.from([1, 2]).find(async (val) => { - if (val === 1) { - await setTimeout(100); - } - return true; - }, { concurrency: 2 }); - assert.strictEqual(found, 1); + const found = await Readable.from([1, 2]).find( + async (val) => { + if (val === 1) { + await setTimeout(100) + } + return true + }, + { concurrency: 2 } + ) + assert.strictEqual(found, 1) } { // Support for AbortSignal for (const op of ['some', 'every', 'find']) { { - const ac = new AbortController(); - assert.rejects(Readable.from([1, 2, 3])[op]( - () => new Promise(() => { }), - { signal: ac.signal } - ), { - name: 'AbortError', - }, `${op} should abort correctly with sync abort`).then(common.mustCall()); - ac.abort(); + const ac = new AbortController() + assert + .rejects( + Readable.from([1, 2, 3])[op](() => new Promise(() => {}), { signal: ac.signal }), + { + name: 'AbortError' + }, + `${op} should abort correctly with sync abort` + ) + .then(common.mustCall()) + ac.abort() } { // Support for pre-aborted AbortSignal - assert.rejects(Readable.from([1, 2, 3])[op]( - () => new Promise(() => { }), - { signal: AbortSignal.abort() } - ), { - name: 'AbortError', - }, `${op} should abort with pre-aborted abort controller`).then(common.mustCall()); + assert + .rejects( + Readable.from([1, 2, 3])[op](() => new Promise(() => {}), { signal: AbortSignal.abort() }), + { + name: 'AbortError' + }, + `${op} should abort with pre-aborted abort controller` + ) + .then(common.mustCall()) } } } { // Error cases for (const op of ['some', 'every', 'find']) { - assert.rejects(async () => { - await Readable.from([1])[op](1); - }, /ERR_INVALID_ARG_TYPE/, `${op} should throw for invalid function`).then(common.mustCall()); - assert.rejects(async () => { - await Readable.from([1])[op]((x) => x, { - concurrency: 'Foo' - }); - }, /ERR_OUT_OF_RANGE/, `${op} should throw for invalid concurrency`).then(common.mustCall()); - assert.rejects(async () => { - await Readable.from([1])[op]((x) => x, 1); - }, /ERR_INVALID_ARG_TYPE/, `${op} should throw for invalid concurrency`).then(common.mustCall()); - assert.rejects(async () => { - await Readable.from([1])[op]((x) => x, { - signal: true - }); - }, /ERR_INVALID_ARG_TYPE/, `${op} should throw for invalid signal`).then(common.mustCall()); + assert + .rejects( + async () => { + await Readable.from([1])[op](1) + }, + /ERR_INVALID_ARG_TYPE/, + `${op} should throw for invalid function` + ) + .then(common.mustCall()) + assert + .rejects( + async () => { + await Readable.from([1])[op]((x) => x, { + concurrency: 'Foo' + }) + }, + /ERR_OUT_OF_RANGE/, + `${op} should throw for invalid concurrency` + ) + .then(common.mustCall()) + assert + .rejects( + async () => { + await Readable.from([1])[op]((x) => x, 1) + }, + /ERR_INVALID_ARG_TYPE/, + `${op} should throw for invalid concurrency` + ) + .then(common.mustCall()) + assert + .rejects( + async () => { + await Readable.from([1])[op]((x) => x, { + signal: true + }) + }, + /ERR_INVALID_ARG_TYPE/, + `${op} should throw for invalid signal` + ) + .then(common.mustCall()) } } { for (const op of ['some', 'every', 'find']) { - const stream = oneTo5(); + const stream = oneTo5() Object.defineProperty(stream, 'map', { - value: common.mustNotCall(() => {}), - }); + value: common.mustNotCall(() => {}) + }) // Check that map isn't getting called. - stream[op](() => {}); + stream[op](() => {}) } } - /* replacement start */ - process.on('beforeExit', (code) => { - if(code === 0) { - tap.pass('test succeeded'); - } else { - tap.fail(`test failed - exited code ${code}`); - } - }); - /* replacement end */ +/* replacement start */ +process.on('beforeExit', (code) => { + if (code === 0) { + tap.pass('test succeeded') + } else { + tap.fail(`test failed - exited code ${code}`) + } +}) +/* replacement end */