From 7fc6286c34133f82a46ad16bc3d51ed9c2700755 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Wed, 9 Oct 2019 19:52:32 +0200 Subject: [PATCH 1/2] Revert "stream: remove ambiguous code" This reverts commit ce62e963a13044817b43b7a7c6ef794eaa5ae905. --- lib/internal/streams/end-of-stream.js | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/lib/internal/streams/end-of-stream.js b/lib/internal/streams/end-of-stream.js index 585a52ae2f2382..949ab638148d24 100644 --- a/lib/internal/streams/end-of-stream.js +++ b/lib/internal/streams/end-of-stream.js @@ -59,9 +59,9 @@ function eos(stream, opts, callback) { }; } - const readable = opts.readable || + let readable = opts.readable || (opts.readable !== false && isReadable(stream)); - const writable = opts.writable || + let writable = opts.writable || (opts.writable !== false && isWritable(stream)); const onlegacyfinish = () => { @@ -69,13 +69,15 @@ function eos(stream, opts, callback) { }; const onfinish = () => { + writable = false; writableFinished = true; - if (!readable || readableEnded) callback.call(stream); + if (!readable) callback.call(stream); }; const onend = () => { + readable = false; readableEnded = true; - if (!writable || writableFinished) callback.call(stream); + if (!writable) callback.call(stream); }; const onclose = () => { From fe76c85d6641ab1eb5e4710caba432417d44a3ef Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Thu, 26 Sep 2019 11:23:45 +0200 Subject: [PATCH 2/2] Revert "stream: make finished call the callback if the stream is closed" This reverts commit b03845b9376aec590b89f753a4b7c1b47729c5f8. --- lib/internal/streams/async_iterator.js | 10 ++ lib/internal/streams/end-of-stream.js | 58 +++---- test/parallel/test-http-client-finished.js | 106 ------------ test/parallel/test-stream-finished.js | 188 +-------------------- 4 files changed, 32 insertions(+), 330 deletions(-) diff --git a/lib/internal/streams/async_iterator.js b/lib/internal/streams/async_iterator.js index 083befb89cc93f..07f2191e7134ce 100644 --- a/lib/internal/streams/async_iterator.js +++ b/lib/internal/streams/async_iterator.js @@ -112,6 +112,16 @@ const ReadableStreamAsyncIteratorPrototype = Object.setPrototypeOf({ return() { return new Promise((resolve, reject) => { const stream = this[kStream]; + + // TODO(ronag): Remove this check once finished() handles + // already ended and/or destroyed streams. + const ended = stream.destroyed || stream.readableEnded || + (stream._readableState && stream._readableState.endEmitted); + if (ended) { + resolve(createIterResult(undefined, true)); + return; + } + finished(stream, (err) => { if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') { reject(err); diff --git a/lib/internal/streams/end-of-stream.js b/lib/internal/streams/end-of-stream.js index 949ab638148d24..3f1c0f316cd3c6 100644 --- a/lib/internal/streams/end-of-stream.js +++ b/lib/internal/streams/end-of-stream.js @@ -13,18 +13,6 @@ function isRequest(stream) { return stream.setHeader && typeof stream.abort === 'function'; } -function isReadable(stream) { - return typeof stream.readable === 'boolean' || - typeof stream.readableEnded === 'boolean' || - !!stream._readableState; -} - -function isWritable(stream) { - return typeof stream.writable === 'boolean' || - typeof stream.writableEnded === 'boolean' || - !!stream._writableState; -} - function eos(stream, opts, callback) { if (arguments.length === 2) { callback = opts; @@ -40,51 +28,43 @@ function eos(stream, opts, callback) { callback = once(callback); - const onerror = (err) => { - callback.call(stream, err); - }; - - let writableFinished = stream.writableFinished || - (stream._writableState && stream._writableState.finished); - let readableEnded = stream.readableEnded || - (stream._readableState && stream._readableState.endEmitted); - - if (writableFinished || readableEnded || stream.destroyed || - stream.aborted) { - if (opts.error !== false) stream.on('error', onerror); - // A destroy(err) call emits error in nextTick. - process.nextTick(callback.bind(stream)); - return () => { - stream.removeListener('error', onerror); - }; - } - - let readable = opts.readable || - (opts.readable !== false && isReadable(stream)); - let writable = opts.writable || - (opts.writable !== false && isWritable(stream)); + let readable = opts.readable || (opts.readable !== false && stream.readable); + let writable = opts.writable || (opts.writable !== false && stream.writable); const onlegacyfinish = () => { if (!stream.writable) onfinish(); }; + var writableEnded = stream._writableState && stream._writableState.finished; const onfinish = () => { writable = false; - writableFinished = true; + writableEnded = true; if (!readable) callback.call(stream); }; + var readableEnded = stream.readableEnded || + (stream._readableState && stream._readableState.endEmitted); const onend = () => { readable = false; readableEnded = true; if (!writable) callback.call(stream); }; + const onerror = (err) => { + callback.call(stream, err); + }; + const onclose = () => { + let err; if (readable && !readableEnded) { - callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE()); - } else if (writable && !writableFinished) { - callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE()); + if (!stream._readableState || !stream._readableState.ended) + err = new ERR_STREAM_PREMATURE_CLOSE(); + return callback.call(stream, err); + } + if (writable && !writableEnded) { + if (!stream._writableState || !stream._writableState.ended) + err = new ERR_STREAM_PREMATURE_CLOSE(); + return callback.call(stream, err); } }; diff --git a/test/parallel/test-http-client-finished.js b/test/parallel/test-http-client-finished.js index 337f7b596d7442..2d7e5b95b3ca33 100644 --- a/test/parallel/test-http-client-finished.js +++ b/test/parallel/test-http-client-finished.js @@ -25,109 +25,3 @@ const { finished } = require('stream'); .end(); })); } - -{ - // Test abort before finished. - - const server = http.createServer(function(req, res) { - }); - - server.listen(0, common.mustCall(function() { - const req = http.request({ - port: this.address().port - }, common.mustNotCall()); - req.abort(); - finished(req, common.mustCall(() => { - server.close(); - })); - })); -} - -{ - // Test abort after request. - - const server = http.createServer(function(req, res) { - }); - - server.listen(0, common.mustCall(function() { - const req = http.request({ - port: this.address().port - }).end(); - finished(req, (err) => { - common.expectsError({ - type: Error, - code: 'ERR_STREAM_PREMATURE_CLOSE' - })(err); - finished(req, common.mustCall(() => { - server.close(); - })); - }); - req.abort(); - })); -} - -{ - // Test abort before end. - - const server = http.createServer(function(req, res) { - res.write('test'); - }); - - server.listen(0, common.mustCall(function() { - const req = http.request({ - port: this.address().port - }).on('response', common.mustCall((res) => { - req.abort(); - finished(res, common.mustCall(() => { - finished(res, common.mustCall(() => { - server.close(); - })); - })); - })).end(); - })); -} - -{ - // Test destroy before end. - - const server = http.createServer(function(req, res) { - res.write('test'); - }); - - server.listen(0, common.mustCall(function() { - http.request({ - port: this.address().port - }).on('response', common.mustCall((res) => { - // TODO(ronag): Bug? Won't emit 'close' unless read. - res.on('data', () => {}); - res.destroy(); - finished(res, common.mustCall(() => { - finished(res, common.mustCall(() => { - server.close(); - })); - })); - })).end(); - })); -} - -{ - // Test finish after end. - - const server = http.createServer(function(req, res) { - res.end('asd'); - }); - - server.listen(0, common.mustCall(function() { - http.request({ - port: this.address().port - }).on('response', common.mustCall((res) => { - // TODO(ronag): Bug? Won't emit 'close' unless read. - res.on('data', () => {}); - finished(res, common.mustCall(() => { - finished(res, common.mustCall(() => { - server.close(); - })); - })); - })).end(); - })); -} diff --git a/test/parallel/test-stream-finished.js b/test/parallel/test-stream-finished.js index 12a40b35b97fd7..d6361ea303635d 100644 --- a/test/parallel/test-stream-finished.js +++ b/test/parallel/test-stream-finished.js @@ -97,18 +97,6 @@ const { promisify } = require('util'); })); } -{ - const rs = new Readable(); - - finished(rs, common.mustCall((err) => { - assert(err, 'premature close error'); - })); - - rs.push(null); - rs.emit('close'); - rs.resume(); -} - { const rs = new Readable(); @@ -117,9 +105,7 @@ const { promisify } = require('util'); })); rs.push(null); - rs.on('end', common.mustCall(() => { - rs.emit('close'); // Should not trigger an error - })); + rs.emit('close'); // Should not trigger an error rs.resume(); } @@ -169,9 +155,8 @@ const { promisify } = require('util'); rs.resume(); } +// Test that calling returned function removes listeners { - // Nothing happens if disposed. - const ws = new Writable({ write(data, env, cb) { cb(); @@ -183,8 +168,6 @@ const { promisify } = require('util'); } { - // Nothing happens if disposed. - const rs = new Readable(); const removeListeners = finished(rs, common.mustNotCall()); removeListeners(); @@ -195,174 +178,9 @@ const { promisify } = require('util'); } { - // Completed if readable-like is ended before. - const streamLike = new EE(); streamLike.readableEnded = true; streamLike.readable = true; - finished(streamLike, common.mustCall()); -} - -{ - // Completed if readable-like is never ended. - - const streamLike = new EE(); - streamLike.readableEnded = false; - streamLike.readable = true; - finished(streamLike, common.expectsError({ - code: 'ERR_STREAM_PREMATURE_CLOSE' - })); + finished(streamLike, common.mustCall); streamLike.emit('close'); } - -{ - // Completed if writable-like is destroyed before. - - const streamLike = new EE(); - streamLike.destroyed = true; - streamLike.writable = true; - finished(streamLike, common.mustCall()); -} - -{ - // Completed if readable-like is aborted before. - - const streamLike = new EE(); - streamLike.destroyed = true; - streamLike.readable = true; - finished(streamLike, common.mustCall()); -} - -{ - // Completed if writable-like is aborted before. - - const streamLike = new EE(); - streamLike.aborted = true; - streamLike.writable = true; - finished(streamLike, common.mustCall()); -} - -{ - // Completed if readable-like is aborted before. - - const streamLike = new EE(); - streamLike.aborted = true; - streamLike.readable = true; - finished(streamLike, common.mustCall()); -} - -{ - // Completed if streamlike is finished before. - - const streamLike = new EE(); - streamLike.writableFinished = true; - streamLike.writable = true; - finished(streamLike, common.mustCall()); -} - -{ - // Premature close if stream is not finished. - - const streamLike = new EE(); - streamLike.writableFinished = false; - streamLike.writable = true; - finished(streamLike, common.expectsError({ - code: 'ERR_STREAM_PREMATURE_CLOSE' - })); - streamLike.emit('close'); -} - -{ - // Premature close if stream never emitted 'finish' - // even if writableFinished says something else. - - const streamLike = new EE(); - streamLike.writable = true; - finished(streamLike, common.expectsError({ - code: 'ERR_STREAM_PREMATURE_CLOSE' - })); - streamLike.writableFinished = true; - streamLike.emit('close'); -} - - -{ - // Premature close if stream never emitted 'end' - // even if readableEnded says something else. - - const streamLike = new EE(); - streamLike.readable = true; - finished(streamLike, common.expectsError({ - code: 'ERR_STREAM_PREMATURE_CLOSE' - })); - streamLike.readableEnded = true; - streamLike.emit('close'); -} - -{ - // Completes if already finished. - - const w = new Writable(); - finished(w, common.mustCall(() => { - finished(w, common.mustCall()); - })); - w.destroy(); -} - -{ - // Completes if already ended. - - const r = new Readable(); - finished(r, common.mustCall(() => { - finished(r, common.mustCall()); - })); - r.destroy(); -} - -{ - // Test is readable check through readable - const streamLike = new EE(); - streamLike.readable = false; - finished(streamLike, common.mustCall()); - streamLike.emit('end'); -} - -{ - // Test is readable check through readableEnded - const streamLike = new EE(); - streamLike.readableEnded = true; - finished(streamLike, common.mustCall()); - streamLike.emit('end'); -} - -{ - // Test is readable check through _readableState - const streamLike = new EE(); - streamLike._readableState = {}; - finished(streamLike, common.mustCall()); - streamLike.emit('end'); -} - -{ - // Test is writable check through writable - const streamLike = new EE(); - streamLike.writable = false; - finished(streamLike, common.mustCall()); - streamLike.emit('finish'); -} - -{ - // Test is writable check through writableEnded - const streamLike = new EE(); - streamLike.writableEnded = true; - finished(streamLike, common.mustCall()); - streamLike.emit('finish'); -} - -{ - // Test is writable check through _writableState - const streamLike = new EE(); - streamLike._writableState = {}; - finished(streamLike, common.mustCall()); - streamLike.emit('finish'); -}