Skip to content

Commit

Permalink
stream: emit 'error' asynchronously
Browse files Browse the repository at this point in the history
errorOrDestroy emits 'error' synchronously due to
compat reasons. However, it should be possible to
use correct async behaviour for new code.

PR-URL: #29744
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Reviewed-By: Anna Henningsen <anna@addaleax.net>
Reviewed-By: Rich Trott <rtrott@gmail.com>
  • Loading branch information
ronag authored and Trott committed Jan 5, 2020
1 parent 9085c03 commit 75b30c6
Show file tree
Hide file tree
Showing 13 changed files with 163 additions and 116 deletions.
3 changes: 2 additions & 1 deletion doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -1853,7 +1853,8 @@ methods only.
The `callback` method must be called to signal either that the write completed
successfully or failed with an error. The first argument passed to the
`callback` must be the `Error` object if the call failed or `null` if the
write succeeded.
write succeeded. The `callback` method will always be called asynchronously and
before `'error'` is emitted.

All calls to `writable.write()` that occur between the time `writable._write()`
is called and the `callback` is called will cause the written data to be
Expand Down
81 changes: 32 additions & 49 deletions lib/_stream_writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -265,33 +265,6 @@ Writable.prototype.pipe = function() {
errorOrDestroy(this, new ERR_STREAM_CANNOT_PIPE());
};


function writeAfterEnd(stream, cb) {
const er = new ERR_STREAM_WRITE_AFTER_END();
// TODO: defer error events consistently everywhere, not just the cb
errorOrDestroy(stream, er);
process.nextTick(cb, er);
}

// Checks that a user-supplied chunk is valid, especially for the particular
// mode the stream is in. Currently this means that `null` is never accepted
// and undefined/non-string values are only allowed in object mode.
function validChunk(stream, state, chunk, cb) {
var er;

if (chunk === null) {
er = new ERR_STREAM_NULL_VALUES();
} else if (typeof chunk !== 'string' && !state.objectMode) {
er = new ERR_INVALID_ARG_TYPE('chunk', ['string', 'Buffer'], chunk);
}
if (er) {
errorOrDestroy(stream, er);
process.nextTick(cb, er);
return false;
}
return true;
}

Writable.prototype.write = function(chunk, encoding, cb) {
const state = this._writableState;
var ret = false;
Expand All @@ -315,17 +288,25 @@ Writable.prototype.write = function(chunk, encoding, cb) {
if (typeof cb !== 'function')
cb = nop;

let err;
if (state.ending) {
writeAfterEnd(this, cb);
err = new ERR_STREAM_WRITE_AFTER_END();
} else if (state.destroyed) {
const err = new ERR_STREAM_DESTROYED('write');
process.nextTick(cb, err);
errorOrDestroy(this, err);
} else if (isBuf || validChunk(this, state, chunk, cb)) {
err = new ERR_STREAM_DESTROYED('write');
} else if (chunk === null) {
err = new ERR_STREAM_NULL_VALUES();
} else if (!isBuf && typeof chunk !== 'string' && !state.objectMode) {
err = new ERR_INVALID_ARG_TYPE('chunk', ['string', 'Buffer'], chunk);
} else {
state.pendingcb++;
ret = writeOrBuffer(this, state, chunk, encoding, cb);
}

if (err) {
process.nextTick(cb, err);
errorOrDestroy(this, err, true);
}

return ret;
};

Expand Down Expand Up @@ -629,7 +610,7 @@ Writable.prototype._write = function(chunk, encoding, cb) {
if (this._writev) {
this._writev([{ chunk, encoding }], cb);
} else {
cb(new ERR_METHOD_NOT_IMPLEMENTED('_write()'));
process.nextTick(cb, new ERR_METHOD_NOT_IMPLEMENTED('_write()'));
}
};

Expand All @@ -656,15 +637,25 @@ Writable.prototype.end = function(chunk, encoding, cb) {
this.uncork();
}

if (typeof cb !== 'function')
cb = nop;

// Ignore unnecessary end() calls.
if (!state.ending) {
// TODO(ronag): Compat. Allow end() after destroy().
if (!state.errored && !state.ending) {
endWritable(this, state, cb);
} else if (typeof cb === 'function') {
if (!state.finished) {
onFinished(this, state, cb);
} else {
cb(new ERR_STREAM_ALREADY_FINISHED('end'));
}
} else if (state.finished) {
const err = new ERR_STREAM_ALREADY_FINISHED('end');
process.nextTick(cb, err);
// TODO(ronag): Compat. Don't error the stream.
// errorOrDestroy(this, err, true);
} else if (state.destroyed) {
const err = new ERR_STREAM_DESTROYED('end');
process.nextTick(cb, err);
// TODO(ronag): Compat. Don't error the stream.
// errorOrDestroy(this, err, true);
} else if (cb !== nop) {
onFinished(this, state, cb);
}

return this;
Expand Down Expand Up @@ -749,7 +740,7 @@ function finish(stream, state) {
function endWritable(stream, state, cb) {
state.ending = true;
finishMaybe(stream, state, true);
if (cb) {
if (cb !== nop) {
if (state.finished)
process.nextTick(cb);
else
Expand All @@ -774,14 +765,6 @@ function onCorkedFinish(corkReq, state, err) {
}

function onFinished(stream, state, cb) {
if (state.destroyed && state.errorEmitted) {
// TODO(ronag): Backwards compat. Should be moved to end() without
// errorEmitted check and with errorOrDestroy.
const err = new ERR_STREAM_DESTROYED('end');
process.nextTick(cb, err);
return;
}

function onerror(err) {
stream.removeListener('finish', onfinish);
stream.removeListener('error', onerror);
Expand Down
9 changes: 7 additions & 2 deletions lib/internal/streams/destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ function undestroy() {
}
}

function errorOrDestroy(stream, err) {
function errorOrDestroy(stream, err, sync) {
// We have tests that rely on errors being emitted
// in the same tick, so changing this is semver major.
// For now when you opt-in to autoDestroy we allow
Expand All @@ -138,7 +138,12 @@ function errorOrDestroy(stream, err) {
if (r) {
r.errored = true;
}
emitErrorNT(stream, err);

if (sync) {
process.nextTick(emitErrorNT, stream, err);
} else {
emitErrorNT(stream, err);
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion test/parallel/test-child-process-server-close.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ const server = net.createServer((conn) => {
}));
}).listen(common.PIPE, () => {
const client = net.connect(common.PIPE, common.mustCall());
client.on('data', () => {
client.once('data', () => {
client.end(() => {
server.close();
});
Expand Down
23 changes: 8 additions & 15 deletions test/parallel/test-file-write-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
// USE OR OTHER DEALINGS IN THE SOFTWARE.

'use strict';
require('../common');
const common = require('../common');
const assert = require('assert');

const path = require('path');
Expand All @@ -46,9 +46,6 @@ file
callbacks.open++;
assert.strictEqual(typeof fd, 'number');
})
.on('error', function(err) {
throw err;
})
.on('drain', function() {
console.error('drain!', callbacks.drain);
callbacks.drain++;
Expand All @@ -65,17 +62,13 @@ file
assert.strictEqual(file.bytesWritten, EXPECTED.length * 2);

callbacks.close++;
assert.throws(
() => {
console.error('write after end should not be allowed');
file.write('should not work anymore');
},
{
code: 'ERR_STREAM_WRITE_AFTER_END',
name: 'Error',
message: 'write after end'
}
);
console.error('write after end should not be allowed');
file.write('should not work anymore');
file.on('error', common.expectsError({
code: 'ERR_STREAM_WRITE_AFTER_END',
name: 'Error',
message: 'write after end'
}));

fs.unlinkSync(fn);
});
Expand Down
16 changes: 9 additions & 7 deletions test/parallel/test-net-socket-write-error.js
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
'use strict';

require('../common');
const assert = require('assert');
const common = require('../common');
const net = require('net');

const server = net.createServer().listen(0, connectToServer);

function connectToServer() {
const client = net.createConnection(this.address().port, () => {
assert.throws(() => client.write(1337),
{
code: 'ERR_INVALID_ARG_TYPE',
name: 'TypeError'
});
client.write(1337, common.expectsError({
code: 'ERR_INVALID_ARG_TYPE',
name: 'TypeError'
}));
client.on('error', common.expectsError({
code: 'ERR_INVALID_ARG_TYPE',
name: 'TypeError'
}));

client.destroy();
})
Expand Down
18 changes: 11 additions & 7 deletions test/parallel/test-net-write-arguments.js
Original file line number Diff line number Diff line change
@@ -1,17 +1,21 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const net = require('net');

const socket = net.Stream({ highWaterMark: 0 });

// Make sure that anything besides a buffer or a string throws.
assert.throws(() => socket.write(null),
{
code: 'ERR_STREAM_NULL_VALUES',
name: 'TypeError',
message: 'May not write null values to stream'
});
socket.write(null, common.expectsError({
code: 'ERR_STREAM_NULL_VALUES',
name: 'TypeError',
message: 'May not write null values to stream'
}));
socket.on('error', common.expectsError({
code: 'ERR_STREAM_NULL_VALUES',
name: 'TypeError',
message: 'May not write null values to stream'
}));

[
true,
false,
Expand Down
2 changes: 1 addition & 1 deletion test/parallel/test-stream-writable-destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ const assert = require('assert');
write.destroy();
let ticked = false;
write.end(common.mustCall((err) => {
assert.strictEqual(ticked, false);
assert.strictEqual(ticked, true);
assert.strictEqual(err.code, 'ERR_STREAM_ALREADY_FINISHED');
}));
ticked = true;
Expand Down
4 changes: 3 additions & 1 deletion test/parallel/test-stream-writable-end-multiple.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,17 @@ const assert = require('assert');
const stream = require('stream');

const writable = new stream.Writable();

writable._write = (chunk, encoding, cb) => {
setTimeout(() => cb(), 10);
};

writable.end('testing ended state', common.mustCall());
writable.end(common.mustCall());
writable.on('finish', common.mustCall(() => {
let ticked = false;
writable.end(common.mustCall((err) => {
assert.strictEqual(ticked, true);
assert.strictEqual(err.code, 'ERR_STREAM_ALREADY_FINISHED');
}));
ticked = true;
}));
30 changes: 13 additions & 17 deletions test/parallel/test-stream-writable-null.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
'use strict';
require('../common');
const common = require('../common');
const assert = require('assert');

const stream = require('stream');
Expand All @@ -14,33 +14,29 @@ class MyWritable extends stream.Writable {
}
}

assert.throws(
() => {
const m = new MyWritable({ objectMode: true });
m.write(null, (err) => assert.ok(err));
},
{
{
const m = new MyWritable({ objectMode: true });
m.write(null, (err) => assert.ok(err));
m.on('error', common.expectsError({
code: 'ERR_STREAM_NULL_VALUES',
name: 'TypeError',
message: 'May not write null values to stream'
}
);
}));
}

{ // Should not throw.
const m = new MyWritable({ objectMode: true }).on('error', assert);
m.write(null, assert);
}

assert.throws(
() => {
const m = new MyWritable();
m.write(false, (err) => assert.ok(err));
},
{
{
const m = new MyWritable();
m.write(false, (err) => assert.ok(err));
m.on('error', common.expectsError({
code: 'ERR_INVALID_ARG_TYPE',
name: 'TypeError'
}
);
}));
}

{ // Should not throw.
const m = new MyWritable().on('error', assert);
Expand Down
Loading

0 comments on commit 75b30c6

Please sign in to comment.