diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 860ea478692230..3c99296d73303b 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -6,8 +6,7 @@ const { ArrayIsArray, SymbolAsyncIterator, - SymbolIterator, - Symbol + SymbolIterator } = primordials; let eos; @@ -22,8 +21,6 @@ const { ERR_STREAM_DESTROYED } = require('internal/errors').codes; -const kSkipPrematureClose = Symbol('kSkipPrematureClose'); - let EE; let PassThrough; let createReadableStreamAsyncIterator; @@ -145,11 +142,15 @@ function pipeline(...streams) { let value; const destroys = []; - function finish(err, final) { - if (!error && err) { + let finishCount = streams.length; + + function finish(err) { + if ((!error || error.code === 'ERR_STREAM_PREMATURE_CLOSE') && err) { error = err; } + const final = --finishCount === 0; + if (error || final) { for (const destroy of destroys) { destroy(error); @@ -161,16 +162,9 @@ function pipeline(...streams) { } } - function wrap(stream, reading, writing, final) { + function wrap(stream, reading, writing) { const fn = destroyer(stream, reading, writing, (err) => { - - // We need to skip the premature close error here to avoid swallowing - // any errors in the downstream async iterator. - if (err && err.code === 'ERR_STREAM_PREMATURE_CLOSE' && - stream[kSkipPrematureClose]) { - err = undefined; - } - finish(err, final); + finish(err); }); destroys.push(fn); } @@ -182,9 +176,8 @@ function pipeline(...streams) { const writing = i > 0; if (isStream(stream)) { - wrap(stream, reading, writing, !reading); + wrap(stream, reading, writing); } - if (i === 0) { if (typeof stream === 'function') { ret = stream(); @@ -192,16 +185,24 @@ function pipeline(...streams) { throw new ERR_INVALID_RETURN_VALUE( 'Iterable, AsyncIterable or Stream', 'source', ret); } - } else if (isIterable(stream) || isReadable(stream)) { + finishCount--; + } else if (isReadable(stream)) { + ret = stream; + // The isIterable check must come after + // the isReadable because a Readable is iterable. + } else if (isIterable(stream)) { ret = stream; + finishCount--; } else { throw new ERR_INVALID_ARG_TYPE( 'source', ['Stream', 'Iterable', 'AsyncIterable', 'Function'], stream); } } else if (typeof stream === 'function') { + const wasStream = isStream(ret); + ret = makeAsyncIterable(ret); - ret[kSkipPrematureClose] = true; + // ret[kSkipPrematureClose] = true; ret = stream(ret); if (reading) { @@ -209,6 +210,12 @@ function pipeline(...streams) { throw new ERR_INVALID_RETURN_VALUE( 'AsyncIterable', `transform[${i - 1}]`, ret); } + + // We are not expecting a call to finish if we are connecting + // multiple async iterators + if (!wasStream) { + finishCount--; + } } else { if (!PassThrough) { PassThrough = require('_stream_passthrough'); @@ -231,7 +238,7 @@ function pipeline(...streams) { } ret = pt; - wrap(ret, false, true, true); + wrap(ret, false, true); } } else if (isStream(stream)) { if (isReadable(ret)) {