Skip to content

Commit

Permalink
stream: pipeline with end option
Browse files Browse the repository at this point in the history
Currently pipeline cannot fully replace pipe due
to the missing end option. This PR adds the end
option to the promisified pipeline method.

PR-URL: #40886
Reviewed-By: Luigi Pinca <luigipinca@gmail.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
  • Loading branch information
ronag authored and targos committed Nov 26, 2021
1 parent 534409d commit 650c9bd
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 17 deletions.
40 changes: 25 additions & 15 deletions lib/internal/streams/pipeline.js
Expand Up @@ -109,7 +109,7 @@ async function* fromReadable(val) {
yield* Readable.prototype[SymbolAsyncIterator].call(val);
}

async function pump(iterable, writable, finish) {
async function pump(iterable, writable, finish, opts) {
let error;
let onresolve = null;

Expand Down Expand Up @@ -153,7 +153,9 @@ async function pump(iterable, writable, finish) {
}
}

writable.end();
if (opts?.end !== false) {
writable.end();
}

await wait();

Expand Down Expand Up @@ -227,17 +229,22 @@ function pipelineImpl(streams, callback, opts) {
const stream = streams[i];
const reading = i < streams.length - 1;
const writing = i > 0;
const end = reading || opts?.end !== false;

if (isNodeStream(stream)) {
finishCount++;
destroys.push(destroyer(stream, reading, writing, (err) => {
if (!err && !reading && isReadableFinished(stream, false)) {
stream.read(0);
destroyer(stream, true, writing, finish);
} else {
finish(err);
}
}));
if (end) {
finishCount++;
destroys.push(destroyer(stream, reading, writing, (err) => {
if (!err && !reading && isReadableFinished(stream, false)) {
stream.read(0);
destroyer(stream, true, writing, finish);
} else {
finish(err);
}
}));
} else {
stream.on('error', finish);
}
}

if (i === 0) {
Expand Down Expand Up @@ -282,14 +289,17 @@ function pipelineImpl(streams, callback, opts) {
then.call(ret,
(val) => {
value = val;
pt.end(val);
pt.write(val);
if (end) {
pt.end();
}
}, (err) => {
pt.destroy(err);
},
);
} else if (isIterable(ret, true)) {
finishCount++;
pump(ret, pt, finish);
pump(ret, pt, finish, { end });
} else {
throw new ERR_INVALID_RETURN_VALUE(
'AsyncIterable or Promise', 'destination', ret);
Expand All @@ -302,7 +312,7 @@ function pipelineImpl(streams, callback, opts) {
}
} else if (isNodeStream(stream)) {
if (isReadableNodeStream(ret)) {
ret.pipe(stream);
ret.pipe(stream, { end });

// Compat. Before node v10.12.0 stdio used to throw an error so
// pipe() did/does not end() stdio destinations.
Expand All @@ -314,7 +324,7 @@ function pipelineImpl(streams, callback, opts) {
ret = makeAsyncIterable(ret);

finishCount++;
pump(ret, stream, finish);
pump(ret, stream, finish, { end });
}
ret = stream;
} else {
Expand Down
4 changes: 3 additions & 1 deletion lib/stream/promises.js
Expand Up @@ -16,11 +16,13 @@ const eos = require('internal/streams/end-of-stream');
function pipeline(...streams) {
return new Promise((resolve, reject) => {
let signal;
let end;
const lastArg = streams[streams.length - 1];
if (lastArg && typeof lastArg === 'object' &&
!isNodeStream(lastArg) && !isIterable(lastArg)) {
const options = ArrayPrototypePop(streams);
signal = options.signal;
end = options.end;
}

pl(streams, (err, value) => {
Expand All @@ -29,7 +31,7 @@ function pipeline(...streams) {
} else {
resolve(value);
}
}, { signal });
}, { signal, end });
});
}

Expand Down
23 changes: 22 additions & 1 deletion test/parallel/test-stream-pipeline.js
Expand Up @@ -1465,5 +1465,26 @@ const tsp = require('timers/promises');
assert.strictEqual(duplex.destroyed, true);
}

run();
run().then(common.mustCall());
}

{
const pipelinePromise = promisify(pipeline);

async function run() {
const read = new Readable({
read() {}
});

const duplex = new PassThrough();

read.push(null);

await pipelinePromise(read, duplex, { end: false });

assert.strictEqual(duplex.destroyed, false);
assert.strictEqual(duplex.writableEnded, false);
}

run().then(common.mustCall());
}

0 comments on commit 650c9bd

Please sign in to comment.