From 03b85ec4db64714c3ea9695aa1765aa00f307885 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 22 Nov 2019 19:13:00 +0100 Subject: [PATCH 1/2] stream: invoke buffered write callbacks on error Buffered write callbacks were only invoked upon error if `autoDestroy` was invoked. Backport-PR-URL: https://github.com/nodejs/node/pull/31179 PR-URL: https://github.com/nodejs/node/pull/30596 Reviewed-By: Matteo Collina Reviewed-By: Anna Henningsen Reviewed-By: Luigi Pinca Reviewed-By: Rich Trott --- lib/_stream_writable.js | 32 +++++-- test/parallel/test-stream-writable-destroy.js | 95 +++++++++++++++++++ 2 files changed, 121 insertions(+), 6 deletions(-) diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index 5c36ae66070134..01d30085079eaf 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -458,6 +458,11 @@ function onwriteError(stream, state, er, cb) { --state.pendingcb; cb(er); + // Ensure callbacks are invoked even when autoDestroy is + // not enabled. Passing `er` here doesn't make sense since + // it's related to one specific write, not to the buffered + // writes. + errorBuffer(state, new ERR_STREAM_DESTROYED('write')); // This can emit error, but error must always follow cb. errorOrDestroy(stream, er); } @@ -529,9 +534,29 @@ function afterWrite(stream, state, count, cb) { cb(); } + if (state.destroyed) { + errorBuffer(state, new ERR_STREAM_DESTROYED('write')); + } + finishMaybe(stream, state); } +// If there's something in the buffer waiting, then invoke callbacks. +function errorBuffer(state, err) { + if (state.writing || !state.bufferedRequest) { + return; + } + + for (let entry = state.bufferedRequest; entry; entry = entry.next) { + const len = state.objectMode ? 1 : entry.chunk.length; + state.length -= len; + entry.callback(err); + } + state.bufferedRequest = null; + state.lastBufferedRequest = null; + state.bufferedRequestCount = 0; +} + // If there's something in the buffer waiting, then process it function clearBuffer(stream, state) { state.bufferProcessing = true; @@ -781,12 +806,7 @@ const destroy = destroyImpl.destroy; Writable.prototype.destroy = function(err, cb) { const state = this._writableState; if (!state.destroyed) { - for (let entry = state.bufferedRequest; entry; entry = entry.next) { - process.nextTick(entry.callback, new ERR_STREAM_DESTROYED('write')); - } - state.bufferedRequest = null; - state.lastBufferedRequest = null; - state.bufferedRequestCount = 0; + process.nextTick(errorBuffer, state, new ERR_STREAM_DESTROYED('write')); } destroy.call(this, err, cb); return this; diff --git a/test/parallel/test-stream-writable-destroy.js b/test/parallel/test-stream-writable-destroy.js index 4a2c7b0884b417..83936eaff3380c 100644 --- a/test/parallel/test-stream-writable-destroy.js +++ b/test/parallel/test-stream-writable-destroy.js @@ -292,3 +292,98 @@ const assert = require('assert'); })); write.uncork(); } + +{ + // Call end(cb) after error & destroy + + const write = new Writable({ + write(chunk, enc, cb) { cb(new Error('asd')); } + }); + write.on('error', common.mustCall(() => { + write.destroy(); + let ticked = false; + write.end(common.mustCall((err) => { + assert.strictEqual(ticked, true); + assert.strictEqual(err.code, 'ERR_STREAM_DESTROYED'); + })); + ticked = true; + })); + write.write('asd'); +} + +{ + // Call end(cb) after finish & destroy + + const write = new Writable({ + write(chunk, enc, cb) { cb(); } + }); + write.on('finish', common.mustCall(() => { + write.destroy(); + let ticked = false; + write.end(common.mustCall((err) => { + assert.strictEqual(ticked, false); + assert.strictEqual(err.code, 'ERR_STREAM_ALREADY_FINISHED'); + })); + ticked = true; + })); + write.end(); +} + +{ + // Call end(cb) after error & destroy and don't trigger + // unhandled exception. + + const write = new Writable({ + write(chunk, enc, cb) { process.nextTick(cb); } + }); + write.once('error', common.mustCall((err) => { + assert.strictEqual(err.message, 'asd'); + })); + write.end('asd', common.mustCall((err) => { + assert.strictEqual(err.message, 'asd'); + })); + write.destroy(new Error('asd')); +} + +{ + // Call buffered write callback with error + + const write = new Writable({ + write(chunk, enc, cb) { + process.nextTick(cb, new Error('asd')); + }, + autoDestroy: false + }); + write.cork(); + write.write('asd', common.mustCall((err) => { + assert.strictEqual(err.message, 'asd'); + })); + write.write('asd', common.mustCall((err) => { + assert.strictEqual(err.code, 'ERR_STREAM_DESTROYED'); + })); + write.on('error', common.mustCall((err) => { + assert.strictEqual(err.message, 'asd'); + })); + write.uncork(); +} + +{ + // Ensure callback order. + + let state = 0; + const write = new Writable({ + write(chunk, enc, cb) { + // `setImmediate()` is used on purpose to ensure the callback is called + // after `process.nextTick()` callbacks. + setImmediate(cb); + } + }); + write.write('asd', common.mustCall(() => { + assert.strictEqual(state++, 0); + })); + write.write('asd', common.mustCall((err) => { + assert.strictEqual(err.code, 'ERR_STREAM_DESTROYED'); + assert.strictEqual(state++, 1); + })); + write.destroy(); +} From 96969634413624080852bd9c59d246eacfeba328 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 3 Jan 2020 20:41:51 +0100 Subject: [PATCH 2/2] fixup: unrelated tests --- test/parallel/test-stream-writable-destroy.js | 52 ------------------- 1 file changed, 52 deletions(-) diff --git a/test/parallel/test-stream-writable-destroy.js b/test/parallel/test-stream-writable-destroy.js index 83936eaff3380c..d67bdae3bba36e 100644 --- a/test/parallel/test-stream-writable-destroy.js +++ b/test/parallel/test-stream-writable-destroy.js @@ -293,58 +293,6 @@ const assert = require('assert'); write.uncork(); } -{ - // Call end(cb) after error & destroy - - const write = new Writable({ - write(chunk, enc, cb) { cb(new Error('asd')); } - }); - write.on('error', common.mustCall(() => { - write.destroy(); - let ticked = false; - write.end(common.mustCall((err) => { - assert.strictEqual(ticked, true); - assert.strictEqual(err.code, 'ERR_STREAM_DESTROYED'); - })); - ticked = true; - })); - write.write('asd'); -} - -{ - // Call end(cb) after finish & destroy - - const write = new Writable({ - write(chunk, enc, cb) { cb(); } - }); - write.on('finish', common.mustCall(() => { - write.destroy(); - let ticked = false; - write.end(common.mustCall((err) => { - assert.strictEqual(ticked, false); - assert.strictEqual(err.code, 'ERR_STREAM_ALREADY_FINISHED'); - })); - ticked = true; - })); - write.end(); -} - -{ - // Call end(cb) after error & destroy and don't trigger - // unhandled exception. - - const write = new Writable({ - write(chunk, enc, cb) { process.nextTick(cb); } - }); - write.once('error', common.mustCall((err) => { - assert.strictEqual(err.message, 'asd'); - })); - write.end('asd', common.mustCall((err) => { - assert.strictEqual(err.message, 'asd'); - })); - write.destroy(new Error('asd')); -} - { // Call buffered write callback with error