Skip to content

Commit

Permalink
stream: use callback to properly propagate error
Browse files Browse the repository at this point in the history
The stream will be destroyed upstream through the proper error
flow.

PR-URL: #29179
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Rich Trott <rtrott@gmail.com>
  • Loading branch information
ronag committed Apr 3, 2020
1 parent b9da063 commit 8f86986
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 15 deletions.
15 changes: 9 additions & 6 deletions lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,10 @@ function ReadableState(options, stream, isDuplex) {
// Has it been destroyed
this.destroyed = false;

// Indicates whether the stream has errored.
// Indicates whether the stream has errored. When true no further
// _read calls, 'data' or 'readable' events should occur. This is needed
// since when autoDestroy is disabled we need a way to tell whether the
// stream has failed.
this.errored = false;

// Indicates whether the stream has finished destroying.
Expand Down Expand Up @@ -258,7 +261,7 @@ function readableAddChunk(stream, chunk, encoding, addToFront) {
addChunk(stream, state, chunk, true);
} else if (state.ended) {
errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF());
} else if (state.destroyed) {
} else if (state.destroyed || state.errored) {
return false;
} else {
state.reading = false;
Expand Down Expand Up @@ -453,9 +456,9 @@ Readable.prototype.read = function(n) {
}

// However, if we've ended, then there's no point, if we're already
// reading, then it's unnecessary, and if we're destroyed, then it's
// not allowed.
if (state.ended || state.reading || state.destroyed) {
// reading, then it's unnecessary, and if we're destroyed or errored,
// then it's not allowed.
if (state.ended || state.reading || state.destroyed || state.errored) {
doRead = false;
debug('reading or ended', doRead);
} else if (doRead) {
Expand Down Expand Up @@ -553,7 +556,7 @@ function emitReadable(stream) {
function emitReadable_(stream) {
const state = stream._readableState;
debug('emitReadable_', state.destroyed, state.length, state.ended);
if (!state.destroyed && (state.length || state.ended)) {
if (!state.destroyed && !state.errored && (state.length || state.ended)) {
stream.emit('readable');
state.emittedReadable = false;
}
Expand Down
7 changes: 7 additions & 0 deletions lib/_stream_writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,13 @@ function onwrite(stream, er) {

if (er) {
state.errored = true;

// In case of duplex streams we need to notify the readable side of the
// error.
if (stream._readableState) {
stream._readableState.errored = true;
}

if (sync) {
process.nextTick(onwriteError, stream, state, er, cb);
} else {
Expand Down
13 changes: 11 additions & 2 deletions lib/internal/http2/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -1995,10 +1995,19 @@ class Http2Stream extends Duplex {

let req;

// writeGeneric does not destroy on error and we cannot enable autoDestroy,
// so make sure to destroy on error.
const callback = (err) => {
if (err) {
this.destroy(err);
}
cb(err);
};

if (writev)
req = writevGeneric(this, data, cb);
req = writevGeneric(this, data, callback);
else
req = writeGeneric(this, data, encoding, cb);
req = writeGeneric(this, data, encoding, callback);

trackWriteState(this, req.bytes);
}
Expand Down
16 changes: 10 additions & 6 deletions lib/internal/stream_base_commons.js
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,14 @@ function onWriteComplete(status) {
return;
}

// TODO (ronag): This should be moved before if(stream.destroyed)
// in order to avoid swallowing error.
if (status < 0) {
const ex = errnoException(status, 'write', this.error);
stream.destroy(ex, this.callback);
if (typeof this.callback === 'function')
this.callback(ex);
else
stream.destroy(ex);
return;
}

Expand Down Expand Up @@ -134,24 +139,24 @@ function writevGeneric(self, data, cb) {
// Retain chunks
if (err === 0) req._chunks = chunks;

afterWriteDispatched(self, req, err, cb);
afterWriteDispatched(req, err, cb);
return req;
}

function writeGeneric(self, data, encoding, cb) {
const req = createWriteWrap(self[kHandle]);
const err = handleWriteReq(req, data, encoding);

afterWriteDispatched(self, req, err, cb);
afterWriteDispatched(req, err, cb);
return req;
}

function afterWriteDispatched(self, req, err, cb) {
function afterWriteDispatched(req, err, cb) {
req.bytes = streamBaseState[kBytesWritten];
req.async = !!streamBaseState[kLastWriteWasAsync];

if (err !== 0)
return self.destroy(errnoException(err, 'write', req.error), cb);
return cb(errnoException(err, 'write', req.error));

if (!req.async) {
cb();
Expand Down Expand Up @@ -264,7 +269,6 @@ function setStreamTimeout(msecs, callback) {
}

module.exports = {
createWriteWrap,
writevGeneric,
writeGeneric,
onStreamRead,
Expand Down
56 changes: 56 additions & 0 deletions test/parallel/test-net-connect-buffer2.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const net = require('net');

const tcp = net.Server(common.mustCall((s) => {
tcp.close();

let buf = '';
s.setEncoding('utf8');
s.on('data', function(d) {
buf += d;
});

s.on('end', common.mustCall(function() {
console.error('SERVER: end', buf);
assert.strictEqual(buf, "L'État, c'est moi");
s.end();
}));
}));

tcp.listen(0, common.mustCall(function() {
const socket = net.Stream({ highWaterMark: 0 });

let connected = false;
assert.strictEqual(socket.pending, true);
socket.connect(this.address().port, common.mustCall(() => connected = true));

assert.strictEqual(socket.pending, true);
assert.strictEqual(socket.connecting, true);
assert.strictEqual(socket.readyState, 'opening');

// Write a string that contains a multi-byte character sequence to test that
// `bytesWritten` is incremented with the # of bytes, not # of characters.
const a = "L'État, c'est ";
const b = 'moi';

// We're still connecting at this point so the datagram is first pushed onto
// the connect queue. Make sure that it's not added to `bytesWritten` again
// when the actual write happens.
const r = socket.write(a, common.mustCall((er) => {
console.error('write cb');
assert.ok(connected);
assert.strictEqual(socket.bytesWritten, Buffer.from(a + b).length);
assert.strictEqual(socket.pending, false);
}));
socket.on('close', common.mustCall(() => {
assert.strictEqual(socket.pending, true);
}));

assert.strictEqual(socket.bytesWritten, Buffer.from(a).length);
assert.strictEqual(r, false);
socket.end(b);

assert.strictEqual(socket.readyState, 'opening');
}));
1 change: 1 addition & 0 deletions test/parallel/test-net-write-arguments.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ assert.throws(() => {
[],
{}
].forEach((value) => {
const socket = net.Stream({ highWaterMark: 0 });
// We need to check the callback since 'error' will only
// be emitted once per instance.
assert.throws(() => {
Expand Down
2 changes: 1 addition & 1 deletion test/parallel/test-wrap-js-stream-exceptions.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ process.once('uncaughtException', common.mustCall((err) => {
}));

const socket = new JSStreamWrap(new Duplex({
read: common.mustNotCall(),
read: common.mustCall(),
write: common.mustCall((buffer, data, cb) => {
throw new Error('exception!');
})
Expand Down

0 comments on commit 8f86986

Please sign in to comment.