From f07a2d1b95b171012ba699cc423c5d7101a452ab Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 21 Feb 2020 12:50:34 +0100 Subject: [PATCH 1/4] stream: fix broken pipeline error propagation If the destination was an async function any error thrown from that function would be swallowed. --- lib/internal/streams/pipeline.js | 17 +++++++------ .../parallel/test-stream-pipeline-uncaught.js | 25 +++++++++++++++++++ test/parallel/test-stream-pipeline.js | 6 +---- 3 files changed, 35 insertions(+), 13 deletions(-) create mode 100644 test/parallel/test-stream-pipeline-uncaught.js diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 1ead5cdf9f3421..c7da6eb8794651 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -132,9 +132,10 @@ function pipeline(...streams) { } let error; + let value; const destroys = []; - function finish(err, val, final) { + function finish(err, final) { if (!error && err) { error = err; } @@ -146,13 +147,13 @@ function pipeline(...streams) { } if (final) { - callback(error, val); + callback(error, value); } } function wrap(stream, reading, writing, final) { destroys.push(destroyer(stream, reading, writing, (err) => { - finish(err, null, final); + finish(err, final); })); } @@ -197,12 +198,12 @@ function pipeline(...streams) { const pt = new PassThrough(); if (isPromise(ret)) { ret + .catch((err) => { + pt.destroy(err); + }) .then((val) => { + value = val; pt.end(val); - finish(null, val, true); - }) - .catch((err) => { - finish(err, null, true); }); } else if (isIterable(ret, true)) { pump(ret, pt, finish); @@ -212,7 +213,7 @@ function pipeline(...streams) { } ret = pt; - wrap(ret, true, false, true); + wrap(ret, false, true, true); } } else if (isStream(stream)) { if (isReadable(ret)) { diff --git a/test/parallel/test-stream-pipeline-uncaught.js b/test/parallel/test-stream-pipeline-uncaught.js new file mode 100644 index 00000000000000..5ce08475e1e6b3 --- /dev/null +++ b/test/parallel/test-stream-pipeline-uncaught.js @@ -0,0 +1,25 @@ +'use strict'; + +const common = require('../common'); +const { + pipeline, + PassThrough +} = require('stream'); +const assert = require('assert'); + +process.on('uncaughtException', common.mustCall((err) => { + assert.strictEqual(err.message, 'asd'); +})); + +// Ensure that pipeline that ends with Promise +// still propagates error to uncaughtException. +const s = new PassThrough(); +s.end('asd'); +pipeline(s, async function(source) { + for await (const chunk of source) { + chunk; + } +}, common.mustCall((err) => { + assert.ok(!err); + throw new Error('asd'); +})); diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index 19fc246e2bf3cd..b3d4064c6a9783 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -613,11 +613,9 @@ const { promisify } = require('util'); yield 'hello'; yield 'world'; }, async function*(source) { - const ret = []; for await (const chunk of source) { - ret.push(chunk.toUpperCase()); + yield chunk.toUpperCase(); } - yield ret; }, async function(source) { let ret = ''; for await (const chunk of source) { @@ -754,7 +752,6 @@ const { promisify } = require('util'); }, common.mustCall((err) => { assert.strictEqual(err, undefined); assert.strictEqual(ret, 'asd'); - assert.strictEqual(s.destroyed, true); })); } @@ -775,7 +772,6 @@ const { promisify } = require('util'); }, common.mustCall((err) => { assert.strictEqual(err, undefined); assert.strictEqual(ret, 'asd'); - assert.strictEqual(s.destroyed, true); })); } From 86f40d3942473a41c1e4e5409b19d3f6276d0d8e Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sat, 22 Feb 2020 21:55:18 +0100 Subject: [PATCH 2/4] fixup: don't call end if destroyed --- lib/internal/streams/pipeline.js | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index c7da6eb8794651..d855c2a08af147 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -198,12 +198,11 @@ function pipeline(...streams) { const pt = new PassThrough(); if (isPromise(ret)) { ret - .catch((err) => { - pt.destroy(err); - }) .then((val) => { value = val; pt.end(val); + }, (err) => { + pt.destroy(err); }); } else if (isIterable(ret, true)) { pump(ret, pt, finish); From f4a12450c77d81d1fdcfebc3b93e3c22afc20351 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sat, 22 Feb 2020 22:23:15 +0100 Subject: [PATCH 3/4] Update test/parallel/test-stream-pipeline-uncaught.js Co-Authored-By: Denys Otrishko --- test/parallel/test-stream-pipeline-uncaught.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/parallel/test-stream-pipeline-uncaught.js b/test/parallel/test-stream-pipeline-uncaught.js index 5ce08475e1e6b3..34b51195e27138 100644 --- a/test/parallel/test-stream-pipeline-uncaught.js +++ b/test/parallel/test-stream-pipeline-uncaught.js @@ -20,6 +20,6 @@ pipeline(s, async function(source) { chunk; } }, common.mustCall((err) => { - assert.ok(!err); + assert.ifError(err); throw new Error('asd'); })); From 40c0b13c90ad5d394e137851a6ecd0f49becd376 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sat, 22 Feb 2020 22:23:52 +0100 Subject: [PATCH 4/4] fixup: nits --- test/parallel/test-stream-pipeline-uncaught.js | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/parallel/test-stream-pipeline-uncaught.js b/test/parallel/test-stream-pipeline-uncaught.js index 34b51195e27138..90d141ec44fef1 100644 --- a/test/parallel/test-stream-pipeline-uncaught.js +++ b/test/parallel/test-stream-pipeline-uncaught.js @@ -8,18 +8,18 @@ const { const assert = require('assert'); process.on('uncaughtException', common.mustCall((err) => { - assert.strictEqual(err.message, 'asd'); + assert.strictEqual(err.message, 'error'); })); // Ensure that pipeline that ends with Promise // still propagates error to uncaughtException. const s = new PassThrough(); -s.end('asd'); +s.end('data'); pipeline(s, async function(source) { for await (const chunk of source) { chunk; } }, common.mustCall((err) => { assert.ifError(err); - throw new Error('asd'); + throw new Error('error'); }));