diff --git a/.github/workflows/node.yml b/.github/workflows/node.yml index 76a9b50f5..f94148058 100644 --- a/.github/workflows/node.yml +++ b/.github/workflows/node.yml @@ -13,7 +13,7 @@ jobs: fail-fast: false matrix: os: [ubuntu-latest, windows-latest, macos-latest] - node-version: [12.x, 14.x, 16.x, 18.x, 20.x] + node-version: [12.x, 14.x, 16.x, 18.x, 20.x, 21.x] exclude: - os: windows-latest node-version: 12.x diff --git a/README.md b/README.md index b82a6f125..339af1b6e 100644 --- a/README.md +++ b/README.md @@ -11,9 +11,9 @@ npm install readable-stream ``` -This package is a mirror of the streams implementations in Node.js 18.16.0. +This package is a mirror of the streams implementations in Node.js 18.19.0. -Full documentation may be found on the [Node.js website](https://nodejs.org/dist/v18.16.0/docs/api/stream.html). +Full documentation may be found on the [Node.js website](https://nodejs.org/dist/v18.19.0/docs/api/stream.html). If you want to guarantee a stable streams base, regardless of what version of Node you, or the users of your libraries are using, use **readable-stream** _only_ and avoid the _"stream"_ module in Node-core, for background see [this blogpost](http://r.va.gg/2014/06/why-i-dont-use-nodes-core-stream-module.html). diff --git a/build/build.mjs b/build/build.mjs index b990fd327..edf61421b 100644 --- a/build/build.mjs +++ b/build/build.mjs @@ -69,6 +69,7 @@ async function extract(nodeVersion, tarFile) { }) await finished(tarFile.pipe(parser)) + info('extraction done') return contents } diff --git a/build/replacements.mjs b/build/replacements.mjs index 6e133b903..f221716d4 100644 --- a/build/replacements.mjs +++ b/build/replacements.mjs @@ -42,6 +42,11 @@ const internalStreamsNoRequireAbortController = [ 'const AbortController = globalThis.AbortController || require(\'abort-controller\').AbortController;' ] +const internalStreamsNoRequireAbortController2 = [ + 'const \\{ AbortController, AbortSignal \\} = .+', + 'const AbortController = globalThis.AbortController || require(\'abort-controller\').AbortController;' +] + const internalStreamsRequireInternal = ["require\\('internal/([^']+)'\\)", "require('../$1')"] const internalStreamsRequireErrors = ["require\\('internal/errors'\\)", "require('../../ours/errors')"] @@ -74,7 +79,12 @@ const internalStreamsRequireWebStream = ["require\\('internal/webstreams/adapter const internalStreamsWeakHandler = [ "const \\{ kWeakHandler \\} = require\\('../event_target'\\);", - "const kWeakHandler = require('../../ours/primordials').Symbol('kWeak');" + "require\\('../event_target'\\);const kWeakHandler = require('../../ours/primordials').Symbol('kWeak');" +] + +const internalStreamsWeakHandler2 = [ + "const \\{ kWeakHandler, kResistStopPropagation \\} = .*;", + "const kWeakHandler = require('../../ours/primordials').Symbol('kWeak');\nconst kResistStopPropagation = require('../../ours/primordials').Symbol('kResistStopPropagation');" ] const internalValidatorsNoCoalesceAssignment = [ @@ -142,6 +152,7 @@ const testCommonKnownGlobals = [ typeof AbortController !== 'undefined' ? AbortController : require('abort-controller').AbortController, typeof AbortSignal !== 'undefined' ? AbortSignal : require('abort-controller').AbortSignal, typeof EventTarget !== 'undefined' ? EventTarget : require('event-target-shim').EventTarget, + typeof navigator !== 'undefined' ? 
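The README excerpt above recommends depending on readable-stream rather than the core `stream` module so behaviour stays identical across Node versions. A minimal sketch of that usage (the sink writable is illustrative, not part of the diff):

```js
const { Readable, Writable, pipeline } = require('readable-stream')

// Same streams API as Node core, but pinned by the package version
// instead of by whichever Node release happens to be running.
const sink = new Writable({
  write(chunk, encoding, callback) {
    console.log('got:', chunk.toString())
    callback()
  }
})

pipeline(Readable.from(['stable ', 'streams']), sink, (err) => {
  if (err) console.error('pipeline failed', err)
})
```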
navigator : {}, ` ] @@ -238,6 +249,32 @@ const readmeLink = ['(\\[Node.js website\\]\\(https://nodejs.org/dist/v)(\\d+.\\ const streamRequire = [ "require\\('stream'\\)", "require('../../lib/stream.js')" ] +const removeWebStreamsFromDuplexFromTest= [ + 'const { ReadableStream, WritableStream } = .+;', + `function makeATestReadableStream(value) { + return Readable.from([value]) +} +function makeATestWritableStream(writeFunc) { + return new Writable({ + write(chunk, enc, cb) { + writeFunc(chunk) + cb() + } + }) +} +` +] + +const duplexFromTestWebStreamNeutralizeReadable = [ + 'makeATestReadableStream\\(value\\) {', + 'makeATestReadableStreamOff(value) {' +] + +const duplexFromTestWebStreamNeutralizeWritable = [ + 'makeATestWritableStream\\(writeFunc\\) {', + 'makeATestWritableStreamOff(writeFunc) {' +] + export const replacements = { 'lib/_stream.+': [legacyStreamsRequireStream], 'lib/internal/streams/duplexify.+': [ @@ -248,7 +285,9 @@ export const replacements = { ], 'lib/internal/streams/(operators|pipeline).+': [ internalStreamsAbortControllerPolyfill, - internalStreamsNoRequireAbortController + internalStreamsNoRequireAbortController, + internalStreamsNoRequireAbortController2, + internalStreamsWeakHandler2 ], 'lib/internal/streams/readable.js': [ removefromWebReadableMethod, @@ -307,7 +346,13 @@ export const replacements = { testParallelSilentConsole, testParallelTimersPromises ], - 'test/parallel/test-stream-duplex-from.js': [testParallelDuplexFromBlob, testParallelDuplexSkipWithoutBlob], + 'test/parallel/test-stream-duplex-from.js': [ + testParallelDuplexFromBlob, + testParallelDuplexSkipWithoutBlob, + duplexFromTestWebStreamNeutralizeReadable, + duplexFromTestWebStreamNeutralizeWritable, + removeWebStreamsFromDuplexFromTest + ], 'test/parallel/test-stream-finished.js': [testParallelFinishedEvent], 'test/parallel/test-stream-flatMap.js': [testParallelFlatMapWinLineSeparator], 'test/parallel/test-stream-preprocess.js': [testParallelPreprocessWinLineSeparator], diff --git a/lib/internal/streams/add-abort-signal.js b/lib/internal/streams/add-abort-signal.js index 3a26a1d3e..509eb9f57 100644 --- a/lib/internal/streams/add-abort-signal.js +++ b/lib/internal/streams/add-abort-signal.js @@ -1,9 +1,11 @@ 'use strict' +const { SymbolDispose } = require('../../ours/primordials') const { AbortError, codes } = require('../../ours/errors') const { isNodeStream, isWebStream, kControllerErrorFunction } = require('./utils') const eos = require('./end-of-stream') const { ERR_INVALID_ARG_TYPE } = codes +let addAbortListener // This method is inlined here for readable-stream // It also does not allow for signal to not exist on the stream @@ -42,8 +44,9 @@ module.exports.addAbortSignalNoValidate = function (signal, stream) { if (signal.aborted) { onAbort() } else { - signal.addEventListener('abort', onAbort) - eos(stream, () => signal.removeEventListener('abort', onAbort)) + addAbortListener ??= require('events').addAbortListener + const disposable = addAbortListener(signal, onAbort) + eos(stream, disposable[SymbolDispose]) } return stream } diff --git a/lib/internal/streams/compose.js b/lib/internal/streams/compose.js index f565c12ef..b399d540f 100644 --- a/lib/internal/streams/compose.js +++ b/lib/internal/streams/compose.js @@ -74,7 +74,7 @@ module.exports = function compose(...streams) { d = new Duplex({ // TODO (ronag): highWaterMark? 
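The add-abort-signal change above drops the manual `addEventListener`/`removeEventListener` bookkeeping in favour of `events.addAbortListener` plus disposable cleanup (the library itself falls back to a private symbol where `Symbol.dispose` is missing). A rough sketch of the same pattern, assuming Node >= 18.18 / 20.5 where both `addAbortListener` and `Symbol.dispose` exist:

```js
const { addAbortListener } = require('events')
const { Readable, finished } = require('readable-stream')

function wireAbort(signal, stream) {
  // The listener fires at most once on abort; disposing removes it, which is
  // what the patched code does once the stream finishes.
  const disposable = addAbortListener(signal, () => stream.destroy(new Error('aborted')))
  finished(stream, () => disposable[Symbol.dispose]())
  return stream
}

const ac = new AbortController()
wireAbort(ac.signal, Readable.from(['x', 'y'])).resume()
```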
writableObjectMode: !!(head !== null && head !== undefined && head.writableObjectMode), - readableObjectMode: !!(tail !== null && tail !== undefined && tail.writableObjectMode), + readableObjectMode: !!(tail !== null && tail !== undefined && tail.readableObjectMode), writable, readable }) diff --git a/lib/internal/streams/destroy.js b/lib/internal/streams/destroy.js index db76c29f9..38292315e 100644 --- a/lib/internal/streams/destroy.js +++ b/lib/internal/streams/destroy.js @@ -12,7 +12,7 @@ const { AbortError } = require('../../ours/errors') const { Symbol } = require('../../ours/primordials') -const { kDestroyed, isDestroyed, isFinished, isServerRequest } = require('./utils') +const { kIsDestroyed, isDestroyed, isFinished, isServerRequest } = require('./utils') const kDestroy = Symbol('kDestroy') const kConstruct = Symbol('kConstruct') function checkError(err, w, r) { @@ -278,7 +278,7 @@ function destroyer(stream, err) { process.nextTick(emitCloseLegacy, stream) } if (!stream.destroyed) { - stream[kDestroyed] = true + stream[kIsDestroyed] = true } } module.exports = { diff --git a/lib/internal/streams/duplexify.js b/lib/internal/streams/duplexify.js index 599fb47ab..05740d70f 100644 --- a/lib/internal/streams/duplexify.js +++ b/lib/internal/streams/duplexify.js @@ -13,7 +13,9 @@ const { isNodeStream, isReadableNodeStream, isWritableNodeStream, - isDuplexNodeStream + isDuplexNodeStream, + isReadableStream, + isWritableStream } = require('./utils') const eos = require('./end-of-stream') const { @@ -23,6 +25,7 @@ const { const { destroyer } = require('./destroy') const Duplex = require('./duplex') const Readable = require('./readable') +const Writable = require('./writable') const { createDeferredPromise } = require('../../ours/util') const from = require('./from') const Blob = globalThis.Blob || bufferModule.Blob @@ -77,17 +80,16 @@ module.exports = function duplexify(body, name) { readable: false }) } - - // TODO: Webstreams - // if (isReadableStream(body)) { - // return _duplexify({ readable: Readable.fromWeb(body) }); - // } - - // TODO: Webstreams - // if (isWritableStream(body)) { - // return _duplexify({ writable: Writable.fromWeb(body) }); - // } - + if (isReadableStream(body)) { + return _duplexify({ + readable: Readable.fromWeb(body) + }) + } + if (isWritableStream(body)) { + return _duplexify({ + writable: Writable.fromWeb(body) + }) + } if (typeof body === 'function') { const { value, write, final, destroy } = fromAsyncGen(body) if (isIterable(value)) { @@ -144,15 +146,12 @@ module.exports = function duplexify(body, name) { writable: false }) } - - // TODO: Webstreams. - // if ( - // isReadableStream(body?.readable) && - // isWritableStream(body?.writable) - // ) { - // return Duplexify.fromWeb(body); - // } - + if ( + isReadableStream(body === null || body === undefined ? undefined : body.readable) && + isWritableStream(body === null || body === undefined ? undefined : body.writable) + ) { + return Duplexify.fromWeb(body) + } if ( typeof (body === null || body === undefined ? undefined : body.writable) === 'object' || typeof (body === null || body === undefined ? 
undefined : body.readable) === 'object' diff --git a/lib/internal/streams/end-of-stream.js b/lib/internal/streams/end-of-stream.js index 043c9c4bd..77c31616b 100644 --- a/lib/internal/streams/end-of-stream.js +++ b/lib/internal/streams/end-of-stream.js @@ -11,7 +11,7 @@ const { AbortError, codes } = require('../../ours/errors') const { ERR_INVALID_ARG_TYPE, ERR_STREAM_PREMATURE_CLOSE } = codes const { kEmptyObject, once } = require('../../ours/util') const { validateAbortSignal, validateFunction, validateObject, validateBoolean } = require('../validators') -const { Promise, PromisePrototypeThen } = require('../../ours/primordials') +const { Promise, PromisePrototypeThen, SymbolDispose } = require('../../ours/primordials') const { isClosed, isReadable, @@ -28,6 +28,7 @@ const { willEmitClose: _willEmitClose, kIsClosedPromise } = require('./utils') +let addAbortListener function isRequest(stream) { return stream.setHeader && typeof stream.abort === 'function' } @@ -212,12 +213,13 @@ function eos(stream, options, callback) { if (options.signal.aborted) { process.nextTick(abort) } else { + addAbortListener ??= require('events').addAbortListener + const disposable = addAbortListener(options.signal, abort) const originalCallback = callback callback = once((...args) => { - options.signal.removeEventListener('abort', abort) + disposable[SymbolDispose]() originalCallback.apply(stream, args) }) - options.signal.addEventListener('abort', abort) } } return cleanup @@ -238,12 +240,13 @@ function eosWeb(stream, options, callback) { if (options.signal.aborted) { process.nextTick(abort) } else { + addAbortListener ??= require('events').addAbortListener + const disposable = addAbortListener(options.signal, abort) const originalCallback = callback callback = once((...args) => { - options.signal.removeEventListener('abort', abort) + disposable[SymbolDispose]() originalCallback.apply(stream, args) }) - options.signal.addEventListener('abort', abort) } } const resolverFn = (...args) => { diff --git a/lib/internal/streams/operators.js b/lib/internal/streams/operators.js index 869cacb39..beaa64871 100644 --- a/lib/internal/streams/operators.js +++ b/lib/internal/streams/operators.js @@ -7,17 +7,21 @@ const { } = require('../../ours/errors') const { validateAbortSignal, validateInteger, validateObject } = require('../validators') const kWeakHandler = require('../../ours/primordials').Symbol('kWeak') +const kResistStopPropagation = require('../../ours/primordials').Symbol('kResistStopPropagation') const { finished } = require('./end-of-stream') const staticCompose = require('./compose') const { addAbortSignalNoValidate } = require('./add-abort-signal') const { isWritable, isNodeStream } = require('./utils') +const { deprecate } = require('../../ours/util') const { ArrayPrototypePush, + Boolean, MathFloor, Number, NumberIsNaN, Promise, PromiseReject, + PromiseResolve, PromisePrototypeThen, Symbol } = require('../../ours/primordials') @@ -54,41 +58,43 @@ function map(fn, options) { if ((options === null || options === undefined ? undefined : options.concurrency) != null) { concurrency = MathFloor(options.concurrency) } - validateInteger(concurrency, 'concurrency', 1) + let highWaterMark = concurrency - 1 + if ((options === null || options === undefined ? 
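The duplexify changes above un-comment the web-stream paths, so `Duplex.from` can now wrap a WHATWG `ReadableStream`, a `WritableStream`, or a `{ readable, writable }` pair. A hedged usage sketch (the toy web streams are illustrative; both constructors are globals in Node >= 18):

```js
const { Duplex } = require('readable-stream')

const readable = new ReadableStream({
  start(controller) {
    controller.enqueue('from the web side')
    controller.close()
  }
})
const writable = new WritableStream({
  write(chunk) {
    console.log('web sink got:', chunk.toString())
  }
})

// Wrap the pair as a single Node duplex: reads come from the ReadableStream,
// writes go to the WritableStream.
const d = Duplex.from({ readable, writable })
d.on('data', (chunk) => console.log('node side read:', chunk.toString()))
d.end('hello web')
```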
undefined : options.highWaterMark) != null) { + highWaterMark = MathFloor(options.highWaterMark) + } + validateInteger(concurrency, 'options.concurrency', 1) + validateInteger(highWaterMark, 'options.highWaterMark', 0) + highWaterMark += concurrency return async function* map() { - var _options$signal, _options$signal2 - const ac = new AbortController() + const signal = AbortSignal.any( + [options === null || options === undefined ? undefined : options.signal].filter(Boolean) + ) const stream = this const queue = [] - const signal = ac.signal const signalOpt = { signal } - const abort = () => ac.abort() - if ( - options !== null && - options !== undefined && - (_options$signal = options.signal) !== null && - _options$signal !== undefined && - _options$signal.aborted - ) { - abort() - } - options === null || options === undefined - ? undefined - : (_options$signal2 = options.signal) === null || _options$signal2 === undefined - ? undefined - : _options$signal2.addEventListener('abort', abort) let next let resume let done = false - function onDone() { + let cnt = 0 + function onCatch() { done = true + afterItemProcessed() + } + function afterItemProcessed() { + cnt -= 1 + maybeResume() + } + function maybeResume() { + if (resume && !done && cnt < concurrency && queue.length < highWaterMark) { + resume() + resume = null + } } async function pump() { try { for await (let val of stream) { - var _val if (done) { return } @@ -97,21 +103,21 @@ function map(fn, options) { } try { val = fn(val, signalOpt) + if (val === kEmpty) { + continue + } + val = PromiseResolve(val) } catch (err) { val = PromiseReject(err) } - if (val === kEmpty) { - continue - } - if (typeof ((_val = val) === null || _val === undefined ? undefined : _val.catch) === 'function') { - val.catch(onDone) - } + cnt += 1 + PromisePrototypeThen(val, afterItemProcessed, onCatch) queue.push(val) if (next) { next() next = null } - if (!done && queue.length && queue.length >= concurrency) { + if (!done && (queue.length >= highWaterMark || cnt >= concurrency)) { await new Promise((resolve) => { resume = resolve }) @@ -120,20 +126,14 @@ function map(fn, options) { queue.push(kEof) } catch (err) { const val = PromiseReject(err) - PromisePrototypeThen(val, undefined, onDone) + PromisePrototypeThen(val, afterItemProcessed, onCatch) queue.push(val) } finally { - var _options$signal3 done = true if (next) { next() next = null } - options === null || options === undefined - ? undefined - : (_options$signal3 = options.signal) === null || _options$signal3 === undefined - ? 
undefined - : _options$signal3.removeEventListener('abort', abort) } } pump() @@ -151,17 +151,13 @@ function map(fn, options) { yield val } queue.shift() - if (resume) { - resume() - resume = null - } + maybeResume() } await new Promise((resolve) => { next = resolve }) } } finally { - ac.abort() done = true if (resume) { resume() @@ -180,13 +176,13 @@ function asIndexedPairs(options = undefined) { return async function* asIndexedPairs() { let index = 0 for await (const val of this) { - var _options$signal4 + var _options$signal if ( options !== null && options !== undefined && - (_options$signal4 = options.signal) !== null && - _options$signal4 !== undefined && - _options$signal4.aborted + (_options$signal = options.signal) !== null && + _options$signal !== undefined && + _options$signal.aborted ) { throw new AbortError({ cause: options.signal.reason @@ -254,7 +250,7 @@ class ReduceAwareErrMissingArgs extends ERR_MISSING_ARGS { } } async function reduce(reducer, initialValue, options) { - var _options$signal5 + var _options$signal2 if (typeof reducer !== 'function') { throw new ERR_INVALID_ARG_TYPE('reducer', ['Function', 'AsyncFunction'], reducer) } @@ -268,9 +264,9 @@ async function reduce(reducer, initialValue, options) { if ( options !== null && options !== undefined && - (_options$signal5 = options.signal) !== null && - _options$signal5 !== undefined && - _options$signal5.aborted + (_options$signal2 = options.signal) !== null && + _options$signal2 !== undefined && + _options$signal2.aborted ) { const err = new AbortError(undefined, { cause: options.signal.reason @@ -284,21 +280,22 @@ async function reduce(reducer, initialValue, options) { if (options !== null && options !== undefined && options.signal) { const opts = { once: true, - [kWeakHandler]: this + [kWeakHandler]: this, + [kResistStopPropagation]: true } options.signal.addEventListener('abort', () => ac.abort(), opts) } let gotAnyItemFromStream = false try { for await (const value of this) { - var _options$signal6 + var _options$signal3 gotAnyItemFromStream = true if ( options !== null && options !== undefined && - (_options$signal6 = options.signal) !== null && - _options$signal6 !== undefined && - _options$signal6.aborted + (_options$signal3 = options.signal) !== null && + _options$signal3 !== undefined && + _options$signal3.aborted ) { throw new AbortError() } @@ -328,13 +325,13 @@ async function toArray(options) { } const result = [] for await (const val of this) { - var _options$signal7 + var _options$signal4 if ( options !== null && options !== undefined && - (_options$signal7 = options.signal) !== null && - _options$signal7 !== undefined && - _options$signal7.aborted + (_options$signal4 = options.signal) !== null && + _options$signal4 !== undefined && + _options$signal4.aborted ) { throw new AbortError(undefined, { cause: options.signal.reason @@ -373,24 +370,24 @@ function drop(number, options = undefined) { } number = toIntegerOrInfinity(number) return async function* drop() { - var _options$signal8 + var _options$signal5 if ( options !== null && options !== undefined && - (_options$signal8 = options.signal) !== null && - _options$signal8 !== undefined && - _options$signal8.aborted + (_options$signal5 = options.signal) !== null && + _options$signal5 !== undefined && + _options$signal5.aborted ) { throw new AbortError() } for await (const val of this) { - var _options$signal9 + var _options$signal6 if ( options !== null && options !== undefined && - (_options$signal9 = options.signal) !== null && - _options$signal9 !== 
undefined && - _options$signal9.aborted + (_options$signal6 = options.signal) !== null && + _options$signal6 !== undefined && + _options$signal6.aborted ) { throw new AbortError() } @@ -409,37 +406,40 @@ function take(number, options = undefined) { } number = toIntegerOrInfinity(number) return async function* take() { - var _options$signal10 + var _options$signal7 if ( options !== null && options !== undefined && - (_options$signal10 = options.signal) !== null && - _options$signal10 !== undefined && - _options$signal10.aborted + (_options$signal7 = options.signal) !== null && + _options$signal7 !== undefined && + _options$signal7.aborted ) { throw new AbortError() } for await (const val of this) { - var _options$signal11 + var _options$signal8 if ( options !== null && options !== undefined && - (_options$signal11 = options.signal) !== null && - _options$signal11 !== undefined && - _options$signal11.aborted + (_options$signal8 = options.signal) !== null && + _options$signal8 !== undefined && + _options$signal8.aborted ) { throw new AbortError() } if (number-- > 0) { yield val - } else { + } + + // Don't get another item from iterator in case we reached the end + if (number <= 0) { return } } }.call(this) } module.exports.streamReturningOperators = { - asIndexedPairs, + asIndexedPairs: deprecate(asIndexedPairs, 'readable.asIndexedPairs will be removed in a future version.'), drop, filter, flatMap, diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 8393ba514..cb8d0bada 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -7,7 +7,7 @@ const process = require('process/') // permission from the author, Mathias Buus (@mafintosh). ;('use strict') -const { ArrayIsArray, Promise, SymbolAsyncIterator } = require('../../ours/primordials') +const { ArrayIsArray, Promise, SymbolAsyncIterator, SymbolDispose } = require('../../ours/primordials') const eos = require('./end-of-stream') const { once } = require('../../ours/util') const destroyImpl = require('./destroy') @@ -32,11 +32,12 @@ const { isTransformStream, isWebStream, isReadableStream, - isReadableEnded + isReadableFinished } = require('./utils') const AbortController = globalThis.AbortController || require('abort-controller').AbortController let PassThrough let Readable +let addAbortListener function destroyer(stream, reading, writing) { let finished = false stream.on('close', () => { @@ -129,8 +130,8 @@ async function pumpToNode(iterable, writable, finish, { end }) { } if (end) { writable.end() + await wait() } - await wait() finish() } catch (err) { finish(error !== err ? aggregateTwoErrors(error, err) : err) @@ -185,7 +186,11 @@ function pipelineImpl(streams, callback, opts) { function abort() { finishImpl(new AbortError()) } - outerSignal === null || outerSignal === undefined ? undefined : outerSignal.addEventListener('abort', abort) + addAbortListener ??= require('events').addAbortListener + let disposable + if (outerSignal) { + disposable = addAbortListener(outerSignal, abort) + } let error let value const destroys = [] @@ -194,6 +199,7 @@ function pipelineImpl(streams, callback, opts) { finishImpl(err, --finishCount === 0) } function finishImpl(err, final) { + var _disposable if (err && (!error || error.code === 'ERR_STREAM_PREMATURE_CLOSE')) { error = err } @@ -203,7 +209,7 @@ function pipelineImpl(streams, callback, opts) { while (destroys.length) { destroys.shift()(error) } - outerSignal === null || outerSignal === undefined ? 
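The operators.js rewrite above makes `map()` track in-flight calls explicitly and accept a `highWaterMark` option separate from `concurrency`, while `asIndexedPairs` is wrapped in `deprecate()` (a pass-through stub in this package per `lib/ours/util.js` below, so no runtime warning is emitted here). A usage sketch of the new options:

```js
const { Readable } = require('readable-stream')

async function demo() {
  const doubled = await Readable.from([1, 2, 3, 4, 5])
    // concurrency bounds in-flight fn() calls; highWaterMark (new here) bounds
    // how many settled results may queue before the source is paused.
    .map(async (n) => n * 2, { concurrency: 2, highWaterMark: 4 })
    .toArray()
  console.log(doubled) // [2, 4, 6, 8, 10] (order is preserved)
}
demo().catch(console.error)
```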
undefined : outerSignal.removeEventListener('abort', abort) + ;(_disposable = disposable) === null || _disposable === undefined ? undefined : _disposable[SymbolDispose]() ac.abort() if (final) { if (!error) { @@ -411,7 +417,7 @@ function pipe(src, dst, finish, { end }) { ended = true dst.end() } - if (isReadableEnded(src)) { + if (isReadableFinished(src)) { // End the destination if the source has already ended. process.nextTick(endFn) } else { diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index 3fc01d1f8..5b494ba3d 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -35,6 +35,7 @@ const { ObjectSetPrototypeOf, Promise, SafeSet, + SymbolAsyncDispose, SymbolAsyncIterator, Symbol } = require('../../ours/primordials') @@ -59,7 +60,8 @@ const { ERR_OUT_OF_RANGE, ERR_STREAM_PUSH_AFTER_EOF, ERR_STREAM_UNSHIFT_AFTER_END_EVENT - } + }, + AbortError } = require('../../ours/errors') const { validateObject } = require('../validators') const kPaused = Symbol('kPaused') @@ -69,6 +71,76 @@ ObjectSetPrototypeOf(Readable.prototype, Stream.prototype) ObjectSetPrototypeOf(Readable, Stream) const nop = () => {} const { errorOrDestroy } = destroyImpl +const kObjectMode = 1 << 0 +const kEnded = 1 << 1 +const kEndEmitted = 1 << 2 +const kReading = 1 << 3 +const kConstructed = 1 << 4 +const kSync = 1 << 5 +const kNeedReadable = 1 << 6 +const kEmittedReadable = 1 << 7 +const kReadableListening = 1 << 8 +const kResumeScheduled = 1 << 9 +const kErrorEmitted = 1 << 10 +const kEmitClose = 1 << 11 +const kAutoDestroy = 1 << 12 +const kDestroyed = 1 << 13 +const kClosed = 1 << 14 +const kCloseEmitted = 1 << 15 +const kMultiAwaitDrain = 1 << 16 +const kReadingMore = 1 << 17 +const kDataEmitted = 1 << 18 + +// TODO(benjamingr) it is likely slower to do it this way than with free functions +function makeBitMapDescriptor(bit) { + return { + enumerable: false, + get() { + return (this.state & bit) !== 0 + }, + set(value) { + if (value) this.state |= bit + else this.state &= ~bit + } + } +} +ObjectDefineProperties(ReadableState.prototype, { + objectMode: makeBitMapDescriptor(kObjectMode), + ended: makeBitMapDescriptor(kEnded), + endEmitted: makeBitMapDescriptor(kEndEmitted), + reading: makeBitMapDescriptor(kReading), + // Stream is still being constructed and cannot be + // destroyed until construction finished or failed. + // Async construction is opt in, therefore we start as + // constructed. + constructed: makeBitMapDescriptor(kConstructed), + // A flag to be able to tell if the event 'readable'/'data' is emitted + // immediately, or on a later tick. We set this to true at first, because + // any actions that shouldn't happen until "later" should generally also + // not happen before the first read call. + sync: makeBitMapDescriptor(kSync), + // Whenever we return null, then we set a flag to say + // that we're awaiting a 'readable' event emission. + needReadable: makeBitMapDescriptor(kNeedReadable), + emittedReadable: makeBitMapDescriptor(kEmittedReadable), + readableListening: makeBitMapDescriptor(kReadableListening), + resumeScheduled: makeBitMapDescriptor(kResumeScheduled), + // True if the error was already emitted and should not be thrown again. + errorEmitted: makeBitMapDescriptor(kErrorEmitted), + emitClose: makeBitMapDescriptor(kEmitClose), + autoDestroy: makeBitMapDescriptor(kAutoDestroy), + // Has it been destroyed. + destroyed: makeBitMapDescriptor(kDestroyed), + // Indicates whether the stream has finished destroying. 
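readable.js above starts packing the boolean fields of `ReadableState` into a single integer, with `makeBitMapDescriptor` exposing each bit as an ordinary property. A self-contained sketch of that accessor pattern:

```js
// One integer, one bit per flag, read and written through normal properties.
const kEnded = 1 << 1
const kReading = 1 << 3

function makeBitMapDescriptor(bit) {
  return {
    enumerable: false,
    get() {
      return (this.state & bit) !== 0
    },
    set(value) {
      if (value) this.state |= bit
      else this.state &= ~bit
    }
  }
}

function DemoState() {
  this.state = 0
}
Object.defineProperties(DemoState.prototype, {
  ended: makeBitMapDescriptor(kEnded),
  reading: makeBitMapDescriptor(kReading)
})

const s = new DemoState()
s.reading = true
console.log(s.reading, s.ended, s.state) // true false 8
```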
+ closed: makeBitMapDescriptor(kClosed), + // True if close has been emitted or would have been emitted + // depending on emitClose. + closeEmitted: makeBitMapDescriptor(kCloseEmitted), + multiAwaitDrain: makeBitMapDescriptor(kMultiAwaitDrain), + // If true, a maybeReadMore has been scheduled. + readingMore: makeBitMapDescriptor(kReadingMore), + dataEmitted: makeBitMapDescriptor(kDataEmitted) +}) function ReadableState(options, stream, isDuplex) { // Duplex streams are both readable and writable, but share // the same options object. @@ -77,10 +149,13 @@ function ReadableState(options, stream, isDuplex) { // These options can be provided separately as readableXXX and writableXXX. if (typeof isDuplex !== 'boolean') isDuplex = stream instanceof require('./duplex') + // Bit map field to store ReadableState more effciently with 1 bit per field + // instead of a V8 slot per field. + this.state = kEmitClose | kAutoDestroy | kConstructed | kSync // Object stream flag. Used to make read(n) ignore n and to // make all the buffer merging and length checks go away. - this.objectMode = !!(options && options.objectMode) - if (isDuplex) this.objectMode = this.objectMode || !!(options && options.readableObjectMode) + if (options && options.objectMode) this.state |= kObjectMode + if (isDuplex && options && options.readableObjectMode) this.state |= kObjectMode // The point at which it stops calling _read() to fill the buffer // Note: 0 is a valid value, means "don't call _read preemptively ever" @@ -95,41 +170,13 @@ function ReadableState(options, stream, isDuplex) { this.length = 0 this.pipes = [] this.flowing = null - this.ended = false - this.endEmitted = false - this.reading = false - - // Stream is still being constructed and cannot be - // destroyed until construction finished or failed. - // Async construction is opt in, therefore we start as - // constructed. - this.constructed = true - - // A flag to be able to tell if the event 'readable'/'data' is emitted - // immediately, or on a later tick. We set this to true at first, because - // any actions that shouldn't happen until "later" should generally also - // not happen before the first read call. - this.sync = true - - // Whenever we return null, then we set a flag to say - // that we're awaiting a 'readable' event emission. - this.needReadable = false - this.emittedReadable = false - this.readableListening = false - this.resumeScheduled = false this[kPaused] = null - // True if the error was already emitted and should not be thrown again. - this.errorEmitted = false - // Should close be emitted on destroy. Defaults to true. - this.emitClose = !options || options.emitClose !== false + if (options && options.emitClose === false) this.state &= ~kEmitClose // Should .destroy() be called after 'end' (and potentially 'finish'). - this.autoDestroy = !options || options.autoDestroy !== false - - // Has it been destroyed. - this.destroyed = false + if (options && options.autoDestroy === false) this.state &= ~kAutoDestroy // Indicates whether the stream has errored. When true no further // _read calls, 'data' or 'readable' events should occur. This is needed @@ -137,13 +184,6 @@ function ReadableState(options, stream, isDuplex) { // stream has failed. this.errored = null - // Indicates whether the stream has finished destroying. - this.closed = false - - // True if close has been emitted or would have been emitted - // depending on emitClose. - this.closeEmitted = false - // Crypto is kind of old and crusty. 
Historically, its default string // encoding is 'binary' so we have to make this configurable. // Everything else in the universe uses 'utf8', though. @@ -152,11 +192,6 @@ function ReadableState(options, stream, isDuplex) { // Ref the piped dest which we need a drain event on it // type: null | Writable | Set. this.awaitDrainWriters = null - this.multiAwaitDrain = false - - // If true, a maybeReadMore has been scheduled. - this.readingMore = false - this.dataEmitted = false this.decoder = null this.encoding = null if (options && options.encoding) { @@ -192,6 +227,14 @@ Readable.prototype._destroy = function (err, cb) { Readable.prototype[EE.captureRejectionSymbol] = function (err) { this.destroy(err) } +Readable.prototype[SymbolAsyncDispose] = function () { + let error + if (!this.destroyed) { + error = this.readableEnded ? null : new AbortError() + this.destroy(error) + } + return new Promise((resolve, reject) => eos(this, (err) => (err && err !== error ? reject(err) : resolve(null)))) +} // Manually shove something into the read() buffer. // This returns true if the highWaterMark has not been hit yet, @@ -209,7 +252,7 @@ function readableAddChunk(stream, chunk, encoding, addToFront) { debug('readableAddChunk', chunk) const state = stream._readableState let err - if (!state.objectMode) { + if ((state.state & kObjectMode) === 0) { if (typeof chunk === 'string') { encoding = encoding || state.defaultEncoding if (state.encoding !== encoding) { @@ -234,11 +277,11 @@ function readableAddChunk(stream, chunk, encoding, addToFront) { if (err) { errorOrDestroy(stream, err) } else if (chunk === null) { - state.reading = false + state.state &= ~kReading onEofChunk(stream, state) - } else if (state.objectMode || (chunk && chunk.length > 0)) { + } else if ((state.state & kObjectMode) !== 0 || (chunk && chunk.length > 0)) { if (addToFront) { - if (state.endEmitted) errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT()) + if ((state.state & kEndEmitted) !== 0) errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT()) else if (state.destroyed || state.errored) return false else addChunk(stream, state, chunk, true) } else if (state.ended) { @@ -246,7 +289,7 @@ function readableAddChunk(stream, chunk, encoding, addToFront) { } else if (state.destroyed || state.errored) { return false } else { - state.reading = false + state.state &= ~kReading if (state.decoder && !encoding) { chunk = state.decoder.write(chunk) if (state.objectMode || chunk.length !== 0) addChunk(stream, state, chunk, false) @@ -256,7 +299,7 @@ function readableAddChunk(stream, chunk, encoding, addToFront) { } } } else if (!addToFront) { - state.reading = false + state.state &= ~kReading maybeReadMore(stream, state) } @@ -269,7 +312,7 @@ function addChunk(stream, state, chunk, addToFront) { if (state.flowing && state.length === 0 && !state.sync && stream.listenerCount('data') > 0) { // Use the guard to avoid creating `Set()` repeatedly // when we have multiple pipes. - if (state.multiAwaitDrain) { + if ((state.state & kMultiAwaitDrain) !== 0) { state.awaitDrainWriters.clear() } else { state.awaitDrainWriters = null @@ -281,7 +324,7 @@ function addChunk(stream, state, chunk, addToFront) { state.length += state.objectMode ? 
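The new `Readable.prototype[SymbolAsyncDispose]` above lets a readable participate in explicit resource management: disposing destroys the stream (with an `AbortError` if it had not already ended) and resolves once teardown completes. A sketch, assuming the runtime defines `Symbol.asyncDispose` (Node >= 18.18 / 20.5 does):

```js
const { Readable } = require('readable-stream')

async function main() {
  const r = Readable.from(['a', 'b', 'c'])
  await r[Symbol.asyncDispose]() // destroys the stream and waits for close
  console.log(r.destroyed) // true
}
main().catch(console.error)
```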
1 : chunk.length if (addToFront) state.buffer.unshift(chunk) else state.buffer.push(chunk) - if (state.needReadable) emitReadable(stream) + if ((state.state & kNeedReadable) !== 0) emitReadable(stream) } maybeReadMore(stream, state) } @@ -331,7 +374,7 @@ function computeNewHighWaterMark(n) { // changes to the function body. function howMuchToRead(n, state) { if (n <= 0 || (state.length === 0 && state.ended)) return 0 - if (state.objectMode) return 1 + if ((state.state & kObjectMode) !== 0) return 1 if (NumberIsNaN(n)) { // Only flow one buffer at a time. if (state.flowing && state.length) return state.buffer.first().length @@ -356,7 +399,7 @@ Readable.prototype.read = function (n) { // If we're asking for more than the current hwm, then raise the hwm. if (n > state.highWaterMark) state.highWaterMark = computeNewHighWaterMark(n) - if (n !== 0) state.emittedReadable = false + if (n !== 0) state.state &= ~kEmittedReadable // If we're doing read(0) to trigger a readable event, but we // already have a bunch of data in the buffer, then just trigger @@ -402,7 +445,7 @@ Readable.prototype.read = function (n) { // 3. Actually pull the requested chunks out of the buffer and return. // if we need a readable event, then we need to do some reading. - let doRead = state.needReadable + let doRead = (state.state & kNeedReadable) !== 0 debug('need readable', doRead) // If we currently have less than the highWaterMark, then also read some. @@ -419,10 +462,9 @@ Readable.prototype.read = function (n) { debug('reading, ended or constructing', doRead) } else if (doRead) { debug('do read') - state.reading = true - state.sync = true + state.state |= kReading | kSync // If the length is currently zero, then we *need* a readable event. - if (state.length === 0) state.needReadable = true + if (state.length === 0) state.state |= kNeedReadable // Call internal read method try { @@ -430,7 +472,8 @@ Readable.prototype.read = function (n) { } catch (err) { errorOrDestroy(this, err) } - state.sync = false + state.state &= ~kSync + // If _read pushed data synchronously, then `reading` will be false, // and we need to re-evaluate how much data we can return to the user. if (!state.reading) n = howMuchToRead(nOrig, state) @@ -709,9 +752,7 @@ Readable.prototype.pipe = function (dest, pipeOpts) { // Start the flow if it hasn't been started already. if (dest.writableNeedDrain === true) { - if (state.flowing) { - pause() - } + pause() } else if (!state.flowing) { debug('pipe resume') src.resume() diff --git a/lib/internal/streams/state.js b/lib/internal/streams/state.js index 18c2d845f..79294a04b 100644 --- a/lib/internal/streams/state.js +++ b/lib/internal/streams/state.js @@ -1,12 +1,23 @@ 'use strict' const { MathFloor, NumberIsInteger } = require('../../ours/primordials') +const { validateInteger } = require('../validators') const { ERR_INVALID_ARG_VALUE } = require('../../ours/errors').codes +let defaultHighWaterMarkBytes = 16 * 1024 +let defaultHighWaterMarkObjectMode = 16 function highWaterMarkFrom(options, isDuplex, duplexKey) { return options.highWaterMark != null ? options.highWaterMark : isDuplex ? options[duplexKey] : null } function getDefaultHighWaterMark(objectMode) { - return objectMode ? 16 : 16 * 1024 + return objectMode ? 
defaultHighWaterMarkObjectMode : defaultHighWaterMarkBytes +} +function setDefaultHighWaterMark(objectMode, value) { + validateInteger(value, 'value', 0) + if (objectMode) { + defaultHighWaterMarkObjectMode = value + } else { + defaultHighWaterMarkBytes = value + } } function getHighWaterMark(state, options, duplexKey, isDuplex) { const hwm = highWaterMarkFrom(options, isDuplex, duplexKey) @@ -23,5 +34,6 @@ function getHighWaterMark(state, options, duplexKey, isDuplex) { } module.exports = { getHighWaterMark, - getDefaultHighWaterMark + getDefaultHighWaterMark, + setDefaultHighWaterMark } diff --git a/lib/internal/streams/utils.js b/lib/internal/streams/utils.js index e589ad96c..9ced0485a 100644 --- a/lib/internal/streams/utils.js +++ b/lib/internal/streams/utils.js @@ -1,10 +1,16 @@ 'use strict' -const { Symbol, SymbolAsyncIterator, SymbolIterator, SymbolFor } = require('../../ours/primordials') -const kDestroyed = Symbol('kDestroyed') -const kIsErrored = Symbol('kIsErrored') -const kIsReadable = Symbol('kIsReadable') -const kIsDisturbed = Symbol('kIsDisturbed') +const { SymbolAsyncIterator, SymbolIterator, SymbolFor } = require('../../ours/primordials') + +// We need to use SymbolFor to make these globally available +// for interopt with readable-stream, i.e. readable-stream +// and node core needs to be able to read/write private state +// from each other for proper interoperability. +const kIsDestroyed = SymbolFor('nodejs.stream.destroyed') +const kIsErrored = SymbolFor('nodejs.stream.errored') +const kIsReadable = SymbolFor('nodejs.stream.readable') +const kIsWritable = SymbolFor('nodejs.stream.writable') +const kIsDisturbed = SymbolFor('nodejs.stream.disturbed') const kIsClosedPromise = SymbolFor('nodejs.webstream.isClosedPromise') const kControllerErrorFunction = SymbolFor('nodejs.webstream.controllerErrorFunction') function isReadableNodeStream(obj, strict = false) { @@ -87,7 +93,7 @@ function isDestroyed(stream) { const wState = stream._writableState const rState = stream._readableState const state = wState || rState - return !!(stream.destroyed || stream[kDestroyed] || (state !== null && state !== undefined && state.destroyed)) + return !!(stream.destroyed || stream[kIsDestroyed] || (state !== null && state !== undefined && state.destroyed)) } // Have been end():d. @@ -135,6 +141,7 @@ function isReadable(stream) { return isReadableNodeStream(stream) && stream.readable && !isReadableFinished(stream) } function isWritable(stream) { + if (stream && stream[kIsWritable] != null) return stream[kIsWritable] if (typeof (stream === null || stream === undefined ? undefined : stream.writable) !== 'boolean') return null if (isDestroyed(stream)) return false return isWritableNodeStream(stream) && stream.writable && !isWritableEnded(stream) @@ -287,7 +294,8 @@ function isErrored(stream) { ) } module.exports = { - kDestroyed, + isDestroyed, + kIsDestroyed, isDisturbed, kIsDisturbed, isErrored, @@ -296,8 +304,8 @@ module.exports = { kIsReadable, kIsClosedPromise, kControllerErrorFunction, + kIsWritable, isClosed, - isDestroyed, isDuplexNodeStream, isFinished, isIterable, diff --git a/lib/internal/validators.js b/lib/internal/validators.js index 85b2e9cd5..f90068445 100644 --- a/lib/internal/validators.js +++ b/lib/internal/validators.js @@ -50,7 +50,6 @@ const modeDesc = 'must be a 32-bit unsigned integer or an octal string' * converted to 32-bit unsigned integers or non-negative signed integers in the * C++ land, but any value higher than 0o777 will result in platform-specific * behaviors. 
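state.js above adds `setDefaultHighWaterMark` next to `getDefaultHighWaterMark`; both are re-exported from `lib/stream.js` later in this diff. A hedged sketch, assuming the package root re-exports them the same way `lib/stream.js` does:

```js
const stream = require('readable-stream')

// Defaults applied when a stream is created without an explicit highWaterMark.
console.log(stream.getDefaultHighWaterMark(false)) // 16384 bytes in this mirror
console.log(stream.getDefaultHighWaterMark(true)) // 16 objects

// Raise the byte-mode default globally (validated as a non-negative integer).
stream.setDefaultHighWaterMark(false, 64 * 1024)
console.log(stream.getDefaultHighWaterMark(false)) // 65536
```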
- * * @param {*} value Values to be validated * @param {string} name Name of the argument * @param {number} [def] If specified, will be returned for invalid values @@ -304,6 +303,26 @@ function validateBooleanArray(value, name) { } } +/** + * @callback validateAbortSignalArray + * @param {*} value + * @param {string} name + * @returns {asserts value is AbortSignal[]} + */ + +/** @type {validateAbortSignalArray} */ +function validateAbortSignalArray(value, name) { + validateArray(value, name) + for (let i = 0; i < value.length; i++) { + const signal = value[i] + const indexedName = `${name}[${i}]` + if (signal == null) { + throw new ERR_INVALID_ARG_TYPE(indexedName, 'AbortSignal', signal) + } + validateAbortSignal(signal, indexedName) + } +} + /** * @param {*} signal * @param {string} [name='signal'] @@ -488,6 +507,7 @@ module.exports = { validateArray, validateStringArray, validateBooleanArray, + validateAbortSignalArray, validateBoolean, validateBuffer, validateDictionary, diff --git a/lib/ours/primordials.js b/lib/ours/primordials.js index 9464cc7fe..00b820533 100644 --- a/lib/ours/primordials.js +++ b/lib/ours/primordials.js @@ -71,6 +71,9 @@ module.exports = { PromiseReject(err) { return Promise.reject(err) }, + PromiseResolve(val) { + return Promise.resolve(val) + }, ReflectApply: Reflect.apply, RegExpPrototypeTest(self, value) { return self.test(value) @@ -94,8 +97,11 @@ module.exports = { SymbolAsyncIterator: Symbol.asyncIterator, SymbolHasInstance: Symbol.hasInstance, SymbolIterator: Symbol.iterator, + SymbolDispose: Symbol.dispose || Symbol('Symbol.dispose'), + SymbolAsyncDispose: Symbol.asyncDispose || Symbol('Symbol.asyncDispose'), TypedArrayPrototypeSet(self, buf, len) { return self.set(buf, len) }, + Boolean: Boolean, Uint8Array } diff --git a/lib/ours/util.js b/lib/ours/util.js index e125ce17a..893368077 100644 --- a/lib/ours/util.js +++ b/lib/ours/util.js @@ -123,6 +123,9 @@ module.exports = { return ArrayBuffer.isView(arr) } }, - isBlob + isBlob, + deprecate(fn, message) { + return fn + } } module.exports.promisify.custom = Symbol.for('nodejs.util.promisify.custom') diff --git a/lib/stream.js b/lib/stream.js index e9bb6ba90..b92b427be 100644 --- a/lib/stream.js +++ b/lib/stream.js @@ -34,6 +34,7 @@ const { codes: { ERR_ILLEGAL_CONSTRUCTOR } } = require('./ours/errors') const compose = require('./internal/streams/compose') +const { setDefaultHighWaterMark, getDefaultHighWaterMark } = require('./internal/streams/state') const { pipeline } = require('./internal/streams/pipeline') const { destroyer } = require('./internal/streams/destroy') const eos = require('./internal/streams/end-of-stream') @@ -41,9 +42,11 @@ const internalBuffer = {} const promises = require('./stream/promises') const utils = require('./internal/streams/utils') const Stream = (module.exports = require('./internal/streams/legacy').Stream) +Stream.isDestroyed = utils.isDestroyed Stream.isDisturbed = utils.isDisturbed Stream.isErrored = utils.isErrored Stream.isReadable = utils.isReadable +Stream.isWritable = utils.isWritable Stream.Readable = require('./internal/streams/readable') for (const key of ObjectKeys(streamReturningOperators)) { const op = streamReturningOperators[key] @@ -103,6 +106,8 @@ Stream.addAbortSignal = addAbortSignal Stream.finished = eos Stream.destroy = destroyer Stream.compose = compose +Stream.setDefaultHighWaterMark = setDefaultHighWaterMark +Stream.getDefaultHighWaterMark = getDefaultHighWaterMark ObjectDefineProperty(Stream, 'promises', { __proto__: null, configurable: true, diff 
--git a/src/primordials.js b/src/primordials.js index 04f432256..9e1b25b4b 100644 --- a/src/primordials.js +++ b/src/primordials.js @@ -72,6 +72,9 @@ module.exports = { PromiseReject(err) { return Promise.reject(err) }, + PromiseResolve(val) { + return Promise.resolve(val) + }, ReflectApply: Reflect.apply, RegExpPrototypeTest(self, value) { return self.test(value) @@ -95,8 +98,11 @@ module.exports = { SymbolAsyncIterator: Symbol.asyncIterator, SymbolHasInstance: Symbol.hasInstance, SymbolIterator: Symbol.iterator, + SymbolDispose: Symbol.dispose || Symbol('Symbol.dispose'), + SymbolAsyncDispose: Symbol.asyncDispose || Symbol('Symbol.asyncDispose'), TypedArrayPrototypeSet(self, buf, len) { return self.set(buf, len) }, + Boolean: Boolean, Uint8Array } diff --git a/src/util.js b/src/util.js index 968643cb6..350055440 100644 --- a/src/util.js +++ b/src/util.js @@ -133,7 +133,10 @@ module.exports = { return ArrayBuffer.isView(arr) } }, - isBlob + isBlob, + deprecate(fn, message) { + return fn + } } module.exports.promisify.custom = Symbol.for('nodejs.util.promisify.custom') diff --git a/test/common/fixtures.mjs b/test/common/fixtures.mjs index 372fabf88..d6f7f6c09 100644 --- a/test/common/fixtures.mjs +++ b/test/common/fixtures.mjs @@ -1,5 +1,17 @@ -import fixtures from './fixtures.js' +import fixtures from './fixtures.js'; -const { fixturesDir, path, fileURL, readSync, readKey } = fixtures +const { + fixturesDir, + path, + fileURL, + readSync, + readKey, +} = fixtures; -export { fixturesDir, path, fileURL, readSync, readKey } +export { + fixturesDir, + path, + fileURL, + readSync, + readKey, +}; diff --git a/test/common/index.js b/test/common/index.js index 9c949a171..c54a8bca3 100644 --- a/test/common/index.js +++ b/test/common/index.js @@ -45,8 +45,28 @@ const { atob, btoa } = require('buffer') if (isMainThread) process.umask(0o022) const noop = () => {} const hasCrypto = Boolean(process.versions.openssl) && !process.env.NODE_SKIP_CRYPTO -const hasOpenSSL3 = hasCrypto && require('crypto').constants.OPENSSL_VERSION_NUMBER >= 805306368 +const hasOpenSSL3 = hasCrypto && require('crypto').constants.OPENSSL_VERSION_NUMBER >= 0x30000000 +const hasOpenSSL31 = hasCrypto && require('crypto').constants.OPENSSL_VERSION_NUMBER >= 0x30100000 const hasQuic = hasCrypto && !!process.config.variables.openssl_quic +function parseTestFlags(filename = process.argv[1]) { + // The copyright notice is relatively big and the flags could come afterwards. + const bytesToRead = 1500 + const buffer = Buffer.allocUnsafe(bytesToRead) + const fd = fs.openSync(filename, 'r') + const bytesRead = fs.readSync(fd, buffer, 0, bytesToRead) + fs.closeSync(fd) + const source = buffer.toString('utf8', 0, bytesRead) + const flagStart = source.search(/\/\/ Flags:\s+--/) + 10 + if (flagStart === 9) { + return [] + } + let flagEnd = source.indexOf('\n', flagStart) + // Normalize different EOL. + if (source[flagEnd - 1] === '\r') { + flagEnd-- + } + return source.substring(flagStart, flagEnd).split(/\s+/).filter(Boolean) +} // Check for flags. Skip this for workers (both, the `cluster` module and // `worker_threads`) and child processes. @@ -60,51 +80,34 @@ if ( require('cluster').isPrimary && fs.existsSync(process.argv[1]) ) { - // The copyright notice is relatively big and the flags could come afterwards. 
- const bytesToRead = 1500 - const buffer = Buffer.allocUnsafe(bytesToRead) - const fd = fs.openSync(process.argv[1], 'r') - const bytesRead = fs.readSync(fd, buffer, 0, bytesToRead) - fs.closeSync(fd) - const source = buffer.toString('utf8', 0, bytesRead) - const flagStart = source.indexOf('// Flags: --') + 10 - if (flagStart !== 9) { - let flagEnd = source.indexOf('\n', flagStart) - // Normalize different EOL. - if (source[flagEnd - 1] === '\r') { - flagEnd-- - } - const flags = source.substring(flagStart, flagEnd).replace(/_/g, '-').split(' ') - const args = process.execArgv.map((arg) => arg.replace(/_/g, '-')) - for (const flag of flags) { - if ( - !args.includes(flag) && - // If the binary is build without `intl` the inspect option is - // invalid. The test itself should handle this case. - (process.features.inspector || !flag.startsWith('--inspect')) - ) { - console.log( - 'NOTE: The test started as a child_process using these flags:', - inspect(flags), - 'Use NODE_SKIP_FLAG_CHECK to run the test with the original flags.' - ) - const args = [...flags, ...process.execArgv, ...process.argv.slice(1)] - const options = { - encoding: 'utf8', - stdio: 'inherit' - } - const result = spawnSync(process.execPath, args, options) - if (result.signal) { - process.kill(0, result.signal) - } else { - process.exit(result.status) - } + const flags = parseTestFlags() + for (const flag of flags) { + if ( + !process.execArgv.includes(flag) && + // If the binary is build without `intl` the inspect option is + // invalid. The test itself should handle this case. + (process.features.inspector || !flag.startsWith('--inspect')) + ) { + console.log( + 'NOTE: The test started as a child_process using these flags:', + inspect(flags), + 'Use NODE_SKIP_FLAG_CHECK to run the test with the original flags.' + ) + const args = [...flags, ...process.execArgv, ...process.argv.slice(1)] + const options = { + encoding: 'utf8', + stdio: 'inherit' + } + const result = spawnSync(process.execPath, args, options) + if (result.signal) { + process.kill(0, result.signal) + } else { + process.exit(result.status) } } } } const isWindows = process.platform === 'win32' -const isAIX = process.platform === 'aix' const isSunOS = process.platform === 'sunos' const isFreeBSD = process.platform === 'freebsd' const isOpenBSD = process.platform === 'openbsd' @@ -240,7 +243,7 @@ function platformTimeout(ms) { seven: 7 } if (process.features.debug) ms = multipliers.two * ms - if (isAIX) return multipliers.two * ms // Default localhost speed is slower on AIX + if (exports.isAIX || exports.isIBMi) return multipliers.two * ms // Default localhost speed is slower on AIX if (isPi) return multipliers.two * ms // Raspberry Pi devices @@ -251,6 +254,7 @@ let knownGlobals = [ typeof AbortController !== 'undefined' ? AbortController : require('abort-controller').AbortController, typeof AbortSignal !== 'undefined' ? AbortSignal : require('abort-controller').AbortSignal, typeof EventTarget !== 'undefined' ? EventTarget : require('event-target-shim').EventTarget, + typeof navigator !== 'undefined' ? 
navigator : {}, atob, btoa, clearImmediate, @@ -823,10 +827,10 @@ const common = { hasIntl, hasCrypto, hasOpenSSL3, + hasOpenSSL31, hasQuic, hasMultiLocalhost, invalidArgTypeHelper, - isAIX, isAlive, isAsan, isDumbTerminal, @@ -846,6 +850,7 @@ const common = { mustSucceed, nodeProcessAborted, PIPE, + parseTestFlags, platformTimeout, printSkipMessage, pwdCommand, @@ -867,7 +872,14 @@ const common = { }, get hasIPv6() { const iFaces = require('os').networkInterfaces() - const re = isWindows ? /Loopback Pseudo-Interface/ : /lo/ + let re + if (isWindows) { + re = /Loopback Pseudo-Interface/ + } else if (this.isIBMi) { + re = /\*LOOPBACK/ + } else { + re = /lo/ + } return Object.keys(iFaces).some((name) => { return re.test(name) && iFaces[name].some(({ family }) => family === 'IPv6') }) @@ -882,7 +894,11 @@ const common = { return inFreeBSDJail }, // On IBMi, process.platform and os.platform() both return 'aix', + // when built with Python versions earlier than 3.9. // It is not enough to differentiate between IBMi and real AIX system. + get isAIX() { + return require('os').type() === 'AIX' + }, get isIBMi() { return require('os').type() === 'OS400' }, diff --git a/test/common/index.mjs b/test/common/index.mjs index d524b2ba5..ca2994f6e 100644 --- a/test/common/index.mjs +++ b/test/common/index.mjs @@ -1,106 +1,108 @@ -import { createRequire } from 'module' +import { createRequire } from 'module'; -const require = createRequire(import.meta.url) -const common = require('./index.js') +const require = createRequire(import.meta.url); +const common = require('./index.js'); const { - isMainThread, - isWindows, + allowGlobals, + buildType, + canCreateSymLink, + checkoutEOL, + childShouldThrowAndAbort, + createZeroFilledFile, + enoughTestMem, + expectsError, + expectWarning, + getArrayBufferViews, + getBufferSources, + getCallSite, + getTTYfd, + hasCrypto, + hasIPv6, + hasMultiLocalhost, isAIX, - isIBMi, - isLinuxPPCBE, - isSunOS, + isAlive, isDumbTerminal, isFreeBSD, - isOpenBSD, + isIBMi, isLinux, + isLinuxPPCBE, + isMainThread, + isOpenBSD, isOSX, - enoughTestMem, - buildType, + isSunOS, + isWindows, localIPv6Hosts, - opensslCli, - PIPE, - hasCrypto, - hasIPv6, - childShouldThrowAndAbort, - checkoutEOL, - createZeroFilledFile, - platformTimeout, - allowGlobals, mustCall, mustCallAtLeast, - mustSucceed, - hasMultiLocalhost, - skipIfDumbTerminal, - skipIfEslintMissing, - canCreateSymLink, - getCallSite, mustNotCall, mustNotMutateObjectDeep, + mustSucceed, + nodeProcessAborted, + opensslCli, + parseTestFlags, + PIPE, + platformTimeout, printSkipMessage, + runWithInvalidFD, skip, - nodeProcessAborted, - isAlive, - expectWarning, - expectsError, - skipIfInspectorDisabled, skipIf32Bits, - getArrayBufferViews, - getBufferSources, - getTTYfd, - runWithInvalidFD, - spawnPromisified -} = common + skipIfDumbTerminal, + skipIfEslintMissing, + skipIfInspectorDisabled, + spawnPromisified, +} = common; -const getPort = () => common.PORT +const getPort = () => common.PORT; export { - isMainThread, - isWindows, + allowGlobals, + buildType, + canCreateSymLink, + checkoutEOL, + childShouldThrowAndAbort, + createRequire, + createZeroFilledFile, + enoughTestMem, + expectsError, + expectWarning, + getArrayBufferViews, + getBufferSources, + getCallSite, + getPort, + getTTYfd, + hasCrypto, + hasIPv6, + hasMultiLocalhost, isAIX, - isIBMi, - isLinuxPPCBE, - isSunOS, + isAlive, isDumbTerminal, isFreeBSD, - isOpenBSD, + isIBMi, isLinux, + isLinuxPPCBE, + isMainThread, + isOpenBSD, isOSX, - enoughTestMem, - buildType, + 
isSunOS, + isWindows, localIPv6Hosts, - opensslCli, - PIPE, - hasCrypto, - hasIPv6, - childShouldThrowAndAbort, - checkoutEOL, - createZeroFilledFile, - platformTimeout, - allowGlobals, mustCall, mustCallAtLeast, - mustSucceed, - hasMultiLocalhost, - skipIfDumbTerminal, - skipIfEslintMissing, - canCreateSymLink, - getCallSite, mustNotCall, mustNotMutateObjectDeep, + mustSucceed, + nodeProcessAborted, + opensslCli, + parseTestFlags, + PIPE, + platformTimeout, printSkipMessage, + runWithInvalidFD, skip, - nodeProcessAborted, - isAlive, - expectWarning, - expectsError, - skipIfInspectorDisabled, skipIf32Bits, - getArrayBufferViews, - getBufferSources, - getTTYfd, - runWithInvalidFD, - createRequire, + skipIfDumbTerminal, + skipIfEslintMissing, + skipIfInspectorDisabled, spawnPromisified, - getPort -} +}; diff --git a/test/common/tmpdir.js b/test/common/tmpdir.js index 641d65e01..95931ec38 100644 --- a/test/common/tmpdir.js +++ b/test/common/tmpdir.js @@ -2,6 +2,7 @@ const fs = require('fs') const path = require('path') +const { pathToFileURL } = require('url') const { isMainThread } = require('worker_threads') function rmSync(pathname) { fs.rmSync(pathname, { @@ -45,7 +46,22 @@ function onexit() { throw e } } +function resolve(...paths) { + return path.resolve(tmpPath, ...paths) +} +function hasEnoughSpace(size) { + const { bavail, bsize } = fs.statfsSync(tmpPath) + return bavail >= Math.ceil(size / bsize) +} +function fileURL(...paths) { + // When called without arguments, add explicit trailing slash + const fullPath = path.resolve(tmpPath + path.sep, ...paths) + return pathToFileURL(fullPath) +} module.exports = { + fileURL, + hasEnoughSpace, path: tmpPath, - refresh + refresh, + resolve } diff --git a/test/fixtures/tz-version.txt b/test/fixtures/tz-version.txt index b74fa117a..7daa77e00 100644 --- a/test/fixtures/tz-version.txt +++ b/test/fixtures/tz-version.txt @@ -1 +1 @@ -2022g +2023c diff --git a/test/parallel/test-stream-asIndexedPairs.mjs b/test/parallel/test-stream-asIndexedPairs.mjs index 35919114a..a103920ee 100644 --- a/test/parallel/test-stream-asIndexedPairs.mjs +++ b/test/parallel/test-stream-asIndexedPairs.mjs @@ -1,82 +1,64 @@ -import '../common/index.mjs' -import { Readable } from '../../lib/ours/index.js' -import { deepStrictEqual, rejects, throws } from 'assert' -import tap from 'tap' +import '../common/index.mjs'; +import { Readable }from '../../lib/ours/index.js'; +import { deepStrictEqual, rejects, throws } from 'assert'; +import tap from 'tap'; { // asIndexedPairs with a synchronous stream - const pairs = await Readable.from([1, 2, 3]).asIndexedPairs().toArray() - deepStrictEqual(pairs, [ - [0, 1], - [1, 2], - [2, 3] - ]) - const empty = await Readable.from([]).asIndexedPairs().toArray() - deepStrictEqual(empty, []) + const pairs = await Readable.from([1, 2, 3]).asIndexedPairs().toArray(); + deepStrictEqual(pairs, [[0, 1], [1, 2], [2, 3]]); + const empty = await Readable.from([]).asIndexedPairs().toArray(); + deepStrictEqual(empty, []); } { // asIndexedPairs works an asynchronous streams - const asyncFrom = (...args) => Readable.from(...args).map(async (x) => x) - const pairs = await asyncFrom([1, 2, 3]).asIndexedPairs().toArray() - deepStrictEqual(pairs, [ - [0, 1], - [1, 2], - [2, 3] - ]) - const empty = await asyncFrom([]).asIndexedPairs().toArray() - deepStrictEqual(empty, []) + const asyncFrom = (...args) => Readable.from(...args).map(async (x) => x); + const pairs = await asyncFrom([1, 2, 3]).asIndexedPairs().toArray(); + deepStrictEqual(pairs, [[0, 1], [1, 2], 
[2, 3]]); + const empty = await asyncFrom([]).asIndexedPairs().toArray(); + deepStrictEqual(empty, []); } { // Does not enumerate an infinite stream - const infinite = () => - Readable.from( - (async function* () { - while (true) yield 1 - })() - ) - const pairs = await infinite().asIndexedPairs().take(3).toArray() - deepStrictEqual(pairs, [ - [0, 1], - [1, 1], - [2, 1] - ]) - const empty = await infinite().asIndexedPairs().take(0).toArray() - deepStrictEqual(empty, []) + const infinite = () => Readable.from(async function* () { + while (true) yield 1; + }()); + const pairs = await infinite().asIndexedPairs().take(3).toArray(); + deepStrictEqual(pairs, [[0, 1], [1, 1], [2, 1]]); + const empty = await infinite().asIndexedPairs().take(0).toArray(); + deepStrictEqual(empty, []); } { // AbortSignal - await rejects( - async () => { - const ac = new AbortController() - const { signal } = ac - const p = Readable.from([1, 2, 3]).asIndexedPairs({ signal }).toArray() - ac.abort() - await p - }, - { name: 'AbortError' } - ) + await rejects(async () => { + const ac = new AbortController(); + const { signal } = ac; + const p = Readable.from([1, 2, 3]).asIndexedPairs({ signal }).toArray(); + ac.abort(); + await p; + }, { name: 'AbortError' }); await rejects(async () => { - const signal = AbortSignal.abort() - await Readable.from([1, 2, 3]).asIndexedPairs({ signal }).toArray() - }, /AbortError/) + const signal = AbortSignal.abort(); + await Readable.from([1, 2, 3]).asIndexedPairs({ signal }).toArray(); + }, /AbortError/); } { // Error cases - throws(() => Readable.from([1]).asIndexedPairs(1), /ERR_INVALID_ARG_TYPE/) - throws(() => Readable.from([1]).asIndexedPairs({ signal: true }), /ERR_INVALID_ARG_TYPE/) + throws(() => Readable.from([1]).asIndexedPairs(1), /ERR_INVALID_ARG_TYPE/); + throws(() => Readable.from([1]).asIndexedPairs({ signal: true }), /ERR_INVALID_ARG_TYPE/); } -/* replacement start */ -process.on('beforeExit', (code) => { - if (code === 0) { - tap.pass('test succeeded') - } else { - tap.fail(`test failed - exited code ${code}`) - } -}) -/* replacement end */ + /* replacement start */ + process.on('beforeExit', (code) => { + if(code === 0) { + tap.pass('test succeeded'); + } else { + tap.fail(`test failed - exited code ${code}`); + } + }); + /* replacement end */ diff --git a/test/parallel/test-stream-compose.js b/test/parallel/test-stream-compose.js index 85ec5134e..cbd9e96bf 100644 --- a/test/parallel/test-stream-compose.js +++ b/test/parallel/test-stream-compose.js @@ -484,6 +484,77 @@ const assert = require('assert') }) ) } +{ + // In the new stream than should use the writeable of the first stream and readable of the last stream + // #46829 + ;(async () => { + const newStream = compose( + new PassThrough({ + // reading FROM you in object mode or not + readableObjectMode: false, + // writing TO you in object mode or not + writableObjectMode: false + }), + new Transform({ + // reading FROM you in object mode or not + readableObjectMode: true, + // writing TO you in object mode or not + writableObjectMode: false, + transform: (chunk, encoding, callback) => { + callback(null, { + value: chunk.toString() + }) + } + }) + ) + assert.strictEqual(newStream.writableObjectMode, false) + assert.strictEqual(newStream.readableObjectMode, true) + newStream.write('Steve Rogers') + newStream.write('On your left') + newStream.end() + assert.deepStrictEqual(await newStream.toArray(), [ + { + value: 'Steve Rogers' + }, + { + value: 'On your left' + } + ]) + })().then(common.mustCall()) +} +{ + // In the 
new stream than should use the writeable of the first stream and readable of the last stream + // #46829 + ;(async () => { + const newStream = compose( + new PassThrough({ + // reading FROM you in object mode or not + readableObjectMode: true, + // writing TO you in object mode or not + writableObjectMode: true + }), + new Transform({ + // reading FROM you in object mode or not + readableObjectMode: false, + // writing TO you in object mode or not + writableObjectMode: true, + transform: (chunk, encoding, callback) => { + callback(null, chunk.value) + } + }) + ) + assert.strictEqual(newStream.writableObjectMode, true) + assert.strictEqual(newStream.readableObjectMode, false) + newStream.write({ + value: 'Steve Rogers' + }) + newStream.write({ + value: 'On your left' + }) + newStream.end() + assert.deepStrictEqual(await newStream.toArray(), [Buffer.from('Steve RogersOn your left')]) + })().then(common.mustCall()) +} /* replacement start */ process.on('beforeExit', (code) => { diff --git a/test/parallel/test-stream-drop-take.js b/test/parallel/test-stream-drop-take.js index 49622b690..7d6f0a01e 100644 --- a/test/parallel/test-stream-drop-take.js +++ b/test/parallel/test-stream-drop-take.js @@ -19,7 +19,7 @@ const silentConsole = { } const common = require('../common') const { Readable } = require('../../lib/ours/index') -const { deepStrictEqual, rejects, throws } = require('assert') +const { deepStrictEqual, rejects, throws, strictEqual } = require('assert') const { from } = Readable const fromAsync = (...args) => from(...args).map(async (x) => x) const naturals = () => @@ -62,6 +62,30 @@ const naturals = () => deepStrictEqual(await naturals().take(5).take(1).toArray(), [1]) })().then(common.mustCall()) } + +// Don't wait for next item in the original stream when already consumed the requested take amount +{ + let reached = false + let resolve + const promise = new Promise((res) => (resolve = res)) + const stream = from( + (async function* () { + yield 1 + await promise + reached = true + yield 2 + })() + ) + stream + .take(1) + .toArray() + .then( + common.mustCall(() => { + strictEqual(reached, false) + }) + ) + .finally(() => resolve()) +} { // Coercion ;(async () => { diff --git a/test/parallel/test-stream-duplex-from.js b/test/parallel/test-stream-duplex-from.js index 83ce0e82e..d198586fb 100644 --- a/test/parallel/test-stream-duplex-from.js +++ b/test/parallel/test-stream-duplex-from.js @@ -8,6 +8,17 @@ const silentConsole = { const common = require('../common') const assert = require('assert') const { Duplex, Readable, Writable, pipeline, PassThrough } = require('../../lib/ours/index') +function makeATestReadableStream(value) { + return Readable.from([value]) +} +function makeATestWritableStream(writeFunc) { + return new Writable({ + write(chunk, enc, cb) { + writeFunc(chunk) + cb() + } + }) +} const Blob = globalThis.Blob || require('buffer').Blob { const d = Duplex.from({ @@ -359,6 +370,114 @@ if (typeof Blob !== 'undefined') { ) .on('close', common.mustCall()) } +function makeATestReadableStreamOff(value) { + return new ReadableStream({ + start(controller) { + controller.enqueue(value) + controller.close() + } + }) +} +function makeATestWritableStreamOff(writeFunc) { + return new WritableStream({ + write(chunk) { + writeFunc(chunk) + } + }) +} +{ + const d = Duplex.from({ + readable: makeATestReadableStream('foo') + }) + assert.strictEqual(d.readable, true) + assert.strictEqual(d.writable, false) + d.on( + 'data', + common.mustCall((data) => { + assert.strictEqual(data.toString(), 
'foo') + }) + ) + d.on( + 'end', + common.mustCall(() => { + assert.strictEqual(d.readable, false) + }) + ) +} +{ + const d = Duplex.from(makeATestReadableStream('foo')) + assert.strictEqual(d.readable, true) + assert.strictEqual(d.writable, false) + d.on( + 'data', + common.mustCall((data) => { + assert.strictEqual(data.toString(), 'foo') + }) + ) + d.on( + 'end', + common.mustCall(() => { + assert.strictEqual(d.readable, false) + }) + ) +} +{ + let ret = '' + const d = Duplex.from({ + writable: makeATestWritableStream((chunk) => (ret += chunk)) + }) + assert.strictEqual(d.readable, false) + assert.strictEqual(d.writable, true) + d.end('foo') + d.on( + 'finish', + common.mustCall(() => { + assert.strictEqual(ret, 'foo') + assert.strictEqual(d.writable, false) + }) + ) +} +{ + let ret = '' + const d = Duplex.from(makeATestWritableStream((chunk) => (ret += chunk))) + assert.strictEqual(d.readable, false) + assert.strictEqual(d.writable, true) + d.end('foo') + d.on( + 'finish', + common.mustCall(() => { + assert.strictEqual(ret, 'foo') + assert.strictEqual(d.writable, false) + }) + ) +} +{ + let ret = '' + const d = Duplex.from({ + readable: makeATestReadableStream('foo'), + writable: makeATestWritableStream((chunk) => (ret += chunk)) + }) + d.end('bar') + d.on( + 'data', + common.mustCall((data) => { + assert.strictEqual(data.toString(), 'foo') + }) + ) + d.on( + 'end', + common.mustCall(() => { + assert.strictEqual(d.readable, false) + }) + ) + d.on( + 'finish', + common.mustCall(() => { + assert.strictEqual(ret, 'bar') + assert.strictEqual(d.writable, false) + }) + ) +} /* replacement start */ process.on('beforeExit', (code) => { diff --git a/test/parallel/test-stream-filter.js b/test/parallel/test-stream-filter.js index 277cc38a0..a6b30f897 100644 --- a/test/parallel/test-stream-filter.js +++ b/test/parallel/test-stream-filter.js @@ -201,7 +201,7 @@ function setTimeout(ms) { { const stream = Readable.from([1, 2, 3, 4, 5]) Object.defineProperty(stream, 'map', { - value: common.mustNotCall(() => {}) + value: common.mustNotCall() }) // Check that map isn't getting called. stream.filter(() => true) diff --git a/test/parallel/test-stream-flatMap.js b/test/parallel/test-stream-flatMap.js index f4e778712..446f30dc9 100644 --- a/test/parallel/test-stream-flatMap.js +++ b/test/parallel/test-stream-flatMap.js @@ -178,7 +178,7 @@ function oneTo5() { { const stream = oneTo5() Object.defineProperty(stream, 'map', { - value: common.mustNotCall(() => {}) + value: common.mustNotCall() }) // Check that map isn't getting called. stream.flatMap(() => true) diff --git a/test/parallel/test-stream-forEach.js b/test/parallel/test-stream-forEach.js index 784078a9f..80115e917 100644 --- a/test/parallel/test-stream-forEach.js +++ b/test/parallel/test-stream-forEach.js @@ -135,7 +135,8 @@ const { once } = require('events') }, { signal: ac.signal, - concurrency: 2 + concurrency: 2, + highWaterMark: 0 } ) // pump @@ -182,7 +183,7 @@ const { once } = require('events') { const stream = Readable.from([1, 2, 3, 4, 5]) Object.defineProperty(stream, 'map', { - value: common.mustNotCall(() => {}) + value: common.mustNotCall() }) // Check that map isn't getting called. 
stream.forEach(() => true) diff --git a/test/parallel/test-stream-iterator-helpers-test262-tests.mjs b/test/parallel/test-stream-iterator-helpers-test262-tests.mjs index 8231f80ce..5a17c5f7f 100644 --- a/test/parallel/test-stream-iterator-helpers-test262-tests.mjs +++ b/test/parallel/test-stream-iterator-helpers-test262-tests.mjs @@ -1,7 +1,7 @@ -import { mustCall } from '../common/index.mjs' -import { Readable } from '../../lib/ours/index.js' -import assert from 'assert' -import tap from 'tap' +import { mustCall } from '../common/index.mjs'; +import { Readable }from '../../lib/ours/index.js'; +import assert from 'assert'; +import tap from 'tap'; // These tests are manually ported from the draft PR for the test262 test suite // Authored by Rick Waldron in https://github.com/tc39/test262/pull/2818/files @@ -46,131 +46,132 @@ import tap from 'tap' // * Ecma International Standards hereafter means Ecma International Standards // as well as Ecma Technical Reports + // Note all the tests that check AsyncIterator's prototype itself and things // that happen before stream conversion were not ported. { // asIndexedPairs/is-function - assert.strictEqual(typeof Readable.prototype.asIndexedPairs, 'function') + assert.strictEqual(typeof Readable.prototype.asIndexedPairs, 'function'); // asIndexedPairs/indexed-pairs.js - const iterator = Readable.from([0, 1]) - const indexedPairs = iterator.asIndexedPairs() + const iterator = Readable.from([0, 1]); + const indexedPairs = iterator.asIndexedPairs(); for await (const [i, v] of indexedPairs) { - assert.strictEqual(i, v) + assert.strictEqual(i, v); } // asIndexedPairs/length.js - assert.strictEqual(Readable.prototype.asIndexedPairs.length, 0) - // asIndexedPairs/name.js - assert.strictEqual(Readable.prototype.asIndexedPairs.name, 'asIndexedPairs') - const descriptor = Object.getOwnPropertyDescriptor(Readable.prototype, 'asIndexedPairs') - assert.strictEqual(descriptor.enumerable, false) - assert.strictEqual(descriptor.configurable, true) - assert.strictEqual(descriptor.writable, true) + assert.strictEqual(Readable.prototype.asIndexedPairs.length, 0); + const descriptor = Object.getOwnPropertyDescriptor( + Readable.prototype, + 'asIndexedPairs' + ); + assert.strictEqual(descriptor.enumerable, false); + assert.strictEqual(descriptor.configurable, true); + assert.strictEqual(descriptor.writable, true); } { // drop/length - assert.strictEqual(Readable.prototype.drop.length, 1) - const descriptor = Object.getOwnPropertyDescriptor(Readable.prototype, 'drop') - assert.strictEqual(descriptor.enumerable, false) - assert.strictEqual(descriptor.configurable, true) - assert.strictEqual(descriptor.writable, true) + assert.strictEqual(Readable.prototype.drop.length, 1); + const descriptor = Object.getOwnPropertyDescriptor( + Readable.prototype, + 'drop' + ); + assert.strictEqual(descriptor.enumerable, false); + assert.strictEqual(descriptor.configurable, true); + assert.strictEqual(descriptor.writable, true); // drop/limit-equals-total - const iterator = Readable.from([1, 2]).drop(2) - const result = await iterator[Symbol.asyncIterator]().next() - assert.deepStrictEqual(result, { done: true, value: undefined }) + const iterator = Readable.from([1, 2]).drop(2); + const result = await iterator[Symbol.asyncIterator]().next(); + assert.deepStrictEqual(result, { done: true, value: undefined }); // drop/limit-greater-than-total.js - const iterator2 = Readable.from([1, 2]).drop(3) - const result2 = await iterator2[Symbol.asyncIterator]().next() - assert.deepStrictEqual(result2, 
{ done: true, value: undefined }) + const iterator2 = Readable.from([1, 2]).drop(3); + const result2 = await iterator2[Symbol.asyncIterator]().next(); + assert.deepStrictEqual(result2, { done: true, value: undefined }); // drop/limit-less-than-total.js - const iterator3 = Readable.from([1, 2]).drop(1) - const result3 = await iterator3[Symbol.asyncIterator]().next() - assert.deepStrictEqual(result3, { done: false, value: 2 }) + const iterator3 = Readable.from([1, 2]).drop(1); + const result3 = await iterator3[Symbol.asyncIterator]().next(); + assert.deepStrictEqual(result3, { done: false, value: 2 }); // drop/limit-rangeerror - assert.throws(() => Readable.from([1]).drop(-1), RangeError) + assert.throws(() => Readable.from([1]).drop(-1), RangeError); assert.throws(() => { Readable.from([1]).drop({ valueOf() { - throw new Error('boom') + throw new Error('boom'); } - }) - }, /boom/) + }); + }, /boom/); // drop/limit-tointeger - const two = await Readable.from([1, 2]) - .drop({ valueOf: () => 1 }) - .toArray() - assert.deepStrictEqual(two, [2]) + const two = await Readable.from([1, 2]).drop({ valueOf: () => 1 }).toArray(); + assert.deepStrictEqual(two, [2]); // drop/name - assert.strictEqual(Readable.prototype.drop.name, 'drop') + assert.strictEqual(Readable.prototype.drop.name, 'drop'); // drop/non-constructible - assert.throws(() => new Readable.prototype.drop(1), TypeError) + assert.throws(() => new Readable.prototype.drop(1), TypeError); // drop/proto - const proto = Object.getPrototypeOf(Readable.prototype.drop) - assert.strictEqual(proto, Function.prototype) + const proto = Object.getPrototypeOf(Readable.prototype.drop); + assert.strictEqual(proto, Function.prototype); } { // every/abrupt-iterator-close - const stream = Readable.from([1, 2, 3]) - const e = new Error() - await assert.rejects( - stream.every( - mustCall(() => { - throw e - }, 1) - ), - e - ) + const stream = Readable.from([1, 2, 3]); + const e = new Error(); + await assert.rejects(stream.every(mustCall(() => { + throw e; + }, 1)), e); } { // every/callable-fn - await assert.rejects(Readable.from([1, 2]).every({}), TypeError) + await assert.rejects(Readable.from([1, 2]).every({}), TypeError); } { // every/callable - Readable.prototype.every.call(Readable.from([]), () => {}) + Readable.prototype.every.call(Readable.from([]), () => {}); // eslint-disable-next-line array-callback-return - Readable.from([]).every(() => {}) + Readable.from([]).every(() => {}); assert.throws(() => { - const r = Readable.from([]) - new r.every(() => {}) - }, TypeError) + const r = Readable.from([]); + new r.every(() => {}); + }, TypeError); } { // every/false - const iterator = Readable.from([1, 2, 3]) - const result = await iterator.every((v) => v === 1) - assert.strictEqual(result, false) + const iterator = Readable.from([1, 2, 3]); + const result = await iterator.every((v) => v === 1); + assert.strictEqual(result, false); } { // every/every - const iterator = Readable.from([1, 2, 3]) - const result = await iterator.every((v) => true) - assert.strictEqual(result, true) + const iterator = Readable.from([1, 2, 3]); + const result = await iterator.every((v) => true); + assert.strictEqual(result, true); } { // every/is-function - assert.strictEqual(typeof Readable.prototype.every, 'function') + assert.strictEqual(typeof Readable.prototype.every, 'function'); } { // every/length - assert.strictEqual(Readable.prototype.every.length, 1) + assert.strictEqual(Readable.prototype.every.length, 1); // every/name - 
assert.strictEqual(Readable.prototype.every.name, 'every') + assert.strictEqual(Readable.prototype.every.name, 'every'); // every/propdesc - const descriptor = Object.getOwnPropertyDescriptor(Readable.prototype, 'every') - assert.strictEqual(descriptor.enumerable, false) - assert.strictEqual(descriptor.configurable, true) - assert.strictEqual(descriptor.writable, true) + const descriptor = Object.getOwnPropertyDescriptor( + Readable.prototype, + 'every' + ); + assert.strictEqual(descriptor.enumerable, false); + assert.strictEqual(descriptor.configurable, true); + assert.strictEqual(descriptor.writable, true); } -/* replacement start */ -process.on('beforeExit', (code) => { - if (code === 0) { - tap.pass('test succeeded') - } else { - tap.fail(`test failed - exited code ${code}`) - } -}) -/* replacement end */ + /* replacement start */ + process.on('beforeExit', (code) => { + if(code === 0) { + tap.pass('test succeeded'); + } else { + tap.fail(`test failed - exited code ${code}`); + } + }); + /* replacement end */ diff --git a/test/parallel/test-stream-readable-destroy.js b/test/parallel/test-stream-readable-destroy.js index 4d71f3c44..94b90791c 100644 --- a/test/parallel/test-stream-readable-destroy.js +++ b/test/parallel/test-stream-readable-destroy.js @@ -243,7 +243,7 @@ const assert = require('assert') } { const read = new Readable({ - read: common.mustNotCall(function () {}) + read: common.mustNotCall() }) read.destroy() assert.strictEqual(read.destroyed, true) diff --git a/test/parallel/test-stream-readable-next-no-null.js b/test/parallel/test-stream-readable-next-no-null.js index 5e2fc01b1..4f7d01e3d 100644 --- a/test/parallel/test-stream-readable-next-no-null.js +++ b/test/parallel/test-stream-readable-next-no-null.js @@ -19,10 +19,7 @@ stream.on( message: 'May not write null values to stream' }) ) -stream.on( - 'data', - mustNotCall((chunk) => {}) -) +stream.on('data', mustNotCall()) stream.on('end', mustNotCall()) /* replacement start */ diff --git a/test/parallel/test-stream-some-find-every.mjs b/test/parallel/test-stream-some-find-every.mjs index 30298d0d0..c6168fbd6 100644 --- a/test/parallel/test-stream-some-find-every.mjs +++ b/test/parallel/test-stream-some-find-every.mjs @@ -1,215 +1,183 @@ -import * as common from '../common/index.mjs' -import { setTimeout } from 'timers/promises' -import { Readable } from '../../lib/ours/index.js' -import assert from 'assert' -import tap from 'tap' +import * as common from '../common/index.mjs'; +import { setTimeout } from 'timers/promises'; +import { Readable }from '../../lib/ours/index.js'; +import assert from 'assert'; +import tap from 'tap'; + function oneTo5() { - return Readable.from([1, 2, 3, 4, 5]) + return Readable.from([1, 2, 3, 4, 5]); } function oneTo5Async() { return oneTo5().map(async (x) => { - await Promise.resolve() - return x - }) + await Promise.resolve(); + return x; + }); } { // Some, find, and every work with a synchronous stream and predicate - assert.strictEqual(await oneTo5().some((x) => x > 3), true) - assert.strictEqual(await oneTo5().every((x) => x > 3), false) - assert.strictEqual(await oneTo5().find((x) => x > 3), 4) - assert.strictEqual(await oneTo5().some((x) => x > 6), false) - assert.strictEqual(await oneTo5().every((x) => x < 6), true) - assert.strictEqual(await oneTo5().find((x) => x > 6), undefined) - assert.strictEqual(await Readable.from([]).some(() => true), false) - assert.strictEqual(await Readable.from([]).every(() => true), true) - assert.strictEqual(await Readable.from([]).find(() => true), 
undefined) + assert.strictEqual(await oneTo5().some((x) => x > 3), true); + assert.strictEqual(await oneTo5().every((x) => x > 3), false); + assert.strictEqual(await oneTo5().find((x) => x > 3), 4); + assert.strictEqual(await oneTo5().some((x) => x > 6), false); + assert.strictEqual(await oneTo5().every((x) => x < 6), true); + assert.strictEqual(await oneTo5().find((x) => x > 6), undefined); + assert.strictEqual(await Readable.from([]).some(() => true), false); + assert.strictEqual(await Readable.from([]).every(() => true), true); + assert.strictEqual(await Readable.from([]).find(() => true), undefined); } { // Some, find, and every work with an asynchronous stream and synchronous predicate - assert.strictEqual(await oneTo5Async().some((x) => x > 3), true) - assert.strictEqual(await oneTo5Async().every((x) => x > 3), false) - assert.strictEqual(await oneTo5Async().find((x) => x > 3), 4) - assert.strictEqual(await oneTo5Async().some((x) => x > 6), false) - assert.strictEqual(await oneTo5Async().every((x) => x < 6), true) - assert.strictEqual(await oneTo5Async().find((x) => x > 6), undefined) + assert.strictEqual(await oneTo5Async().some((x) => x > 3), true); + assert.strictEqual(await oneTo5Async().every((x) => x > 3), false); + assert.strictEqual(await oneTo5Async().find((x) => x > 3), 4); + assert.strictEqual(await oneTo5Async().some((x) => x > 6), false); + assert.strictEqual(await oneTo5Async().every((x) => x < 6), true); + assert.strictEqual(await oneTo5Async().find((x) => x > 6), undefined); } { // Some, find, and every work on synchronous streams with an asynchronous predicate - assert.strictEqual(await oneTo5().some(async (x) => x > 3), true) - assert.strictEqual(await oneTo5().every(async (x) => x > 3), false) - assert.strictEqual(await oneTo5().find(async (x) => x > 3), 4) - assert.strictEqual(await oneTo5().some(async (x) => x > 6), false) - assert.strictEqual(await oneTo5().every(async (x) => x < 6), true) - assert.strictEqual(await oneTo5().find(async (x) => x > 6), undefined) + assert.strictEqual(await oneTo5().some(async (x) => x > 3), true); + assert.strictEqual(await oneTo5().every(async (x) => x > 3), false); + assert.strictEqual(await oneTo5().find(async (x) => x > 3), 4); + assert.strictEqual(await oneTo5().some(async (x) => x > 6), false); + assert.strictEqual(await oneTo5().every(async (x) => x < 6), true); + assert.strictEqual(await oneTo5().find(async (x) => x > 6), undefined); } { // Some, find, and every work on asynchronous streams with an asynchronous predicate - assert.strictEqual(await oneTo5Async().some(async (x) => x > 3), true) - assert.strictEqual(await oneTo5Async().every(async (x) => x > 3), false) - assert.strictEqual(await oneTo5Async().find(async (x) => x > 3), 4) - assert.strictEqual(await oneTo5Async().some(async (x) => x > 6), false) - assert.strictEqual(await oneTo5Async().every(async (x) => x < 6), true) - assert.strictEqual(await oneTo5Async().find(async (x) => x > 6), undefined) + assert.strictEqual(await oneTo5Async().some(async (x) => x > 3), true); + assert.strictEqual(await oneTo5Async().every(async (x) => x > 3), false); + assert.strictEqual(await oneTo5Async().find(async (x) => x > 3), 4); + assert.strictEqual(await oneTo5Async().some(async (x) => x > 6), false); + assert.strictEqual(await oneTo5Async().every(async (x) => x < 6), true); + assert.strictEqual(await oneTo5Async().find(async (x) => x > 6), undefined); } { async function checkDestroyed(stream) { - await setTimeout() - assert.strictEqual(stream.destroyed, true) + await 
setTimeout(); + assert.strictEqual(stream.destroyed, true); } { // Some, find, and every short circuit - const someStream = oneTo5() - await someStream.some(common.mustCall((x) => x > 2, 3)) - await checkDestroyed(someStream) + const someStream = oneTo5(); + await someStream.some(common.mustCall((x) => x > 2, 3)); + await checkDestroyed(someStream); - const everyStream = oneTo5() - await everyStream.every(common.mustCall((x) => x < 3, 3)) - await checkDestroyed(everyStream) + const everyStream = oneTo5(); + await everyStream.every(common.mustCall((x) => x < 3, 3)); + await checkDestroyed(everyStream); - const findStream = oneTo5() - await findStream.find(common.mustCall((x) => x > 1, 2)) - await checkDestroyed(findStream) + const findStream = oneTo5(); + await findStream.find(common.mustCall((x) => x > 1, 2)); + await checkDestroyed(findStream); // When short circuit isn't possible the whole stream is iterated - await oneTo5().some(common.mustCall(() => false, 5)) - await oneTo5().every(common.mustCall(() => true, 5)) - await oneTo5().find(common.mustCall(() => false, 5)) + await oneTo5().some(common.mustCall(() => false, 5)); + await oneTo5().every(common.mustCall(() => true, 5)); + await oneTo5().find(common.mustCall(() => false, 5)); } { // Some, find, and every short circuit async stream/predicate - const someStream = oneTo5Async() - await someStream.some(common.mustCall(async (x) => x > 2, 3)) - await checkDestroyed(someStream) + const someStream = oneTo5Async(); + await someStream.some(common.mustCall(async (x) => x > 2, 3)); + await checkDestroyed(someStream); - const everyStream = oneTo5Async() - await everyStream.every(common.mustCall(async (x) => x < 3, 3)) - await checkDestroyed(everyStream) + const everyStream = oneTo5Async(); + await everyStream.every(common.mustCall(async (x) => x < 3, 3)); + await checkDestroyed(everyStream); - const findStream = oneTo5Async() - await findStream.find(common.mustCall(async (x) => x > 1, 2)) - await checkDestroyed(findStream) + const findStream = oneTo5Async(); + await findStream.find(common.mustCall(async (x) => x > 1, 2)); + await checkDestroyed(findStream); // When short circuit isn't possible the whole stream is iterated - await oneTo5Async().some(common.mustCall(async () => false, 5)) - await oneTo5Async().every(common.mustCall(async () => true, 5)) - await oneTo5Async().find(common.mustCall(async () => false, 5)) + await oneTo5Async().some(common.mustCall(async () => false, 5)); + await oneTo5Async().every(common.mustCall(async () => true, 5)); + await oneTo5Async().find(common.mustCall(async () => false, 5)); } } { // Concurrency doesn't affect which value is found. 
- const found = await Readable.from([1, 2]).find( - async (val) => { - if (val === 1) { - await setTimeout(100) - } - return true - }, - { concurrency: 2 } - ) - assert.strictEqual(found, 1) + const found = await Readable.from([1, 2]).find(async (val) => { + if (val === 1) { + await setTimeout(100); + } + return true; + }, { concurrency: 2 }); + assert.strictEqual(found, 1); } { // Support for AbortSignal for (const op of ['some', 'every', 'find']) { { - const ac = new AbortController() - assert - .rejects( - Readable.from([1, 2, 3])[op](() => new Promise(() => {}), { signal: ac.signal }), - { - name: 'AbortError' - }, - `${op} should abort correctly with sync abort` - ) - .then(common.mustCall()) - ac.abort() + const ac = new AbortController(); + assert.rejects(Readable.from([1, 2, 3])[op]( + () => new Promise(() => { }), + { signal: ac.signal } + ), { + name: 'AbortError', + }, `${op} should abort correctly with sync abort`).then(common.mustCall()); + ac.abort(); } { // Support for pre-aborted AbortSignal - assert - .rejects( - Readable.from([1, 2, 3])[op](() => new Promise(() => {}), { signal: AbortSignal.abort() }), - { - name: 'AbortError' - }, - `${op} should abort with pre-aborted abort controller` - ) - .then(common.mustCall()) + assert.rejects(Readable.from([1, 2, 3])[op]( + () => new Promise(() => { }), + { signal: AbortSignal.abort() } + ), { + name: 'AbortError', + }, `${op} should abort with pre-aborted abort controller`).then(common.mustCall()); } } } { // Error cases for (const op of ['some', 'every', 'find']) { - assert - .rejects( - async () => { - await Readable.from([1])[op](1) - }, - /ERR_INVALID_ARG_TYPE/, - `${op} should throw for invalid function` - ) - .then(common.mustCall()) - assert - .rejects( - async () => { - await Readable.from([1])[op]((x) => x, { - concurrency: 'Foo' - }) - }, - /ERR_OUT_OF_RANGE/, - `${op} should throw for invalid concurrency` - ) - .then(common.mustCall()) - assert - .rejects( - async () => { - await Readable.from([1])[op]((x) => x, 1) - }, - /ERR_INVALID_ARG_TYPE/, - `${op} should throw for invalid concurrency` - ) - .then(common.mustCall()) - assert - .rejects( - async () => { - await Readable.from([1])[op]((x) => x, { - signal: true - }) - }, - /ERR_INVALID_ARG_TYPE/, - `${op} should throw for invalid signal` - ) - .then(common.mustCall()) + assert.rejects(async () => { + await Readable.from([1])[op](1); + }, /ERR_INVALID_ARG_TYPE/, `${op} should throw for invalid function`).then(common.mustCall()); + assert.rejects(async () => { + await Readable.from([1])[op]((x) => x, { + concurrency: 'Foo' + }); + }, /ERR_OUT_OF_RANGE/, `${op} should throw for invalid concurrency`).then(common.mustCall()); + assert.rejects(async () => { + await Readable.from([1])[op]((x) => x, 1); + }, /ERR_INVALID_ARG_TYPE/, `${op} should throw for invalid concurrency`).then(common.mustCall()); + assert.rejects(async () => { + await Readable.from([1])[op]((x) => x, { + signal: true + }); + }, /ERR_INVALID_ARG_TYPE/, `${op} should throw for invalid signal`).then(common.mustCall()); } } { for (const op of ['some', 'every', 'find']) { - const stream = oneTo5() + const stream = oneTo5(); Object.defineProperty(stream, 'map', { - value: common.mustNotCall(() => {}) - }) + value: common.mustNotCall(), + }); // Check that map isn't getting called. 
- stream[op](() => {}) + stream[op](() => {}); } } -/* replacement start */ -process.on('beforeExit', (code) => { - if (code === 0) { - tap.pass('test succeeded') - } else { - tap.fail(`test failed - exited code ${code}`) - } -}) -/* replacement end */ + /* replacement start */ + process.on('beforeExit', (code) => { + if(code === 0) { + tap.pass('test succeeded'); + } else { + tap.fail(`test failed - exited code ${code}`); + } + }); + /* replacement end */
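
The hunks above regenerate test-stream-some-find-every.mjs without changing what it asserts. As a quick orientation, the short sketch below is not part of the diff; it assumes the published readable-stream package rather than the repo-relative '../../lib/ours/index' require the tests use, and it illustrates the two behaviours those tests pin down: find/some/every short-circuit and destroy the source once the answer is known, and a pre-aborted AbortSignal makes them reject with an AbortError.

'use strict'
// Illustrative sketch only -- not part of the patch above.
const assert = require('assert')
const { setTimeout } = require('timers/promises')
const { Readable } = require('readable-stream') // assumed package name

async function main() {
  // find() resolves with the first matching chunk and tears the source down early
  const stream = Readable.from([1, 2, 3, 4, 5])
  assert.strictEqual(await stream.find((x) => x > 1), 2)
  await setTimeout() // destruction lands on a later tick, which the tests above also allow for
  assert.strictEqual(stream.destroyed, true)

  // a pre-aborted signal rejects with an AbortError
  await assert.rejects(
    Readable.from([1, 2, 3]).some(() => true, { signal: AbortSignal.abort() }),
    { name: 'AbortError' }
  )
}

main().then(() => console.log('ok'))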