diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 1ead5cdf9f3421..d855c2a08af147 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -132,9 +132,10 @@ function pipeline(...streams) { } let error; + let value; const destroys = []; - function finish(err, val, final) { + function finish(err, final) { if (!error && err) { error = err; } @@ -146,13 +147,13 @@ function pipeline(...streams) { } if (final) { - callback(error, val); + callback(error, value); } } function wrap(stream, reading, writing, final) { destroys.push(destroyer(stream, reading, writing, (err) => { - finish(err, null, final); + finish(err, final); })); } @@ -198,11 +199,10 @@ function pipeline(...streams) { if (isPromise(ret)) { ret .then((val) => { + value = val; pt.end(val); - finish(null, val, true); - }) - .catch((err) => { - finish(err, null, true); + }, (err) => { + pt.destroy(err); }); } else if (isIterable(ret, true)) { pump(ret, pt, finish); @@ -212,7 +212,7 @@ function pipeline(...streams) { } ret = pt; - wrap(ret, true, false, true); + wrap(ret, false, true, true); } } else if (isStream(stream)) { if (isReadable(ret)) { diff --git a/test/parallel/test-stream-pipeline-uncaught.js b/test/parallel/test-stream-pipeline-uncaught.js new file mode 100644 index 00000000000000..90d141ec44fef1 --- /dev/null +++ b/test/parallel/test-stream-pipeline-uncaught.js @@ -0,0 +1,25 @@ +'use strict'; + +const common = require('../common'); +const { + pipeline, + PassThrough +} = require('stream'); +const assert = require('assert'); + +process.on('uncaughtException', common.mustCall((err) => { + assert.strictEqual(err.message, 'error'); +})); + +// Ensure that pipeline that ends with Promise +// still propagates error to uncaughtException. +const s = new PassThrough(); +s.end('data'); +pipeline(s, async function(source) { + for await (const chunk of source) { + chunk; + } +}, common.mustCall((err) => { + assert.ifError(err); + throw new Error('error'); +})); diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index 19fc246e2bf3cd..b3d4064c6a9783 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -613,11 +613,9 @@ const { promisify } = require('util'); yield 'hello'; yield 'world'; }, async function*(source) { - const ret = []; for await (const chunk of source) { - ret.push(chunk.toUpperCase()); + yield chunk.toUpperCase(); } - yield ret; }, async function(source) { let ret = ''; for await (const chunk of source) { @@ -754,7 +752,6 @@ const { promisify } = require('util'); }, common.mustCall((err) => { assert.strictEqual(err, undefined); assert.strictEqual(ret, 'asd'); - assert.strictEqual(s.destroyed, true); })); } @@ -775,7 +772,6 @@ const { promisify } = require('util'); }, common.mustCall((err) => { assert.strictEqual(err, undefined); assert.strictEqual(ret, 'asd'); - assert.strictEqual(s.destroyed, true); })); }