Skip to content

Commit

Permalink
stream: track the number of finish() call in pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
mcollina committed Mar 3, 2020
1 parent 89b2368 commit 77019be
Showing 1 changed file with 27 additions and 20 deletions.
47 changes: 27 additions & 20 deletions lib/internal/streams/pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@
const {
ArrayIsArray,
SymbolAsyncIterator,
SymbolIterator,
Symbol
SymbolIterator
} = primordials;

let eos;
Expand All @@ -22,8 +21,6 @@ const {
ERR_STREAM_DESTROYED
} = require('internal/errors').codes;

const kSkipPrematureClose = Symbol('kSkipPrematureClose');

let EE;
let PassThrough;
let createReadableStreamAsyncIterator;
Expand Down Expand Up @@ -145,11 +142,15 @@ function pipeline(...streams) {
let value;
const destroys = [];

function finish(err, final) {
if (!error && err) {
let finishCount = streams.length;

function finish(err) {
if ((!error || error.code === 'ERR_STREAM_PREMATURE_CLOSE') && err) {
error = err;
}

const final = --finishCount === 0;

if (error || final) {
for (const destroy of destroys) {
destroy(error);
Expand All @@ -161,16 +162,9 @@ function pipeline(...streams) {
}
}

function wrap(stream, reading, writing, final) {
function wrap(stream, reading, writing) {
const fn = destroyer(stream, reading, writing, (err) => {

// We need to skip the premature close error here to avoid swallowing
// any errors in the downstream async iterator.
if (err && err.code === 'ERR_STREAM_PREMATURE_CLOSE' &&
stream[kSkipPrematureClose]) {
err = undefined;
}
finish(err, final);
finish(err);
});
destroys.push(fn);
}
Expand All @@ -182,33 +176,46 @@ function pipeline(...streams) {
const writing = i > 0;

if (isStream(stream)) {
wrap(stream, reading, writing, !reading);
wrap(stream, reading, writing);
}

if (i === 0) {
if (typeof stream === 'function') {
ret = stream();
if (!isIterable(ret)) {
throw new ERR_INVALID_RETURN_VALUE(
'Iterable, AsyncIterable or Stream', 'source', ret);
}
} else if (isIterable(stream) || isReadable(stream)) {
finishCount--;
} else if (isReadable(stream)) {
ret = stream;
// The isIterable check must come after
// the isReadable because a Readable is iterable.
} else if (isIterable(stream)) {
ret = stream;
finishCount--;
} else {
throw new ERR_INVALID_ARG_TYPE(
'source', ['Stream', 'Iterable', 'AsyncIterable', 'Function'],
stream);
}
} else if (typeof stream === 'function') {
const wasStream = isStream(ret);

ret = makeAsyncIterable(ret);
ret[kSkipPrematureClose] = true;
// ret[kSkipPrematureClose] = true;
ret = stream(ret);

if (reading) {
if (!isIterable(ret, true)) {
throw new ERR_INVALID_RETURN_VALUE(
'AsyncIterable', `transform[${i - 1}]`, ret);
}

// We are not expecting a call to finish if we are connecting
// multiple async iterators
if (!wasStream) {
finishCount--;
}
} else {
if (!PassThrough) {
PassThrough = require('_stream_passthrough');
Expand All @@ -231,7 +238,7 @@ function pipeline(...streams) {
}

ret = pt;
wrap(ret, false, true, true);
wrap(ret, false, true);
}
} else if (isStream(stream)) {
if (isReadable(ret)) {
Expand Down

0 comments on commit 77019be

Please sign in to comment.