diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 8073e174cc586f..31a8a11e4a0673 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -99,6 +99,9 @@ function ReadableState(options, stream, isDuplex) { this.endEmitted = false; this.reading = false; + // Flipped if an 'error' is emitted. + this.errorEmitted = false; + // a flag to be able to tell if the event 'readable'/'data' is emitted // immediately, or on a later tick. We set this to true at first, because // any actions that shouldn't happen until "later" should generally also @@ -1069,20 +1072,23 @@ function fromList(n, state) { function endReadable(stream) { var state = stream._readableState; - debug('endReadable', state.endEmitted); - if (!state.endEmitted) { + debug('endReadable', state.endEmitted, state.errorEmitted); + if (!state.endEmitted && !state.errorEmitted) { state.ended = true; process.nextTick(endReadableNT, state, stream); } } function endReadableNT(state, stream) { - debug('endReadableNT', state.endEmitted, state.length); + debug('endReadableNT', state.endEmitted, state.length, state.errorEmitted); // Check that we didn't get one last unshift. if (!state.endEmitted && state.length === 0) { - state.endEmitted = true; stream.readable = false; - stream.emit('end'); + + if (!state.errorEmitted) { + state.endEmitted = true; + stream.emit('end'); + } } } diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index d21daf0541d339..0891f85526f132 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -424,12 +424,22 @@ function onwriteError(stream, state, sync, er, cb) { // this can emit finish, and it will always happen // after error process.nextTick(finishMaybe, stream, state); + + // needed for duplex, fixes https://github.com/nodejs/node/issues/6083 + if (stream._readableState) { + stream._readableState.errorEmitted = true; + } stream._writableState.errorEmitted = true; stream.emit('error', er); } else { // the caller expect this to happen before if // it is async cb(er); + + // needed for duplex, fixes https://github.com/nodejs/node/issues/6083 + if (stream._readableState) { + stream._readableState.errorEmitted = true; + } stream._writableState.errorEmitted = true; stream.emit('error', er); // this can emit finish, but finish must diff --git a/lib/internal/streams/destroy.js b/lib/internal/streams/destroy.js index 3a0383cc3cea70..2ab614e1d597da 100644 --- a/lib/internal/streams/destroy.js +++ b/lib/internal/streams/destroy.js @@ -8,10 +8,14 @@ function destroy(err, cb) { this._writableState.destroyed; if (readableDestroyed || writableDestroyed) { + const readableErrored = this._readableState && + this._readableState.errorEmitted; + const writableErrored = this._writableState && + this._writableState.errorEmitted; + if (cb) { cb(err); - } else if (err && - (!this._writableState || !this._writableState.errorEmitted)) { + } else if (err && !readableErrored && !writableErrored) { process.nextTick(emitErrorNT, this, err); } return this; @@ -32,6 +36,11 @@ function destroy(err, cb) { this._destroy(err || null, (err) => { if (!cb && err) { process.nextTick(emitErrorAndCloseNT, this, err); + + if (this._readableState) { + this._readableState.errorEmitted = true; + } + if (this._writableState) { this._writableState.errorEmitted = true; } @@ -65,6 +74,7 @@ function undestroy() { this._readableState.reading = false; this._readableState.ended = false; this._readableState.endEmitted = false; + this._readableState.errorEmitted = false; } if (this._writableState) { diff --git a/test/parallel/test-http2-client-destroy.js b/test/parallel/test-http2-client-destroy.js index eab413e2327d8f..09ca011c736c96 100644 --- a/test/parallel/test-http2-client-destroy.js +++ b/test/parallel/test-http2-client-destroy.js @@ -95,7 +95,6 @@ const Countdown = require('../common/countdown'); }); req.resume(); - req.on('end', common.mustCall()); req.on('close', common.mustCall(() => server.close())); })); } diff --git a/test/parallel/test-http2-client-onconnect-errors.js b/test/parallel/test-http2-client-onconnect-errors.js index a75dc590c669a1..f427bfb4907339 100644 --- a/test/parallel/test-http2-client-onconnect-errors.js +++ b/test/parallel/test-http2-client-onconnect-errors.js @@ -101,7 +101,6 @@ function runTest(test) { }); } - req.on('end', common.mustCall()); req.on('close', common.mustCall(() => { client.destroy(); diff --git a/test/parallel/test-http2-client-stream-destroy-before-connect.js b/test/parallel/test-http2-client-stream-destroy-before-connect.js index d834de5d11ebe7..9e81015ec58d28 100644 --- a/test/parallel/test-http2-client-stream-destroy-before-connect.js +++ b/test/parallel/test-http2-client-stream-destroy-before-connect.js @@ -45,5 +45,4 @@ server.listen(0, common.mustCall(() => { req.on('response', common.mustNotCall()); req.resume(); - req.on('end', common.mustCall()); })); diff --git a/test/parallel/test-http2-compat-serverresponse-destroy.js b/test/parallel/test-http2-compat-serverresponse-destroy.js index 8ee52a74ab4e81..49822082979a01 100644 --- a/test/parallel/test-http2-compat-serverresponse-destroy.js +++ b/test/parallel/test-http2-compat-serverresponse-destroy.js @@ -63,7 +63,6 @@ server.listen(0, common.mustCall(() => { req.on('close', common.mustCall(() => countdown.dec())); req.resume(); - req.on('end', common.mustCall()); } { @@ -78,6 +77,5 @@ server.listen(0, common.mustCall(() => { req.on('close', common.mustCall(() => countdown.dec())); req.resume(); - req.on('end', common.mustCall()); } })); diff --git a/test/parallel/test-http2-max-concurrent-streams.js b/test/parallel/test-http2-max-concurrent-streams.js index b270d6cc6aff31..2b576700aa4e00 100644 --- a/test/parallel/test-http2-max-concurrent-streams.js +++ b/test/parallel/test-http2-max-concurrent-streams.js @@ -45,7 +45,6 @@ server.listen(0, common.mustCall(() => { req.on('aborted', common.mustCall()); req.on('response', common.mustNotCall()); req.resume(); - req.on('end', common.mustCall()); req.on('close', common.mustCall(() => countdown.dec())); req.on('error', common.expectsError({ code: 'ERR_HTTP2_STREAM_ERROR', diff --git a/test/parallel/test-http2-misused-pseudoheaders.js b/test/parallel/test-http2-misused-pseudoheaders.js index e9253baa74ad1d..f81f91cdfa1cdf 100644 --- a/test/parallel/test-http2-misused-pseudoheaders.js +++ b/test/parallel/test-http2-misused-pseudoheaders.js @@ -46,7 +46,6 @@ server.listen(0, common.mustCall(() => { req.on('response', common.mustCall()); req.resume(); - req.on('end', common.mustCall()); req.on('close', common.mustCall(() => { server.close(); client.close(); diff --git a/test/parallel/test-http2-multi-content-length.js b/test/parallel/test-http2-multi-content-length.js index 7d8ff4858fedbb..908f6ecd64fea1 100644 --- a/test/parallel/test-http2-multi-content-length.js +++ b/test/parallel/test-http2-multi-content-length.js @@ -53,7 +53,6 @@ server.listen(0, common.mustCall(() => { // header to be set for non-payload bearing requests... const req = client.request({ 'content-length': 1 }); req.resume(); - req.on('end', common.mustCall()); req.on('close', common.mustCall(() => countdown.dec())); req.on('error', common.expectsError({ code: 'ERR_HTTP2_STREAM_ERROR', diff --git a/test/parallel/test-http2-respond-file-fd-invalid.js b/test/parallel/test-http2-respond-file-fd-invalid.js index 21fcf790b449eb..28d1c0f057dd23 100644 --- a/test/parallel/test-http2-respond-file-fd-invalid.js +++ b/test/parallel/test-http2-respond-file-fd-invalid.js @@ -40,7 +40,7 @@ server.listen(0, () => { req.on('response', common.mustCall()); req.on('error', common.mustCall(errorCheck)); req.on('data', common.mustNotCall()); - req.on('end', common.mustCall(() => { + req.on('close', common.mustCall(() => { assert.strictEqual(req.rstCode, NGHTTP2_INTERNAL_ERROR); client.close(); server.close(); diff --git a/test/parallel/test-http2-respond-nghttperrors.js b/test/parallel/test-http2-respond-nghttperrors.js index ad9eee0d59fecc..4adf678b681b09 100644 --- a/test/parallel/test-http2-respond-nghttperrors.js +++ b/test/parallel/test-http2-respond-nghttperrors.js @@ -87,7 +87,7 @@ function runTest(test) { req.resume(); req.end(); - req.on('end', common.mustCall(() => { + req.on('close', common.mustCall(() => { client.close(); if (!tests.length) { diff --git a/test/parallel/test-http2-respond-with-fd-errors.js b/test/parallel/test-http2-respond-with-fd-errors.js index 3a671a3e36490a..7e7394d29305cc 100644 --- a/test/parallel/test-http2-respond-with-fd-errors.js +++ b/test/parallel/test-http2-respond-with-fd-errors.js @@ -95,7 +95,7 @@ function runTest(test) { req.resume(); req.end(); - req.on('end', common.mustCall(() => { + req.on('close', common.mustCall(() => { client.close(); if (!tests.length) { diff --git a/test/parallel/test-http2-server-shutdown-before-respond.js b/test/parallel/test-http2-server-shutdown-before-respond.js index 33f224fc69a9d5..50b3a5572a58e6 100644 --- a/test/parallel/test-http2-server-shutdown-before-respond.js +++ b/test/parallel/test-http2-server-shutdown-before-respond.js @@ -32,5 +32,5 @@ server.on('listening', common.mustCall(() => { })); req.resume(); req.on('data', common.mustNotCall()); - req.on('end', common.mustCall(() => server.close())); + req.on('close', common.mustCall(() => server.close())); })); diff --git a/test/parallel/test-http2-server-socket-destroy.js b/test/parallel/test-http2-server-socket-destroy.js index 03afc1957b8af4..d631ef032b823b 100644 --- a/test/parallel/test-http2-server-socket-destroy.js +++ b/test/parallel/test-http2-server-socket-destroy.js @@ -52,5 +52,4 @@ server.on('listening', common.mustCall(() => { req.on('aborted', common.mustCall()); req.resume(); - req.on('end', common.mustCall()); })); diff --git a/test/parallel/test-stream-duplex-error-write.js b/test/parallel/test-stream-duplex-error-write.js new file mode 100644 index 00000000000000..5a80ce5c3e4989 --- /dev/null +++ b/test/parallel/test-stream-duplex-error-write.js @@ -0,0 +1,24 @@ +'use strict'; + +const common = require('../common'); +const { Duplex } = require('stream'); +const { strictEqual } = require('assert'); + +const duplex = new Duplex({ + write(chunk, enc, cb) { + cb(new Error('kaboom')); + }, + read() { + this.push(null); + } +}); + +duplex.on('error', common.mustCall(function() { + strictEqual(this._readableState.errorEmitted, true); + strictEqual(this._writableState.errorEmitted, true); +})); + +duplex.on('end', common.mustNotCall()); + +duplex.end('hello'); +duplex.resume(); diff --git a/test/parallel/test-stream-readable-destroy.js b/test/parallel/test-stream-readable-destroy.js index 026aa8ca1603b8..eecee04294e6fe 100644 --- a/test/parallel/test-stream-readable-destroy.js +++ b/test/parallel/test-stream-readable-destroy.js @@ -189,3 +189,18 @@ const { inherits } = require('util'); read.push('hi'); read.on('data', common.mustNotCall()); } + +{ + // double error case + const read = new Readable({ + read() {} + }); + + read.on('close', common.mustCall()); + read.on('error', common.mustCall()); + + read.destroy(new Error('kaboom 1')); + read.destroy(new Error('kaboom 2')); + assert.strictEqual(read._readableState.errorEmitted, true); + assert.strictEqual(read.destroyed, true); +}