Skip to content
Permalink
Browse files

stream: emit 'error' asynchronously

errorOrDestroy emits 'error' synchronously due to
compat reasons. However, it should be possible to
use correct async behaviour for new code.

PR-URL: #29744
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Reviewed-By: Anna Henningsen <anna@addaleax.net>
Reviewed-By: Rich Trott <rtrott@gmail.com>
  • Loading branch information
ronag authored and Trott committed Sep 28, 2019
1 parent 9085c03 commit 75b30c606c9b18fdb2634e8fe5e2ca5e9a889286
@@ -1853,7 +1853,8 @@ methods only.
The `callback` method must be called to signal either that the write completed
successfully or failed with an error. The first argument passed to the
`callback` must be the `Error` object if the call failed or `null` if the
write succeeded.
write succeeded. The `callback` method will always be called asynchronously and
before `'error'` is emitted.

All calls to `writable.write()` that occur between the time `writable._write()`
is called and the `callback` is called will cause the written data to be
@@ -265,33 +265,6 @@ Writable.prototype.pipe = function() {
errorOrDestroy(this, new ERR_STREAM_CANNOT_PIPE());
};


function writeAfterEnd(stream, cb) {
const er = new ERR_STREAM_WRITE_AFTER_END();
// TODO: defer error events consistently everywhere, not just the cb
errorOrDestroy(stream, er);
process.nextTick(cb, er);
}

// Checks that a user-supplied chunk is valid, especially for the particular
// mode the stream is in. Currently this means that `null` is never accepted
// and undefined/non-string values are only allowed in object mode.
function validChunk(stream, state, chunk, cb) {
var er;

if (chunk === null) {
er = new ERR_STREAM_NULL_VALUES();
} else if (typeof chunk !== 'string' && !state.objectMode) {
er = new ERR_INVALID_ARG_TYPE('chunk', ['string', 'Buffer'], chunk);
}
if (er) {
errorOrDestroy(stream, er);
process.nextTick(cb, er);
return false;
}
return true;
}

Writable.prototype.write = function(chunk, encoding, cb) {
const state = this._writableState;
var ret = false;
@@ -315,17 +288,25 @@ Writable.prototype.write = function(chunk, encoding, cb) {
if (typeof cb !== 'function')
cb = nop;

let err;
if (state.ending) {
writeAfterEnd(this, cb);
err = new ERR_STREAM_WRITE_AFTER_END();
} else if (state.destroyed) {
const err = new ERR_STREAM_DESTROYED('write');
process.nextTick(cb, err);
errorOrDestroy(this, err);
} else if (isBuf || validChunk(this, state, chunk, cb)) {
err = new ERR_STREAM_DESTROYED('write');
} else if (chunk === null) {
err = new ERR_STREAM_NULL_VALUES();
} else if (!isBuf && typeof chunk !== 'string' && !state.objectMode) {
err = new ERR_INVALID_ARG_TYPE('chunk', ['string', 'Buffer'], chunk);
} else {
state.pendingcb++;
ret = writeOrBuffer(this, state, chunk, encoding, cb);
}

if (err) {
process.nextTick(cb, err);
errorOrDestroy(this, err, true);
}

return ret;
};

@@ -629,7 +610,7 @@ Writable.prototype._write = function(chunk, encoding, cb) {
if (this._writev) {
this._writev([{ chunk, encoding }], cb);
} else {
cb(new ERR_METHOD_NOT_IMPLEMENTED('_write()'));
process.nextTick(cb, new ERR_METHOD_NOT_IMPLEMENTED('_write()'));
}
};

@@ -656,15 +637,25 @@ Writable.prototype.end = function(chunk, encoding, cb) {
this.uncork();
}

if (typeof cb !== 'function')
cb = nop;

// Ignore unnecessary end() calls.
if (!state.ending) {
// TODO(ronag): Compat. Allow end() after destroy().
if (!state.errored && !state.ending) {
endWritable(this, state, cb);
} else if (typeof cb === 'function') {
if (!state.finished) {
onFinished(this, state, cb);
} else {
cb(new ERR_STREAM_ALREADY_FINISHED('end'));
}
} else if (state.finished) {
const err = new ERR_STREAM_ALREADY_FINISHED('end');
process.nextTick(cb, err);
// TODO(ronag): Compat. Don't error the stream.
// errorOrDestroy(this, err, true);
} else if (state.destroyed) {
const err = new ERR_STREAM_DESTROYED('end');
process.nextTick(cb, err);
// TODO(ronag): Compat. Don't error the stream.
// errorOrDestroy(this, err, true);
} else if (cb !== nop) {
onFinished(this, state, cb);
}

return this;
@@ -749,7 +740,7 @@ function finish(stream, state) {
function endWritable(stream, state, cb) {
state.ending = true;
finishMaybe(stream, state, true);
if (cb) {
if (cb !== nop) {
if (state.finished)
process.nextTick(cb);
else
@@ -774,14 +765,6 @@ function onCorkedFinish(corkReq, state, err) {
}

function onFinished(stream, state, cb) {
if (state.destroyed && state.errorEmitted) {
// TODO(ronag): Backwards compat. Should be moved to end() without
// errorEmitted check and with errorOrDestroy.
const err = new ERR_STREAM_DESTROYED('end');
process.nextTick(cb, err);
return;
}

function onerror(err) {
stream.removeListener('finish', onfinish);
stream.removeListener('error', onerror);
@@ -119,7 +119,7 @@ function undestroy() {
}
}

function errorOrDestroy(stream, err) {
function errorOrDestroy(stream, err, sync) {
// We have tests that rely on errors being emitted
// in the same tick, so changing this is semver major.
// For now when you opt-in to autoDestroy we allow
@@ -138,7 +138,12 @@ function errorOrDestroy(stream, err) {
if (r) {
r.errored = true;
}
emitErrorNT(stream, err);

if (sync) {
process.nextTick(emitErrorNT, stream, err);
} else {
emitErrorNT(stream, err);
}
}
}

@@ -32,7 +32,7 @@ const server = net.createServer((conn) => {
}));
}).listen(common.PIPE, () => {
const client = net.connect(common.PIPE, common.mustCall());
client.on('data', () => {
client.once('data', () => {
client.end(() => {
server.close();
});
@@ -20,7 +20,7 @@
// USE OR OTHER DEALINGS IN THE SOFTWARE.

'use strict';
require('../common');
const common = require('../common');
const assert = require('assert');

const path = require('path');
@@ -46,9 +46,6 @@ file
callbacks.open++;
assert.strictEqual(typeof fd, 'number');
})
.on('error', function(err) {
throw err;
})
.on('drain', function() {
console.error('drain!', callbacks.drain);
callbacks.drain++;
@@ -65,17 +62,13 @@ file
assert.strictEqual(file.bytesWritten, EXPECTED.length * 2);

callbacks.close++;
assert.throws(
() => {
console.error('write after end should not be allowed');
file.write('should not work anymore');
},
{
code: 'ERR_STREAM_WRITE_AFTER_END',
name: 'Error',
message: 'write after end'
}
);
console.error('write after end should not be allowed');
file.write('should not work anymore');
file.on('error', common.expectsError({
code: 'ERR_STREAM_WRITE_AFTER_END',
name: 'Error',
message: 'write after end'
}));

fs.unlinkSync(fn);
});
@@ -1,18 +1,20 @@
'use strict';

require('../common');
const assert = require('assert');
const common = require('../common');
const net = require('net');

const server = net.createServer().listen(0, connectToServer);

function connectToServer() {
const client = net.createConnection(this.address().port, () => {
assert.throws(() => client.write(1337),
{
code: 'ERR_INVALID_ARG_TYPE',
name: 'TypeError'
});
client.write(1337, common.expectsError({
code: 'ERR_INVALID_ARG_TYPE',
name: 'TypeError'
}));
client.on('error', common.expectsError({
code: 'ERR_INVALID_ARG_TYPE',
name: 'TypeError'
}));

client.destroy();
})
@@ -1,17 +1,21 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const net = require('net');

const socket = net.Stream({ highWaterMark: 0 });

// Make sure that anything besides a buffer or a string throws.
assert.throws(() => socket.write(null),
{
code: 'ERR_STREAM_NULL_VALUES',
name: 'TypeError',
message: 'May not write null values to stream'
});
socket.write(null, common.expectsError({
code: 'ERR_STREAM_NULL_VALUES',
name: 'TypeError',
message: 'May not write null values to stream'
}));
socket.on('error', common.expectsError({
code: 'ERR_STREAM_NULL_VALUES',
name: 'TypeError',
message: 'May not write null values to stream'
}));

[
true,
false,
@@ -341,7 +341,7 @@ const assert = require('assert');
write.destroy();
let ticked = false;
write.end(common.mustCall((err) => {
assert.strictEqual(ticked, false);
assert.strictEqual(ticked, true);
assert.strictEqual(err.code, 'ERR_STREAM_ALREADY_FINISHED');
}));
ticked = true;
@@ -6,15 +6,17 @@ const assert = require('assert');
const stream = require('stream');

const writable = new stream.Writable();

writable._write = (chunk, encoding, cb) => {
setTimeout(() => cb(), 10);
};

writable.end('testing ended state', common.mustCall());
writable.end(common.mustCall());
writable.on('finish', common.mustCall(() => {
let ticked = false;
writable.end(common.mustCall((err) => {
assert.strictEqual(ticked, true);
assert.strictEqual(err.code, 'ERR_STREAM_ALREADY_FINISHED');
}));
ticked = true;
}));
@@ -1,5 +1,5 @@
'use strict';
require('../common');
const common = require('../common');
const assert = require('assert');

const stream = require('stream');
@@ -14,33 +14,29 @@ class MyWritable extends stream.Writable {
}
}

assert.throws(
() => {
const m = new MyWritable({ objectMode: true });
m.write(null, (err) => assert.ok(err));
},
{
{
const m = new MyWritable({ objectMode: true });
m.write(null, (err) => assert.ok(err));
m.on('error', common.expectsError({
code: 'ERR_STREAM_NULL_VALUES',
name: 'TypeError',
message: 'May not write null values to stream'
}
);
}));
}

{ // Should not throw.
const m = new MyWritable({ objectMode: true }).on('error', assert);
m.write(null, assert);
}

assert.throws(
() => {
const m = new MyWritable();
m.write(false, (err) => assert.ok(err));
},
{
{
const m = new MyWritable();
m.write(false, (err) => assert.ok(err));
m.on('error', common.expectsError({
code: 'ERR_INVALID_ARG_TYPE',
name: 'TypeError'
}
);
}));
}

{ // Should not throw.
const m = new MyWritable().on('error', assert);

0 comments on commit 75b30c6

Please sign in to comment.
You can’t perform that action at this time.