Skip to content
Permalink
Browse files

stream: always invoke end callback

Ensure that the callback passed into end() is always invoke in
order to avoid bug such as deadlock the user.

PR-URL: #29747
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Trivikram Kamat <trivikr.dev@gmail.com>
  • Loading branch information
ronag authored and addaleax committed Sep 28, 2019
1 parent 535e957 commit 9d09969f4c29b7f2bacc9cb44e210c4e269945a4
@@ -611,11 +611,11 @@ Writable.prototype.end = function(chunk, encoding, cb) {
}

// Ignore unnecessary end() calls.
if (!state.ending)
if (!state.ending) {
endWritable(this, state, cb);
else if (typeof cb === 'function') {
} else if (typeof cb === 'function') {
if (!state.finished) {
this.once('finish', cb);
onFinished(this, state, cb);
} else {
cb(new ERR_STREAM_ALREADY_FINISHED('end'));
}
@@ -695,7 +695,7 @@ function endWritable(stream, state, cb) {
if (state.finished)
process.nextTick(cb);
else
stream.once('finish', cb);
onFinished(stream, state, cb);
}
state.ended = true;
stream.writable = false;
@@ -715,6 +715,32 @@ function onCorkedFinish(corkReq, state, err) {
state.corkedRequestsFree.next = corkReq;
}

function onFinished(stream, state, cb) {
if (state.destroyed && state.errorEmitted) {
// TODO(ronag): Backwards compat. Should be moved to end() without
// errorEmitted check and with errorOrDestroy.
const err = new ERR_STREAM_DESTROYED('end');
process.nextTick(cb, err);
return;
}

function onerror(err) {
stream.removeListener('finish', onfinish);
stream.removeListener('error', onerror);
cb(err);
if (stream.listenerCount('error') === 0) {
stream.emit('error', err);
}
}
function onfinish() {
stream.removeListener('finish', onfinish);
stream.removeListener('error', onerror);
cb();
}
stream.on('finish', onfinish);
stream.prependListener('error', onerror);
}

Object.defineProperty(Writable.prototype, 'destroyed', {
// Making it explicit this property is not enumerable
// because otherwise some prototype manipulation in
@@ -292,3 +292,55 @@ const assert = require('assert');
}));
write.uncork();
}

{
// Call end(cb) after error & destroy

const write = new Writable({
write(chunk, enc, cb) { cb(new Error('asd')); }
});
write.on('error', common.mustCall(() => {
write.destroy();
let ticked = false;
write.end(common.mustCall((err) => {
assert.strictEqual(ticked, true);
assert.strictEqual(err.code, 'ERR_STREAM_DESTROYED');
}));
ticked = true;
}));
write.write('asd');
}

{
// Call end(cb) after finish & destroy

const write = new Writable({
write(chunk, enc, cb) { cb(); }
});
write.on('finish', common.mustCall(() => {
write.destroy();
let ticked = false;
write.end(common.mustCall((err) => {
assert.strictEqual(ticked, false);
assert.strictEqual(err.code, 'ERR_STREAM_ALREADY_FINISHED');
}));
ticked = true;
}));
write.end();
}

{
// Call end(cb) after error & destroy and don't trigger
// unhandled exception.

const write = new Writable({
write(chunk, enc, cb) { process.nextTick(cb); }
});
write.once('error', common.mustCall((err) => {
assert.strictEqual(err.message, 'asd');
}));
write.end('asd', common.mustCall((err) => {
assert.strictEqual(err.message, 'asd');
}));
write.destroy(new Error('asd'));
}
@@ -0,0 +1,48 @@
'use strict';

const common = require('../common');
const assert = require('assert');
const stream = require('stream');

{
// Invoke end callback on failure.
const writable = new stream.Writable();

writable._write = (chunk, encoding, cb) => {
process.nextTick(cb, new Error('kaboom'));
};

writable.on('error', common.mustCall((err) => {
assert.strictEqual(err.message, 'kaboom');
}));
writable.write('asd');
writable.end(common.mustCall((err) => {
assert.strictEqual(err.message, 'kaboom');
}));
writable.end(common.mustCall((err) => {
assert.strictEqual(err.message, 'kaboom');
}));
}

{
// Don't invoke end callback twice
const writable = new stream.Writable();

writable._write = (chunk, encoding, cb) => {
process.nextTick(cb);
};

let called = false;
writable.end('asd', common.mustCall((err) => {
called = true;
assert.strictEqual(err, undefined);
}));

writable.on('error', common.mustCall((err) => {
assert.strictEqual(err.message, 'kaboom');
}));
writable.on('finish', common.mustCall(() => {
assert.strictEqual(called, true);
writable.emit('error', new Error('kaboom'));
}));
}
@@ -0,0 +1,23 @@
'use strict';

const common = require('../common');
const assert = require('assert');
const stream = require('stream');

process.on('uncaughtException', common.mustCall((err) => {
assert.strictEqual(err.message, 'kaboom');
}));

const writable = new stream.Writable();

writable._write = (chunk, encoding, cb) => {
cb();
};
writable._final = (cb) => {
cb(new Error('kaboom'));
};

writable.write('asd');
writable.end(common.mustCall((err) => {
assert.strictEqual(err.message, 'kaboom');
}));

0 comments on commit 9d09969

Please sign in to comment.
You can’t perform that action at this time.