From 3fadeba690060948973ee44b42ce8a8e486a9ad7 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Tue, 21 Apr 2020 13:29:43 +0200 Subject: [PATCH 1/4] stream: pipeline should only destroy un-finished streams. This PR logically reverts https://github.com/nodejs/node/pull/31940 which has caused lots of unnecessary breakage in the ecosystem. This PR also aligns better with the actual documented behavior: `stream.pipeline()` will call `stream.destroy(err)` on all streams except: * `Readable` streams which have emitted `'end'` or `'close'`. * `Writable` streams which have emitted `'finish'` or `'close'`. The behavior introduced in https://github.com/nodejs/node/pull/31940 was much more aggressive in terms of destroying streams. This was good for avoiding potential resources leaks however breaks some common assumputions in legacy streams. Furthermore, it makes the code simpler and removes some hacks. Fixes: https://github.com/nodejs/node/issues/32954 Fixes: https://github.com/nodejs/node/issues/32955 --- lib/internal/streams/pipeline.js | 56 +++++++++------------------ test/parallel/test-stream-pipeline.js | 2 +- 2 files changed, 19 insertions(+), 39 deletions(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index cdd5bcb791f451..e92fcb364953a3 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -25,43 +25,18 @@ let EE; let PassThrough; let createReadableStreamAsyncIterator; -function isIncoming(stream) { - return ( - stream.socket && - typeof stream.complete === 'boolean' && - ArrayIsArray(stream.rawTrailers) && - ArrayIsArray(stream.rawHeaders) - ); -} - -function isOutgoing(stream) { - return ( - stream.socket && - typeof stream.setHeader === 'function' - ); -} - -function destroyer(stream, reading, writing, final, callback) { - const _destroy = once((err) => { - if (!err && (isIncoming(stream) || isOutgoing(stream))) { - // http/1 request objects have a coupling to their response and should - // not be prematurely destroyed. Assume they will handle their own - // lifecycle. - return callback(); - } - - if (!err && reading && !writing && stream.writable) { - return callback(); - } +function destroyer(stream, reading, writing, callback) { + callback = once(callback); - if (err || !final || !stream.readable) { - destroyImpl.destroyer(stream, err); - } - callback(err); + let finished = false; + stream.on('close', () => { + finished = true; }); if (eos === undefined) eos = require('internal/streams/end-of-stream'); eos(stream, { readable: reading, writable: writing }, (err) => { + finished = !err; + const rState = stream._readableState; if ( err && @@ -78,14 +53,19 @@ function destroyer(stream, reading, writing, final, callback) { // eos will only fail with premature close on the reading side for // duplex streams. stream - .once('end', _destroy) - .once('error', _destroy); + .once('end', callback) + .once('error', callback); } else { - _destroy(err); + callback(err); } }); - return (err) => _destroy(err || new ERR_STREAM_DESTROYED('pipe')); + return once((err) => { + if (!finished) { + destroyImpl.destroyer(stream, err); + } + callback(err || new ERR_STREAM_DESTROYED('pipe')); + }); } function popCallback(streams) { @@ -204,7 +184,7 @@ function pipeline(...streams) { if (isStream(stream)) { finishCount++; - destroys.push(destroyer(stream, reading, writing, !reading, finish)); + destroys.push(destroyer(stream, reading, writing, finish)); } if (i === 0) { @@ -262,7 +242,7 @@ function pipeline(...streams) { ret = pt; finishCount++; - destroys.push(destroyer(ret, false, true, true, finish)); + destroys.push(destroyer(ret, false, true, finish)); } } else if (isStream(stream)) { if (isReadable(ret)) { diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index b273fddfa3b613..53350f23a278d5 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -916,7 +916,7 @@ const { promisify } = require('util'); const src = new PassThrough({ autoDestroy: false }); const dst = new PassThrough({ autoDestroy: false }); pipeline(src, dst, common.mustCall(() => { - assert.strictEqual(src.destroyed, true); + assert.strictEqual(src.destroyed, false); assert.strictEqual(dst.destroyed, false); })); src.end(); From 04b1e3d2e96abb3f305c167e98a8bce08420156b Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Tue, 21 Apr 2020 13:43:19 +0200 Subject: [PATCH 2/4] fixup: test --- test/parallel/test-stream-pipeline.js | 93 +++++++++++++++++++++++++++ 1 file changed, 93 insertions(+) diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index 53350f23a278d5..453ac30b3f4d64 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -13,6 +13,7 @@ const { const assert = require('assert'); const http = require('http'); const { promisify } = require('util'); +const net = require('net'); { let finished = false; @@ -1118,3 +1119,95 @@ const { promisify } = require('util'); assert.strictEqual(closed, true); })); } + +{ + const server = net.createServer(common.mustCall((socket) => { + // echo server + pipeline(socket, socket, common.mustCall()); + // 13 force destroys the socket before it has a chance to emit finish + socket.on('finish', common.mustCall(() => { + server.close(); + })); + })).listen(0, common.mustCall(() => { + const socket = net.connect(server.address().port); + socket.end(); + })); +} + +{ + const d = new Duplex({ + autoDestroy: false, + write: common.mustCall((data, enc, cb) => { + d.push(data); + cb(); + }), + read: common.mustCall(() => { + d.push(null); + }), + final: common.mustCall((cb) => { + setTimeout(() => { + assert.strictEqual(d.destroyed, false); + cb(); + }, 1000); + }), + destroy: common.mustNotCall() + }); + + const sink = new Writable({ + write: common.mustCall((data, enc, cb) => { + cb(); + }) + }); + + pipeline(d, sink, common.mustCall()); + + d.write('test'); + d.end(); +} + +{ + const server = net.createServer(common.mustCall((socket) => { + // echo server + pipeline(socket, socket, common.mustCall()); + socket.on('finish', common.mustCall(() => { + server.close(); + })); + })).listen(0, common.mustCall(() => { + const socket = net.connect(server.address().port); + socket.end(); + })); +} + +{ + const d = new Duplex({ + autoDestroy: false, + write: common.mustCall((data, enc, cb) => { + d.push(data); + cb(); + }), + read: common.mustCall(() => { + d.push(null); + }), + final: common.mustCall((cb) => { + setTimeout(() => { + assert.strictEqual(d.destroyed, false); + cb(); + }, 1000); + }), + // `destroy()` won't be invoked by pipeline since + // the writable side has not completed when + // the pipeline has completed. + destroy: common.mustNotCall() + }); + + const sink = new Writable({ + write: common.mustCall((data, enc, cb) => { + cb(); + }) + }); + + pipeline(d, sink, common.mustCall()); + + d.write('test'); + d.end(); +} From 6b92c9960cd90763efb5a8db7d79e5a78cc62479 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Tue, 21 Apr 2020 13:46:30 +0200 Subject: [PATCH 3/4] fixup: simplify --- lib/internal/streams/pipeline.js | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index e92fcb364953a3..4fd0b3bb3232f9 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -61,9 +61,8 @@ function destroyer(stream, reading, writing, callback) { }); return once((err) => { - if (!finished) { - destroyImpl.destroyer(stream, err); - } + if (finished) return; + destroyImpl.destroyer(stream, err); callback(err || new ERR_STREAM_DESTROYED('pipe')); }); } From 76906a5cb27b202feb7c139882dbd297a9275099 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Tue, 21 Apr 2020 14:05:17 +0200 Subject: [PATCH 4/4] fixup: unnecessary once --- lib/internal/streams/pipeline.js | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 4fd0b3bb3232f9..cf9d7868916a9e 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -60,11 +60,12 @@ function destroyer(stream, reading, writing, callback) { } }); - return once((err) => { + return (err) => { if (finished) return; + finished = true; destroyImpl.destroyer(stream, err); callback(err || new ERR_STREAM_DESTROYED('pipe')); - }); + }; } function popCallback(streams) {