Skip to content

Commit

Permalink
stream: updated streams error handling
Browse files Browse the repository at this point in the history
This improves error handling for streams in a few ways.

1. It ensures that no user defined methods (_read, _write, ...) are run
after .destroy has been called.
2. It introduces an explicit error to tell the user if they are write to
write, etc to the stream after it has been destroyed.
3. It makes streams always emit close as the last thing after they have
been destroyed
4. Changes the default _destroy to not gracefully end streams.

It also updates net, http2, zlib and fs to the new error handling.

PR-URL: #18438
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Ruben Bridgewater <ruben@bridgewater.de>
Reviewed-By: Anna Henningsen <anna@addaleax.net>
  • Loading branch information
mafintosh authored and mcollina committed Mar 6, 2018
1 parent acac0f8 commit 5e3f516
Show file tree
Hide file tree
Showing 18 changed files with 107 additions and 55 deletions.
11 changes: 6 additions & 5 deletions doc/api/errors.md
Original file line number Diff line number Diff line change
Expand Up @@ -1449,6 +1449,12 @@ An unspecified or non-specific system error has occurred within the Node.js
process. The error object will have an `err.info` object property with
additional details.

<a id="ERR_STREAM_DESTROYED"></a>
### ERR_STREAM_DESTROYED

A stream method was called that cannot complete because the stream was
destroyed using `stream.destroy()`.

<a id="ERR_TLS_CERT_ALTNAME_INVALID"></a>
### ERR_TLS_CERT_ALTNAME_INVALID

Expand Down Expand Up @@ -1615,11 +1621,6 @@ The fulfilled value of a linking promise is not a `vm.Module` object.
The current module's status does not allow for this operation. The specific
meaning of the error depends on the specific function.

<a id="ERR_ZLIB_BINDING_CLOSED"></a>
### ERR_ZLIB_BINDING_CLOSED

An attempt was made to use a `zlib` object after it has already been closed.

<a id="ERR_ZLIB_INITIALIZATION_FAILED"></a>
### ERR_ZLIB_INITIALIZATION_FAILED

Expand Down
19 changes: 15 additions & 4 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -543,8 +543,10 @@ added: v8.0.0

* Returns: {this}

Destroy the stream, and emit the passed error. After this call, the
writable stream has ended. Implementors should not override this method,
Destroy the stream, and emit the passed `error` and a `close` event.
After this call, the writable stream has ended and subsequent calls
to `write` / `end` will give an `ERR_STREAM_DESTROYED` error.
Implementors should not override this method,
but instead implement [`writable._destroy`][writable-_destroy].

### Readable Streams
Expand Down Expand Up @@ -1167,8 +1169,9 @@ myReader.on('readable', () => {
added: v8.0.0
-->

Destroy the stream, and emit `'error'`. After this call, the
readable stream will release any internal resources.
Destroy the stream, and emit `'error'` and `close`. After this call, the
readable stream will release any internal resources and subsequent calls
to `push` will be ignored.
Implementors should not override this method, but instead implement
[`readable._destroy`][readable-_destroy].

Expand Down Expand Up @@ -1382,6 +1385,12 @@ constructor and implement the `writable._write()` method. The
`writable._writev()` method *may* also be implemented.

#### Constructor: new stream.Writable([options])
<!-- YAML
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/18438
description: Add `emitClose` option to specify if `close` is emitted on destroy
-->

* `options` {Object}
* `highWaterMark` {number} Buffer level when
Expand All @@ -1395,6 +1404,8 @@ constructor and implement the `writable._write()` method. The
it becomes possible to write JavaScript values other than string,
`Buffer` or `Uint8Array` if supported by the stream implementation.
Defaults to `false`
* `emitClose` {boolean} Whether or not the stream should emit `close`
after it has been destroyed. Defaults to `true`
* `write` {Function} Implementation for the
[`stream._write()`][stream-_write] method.
* `writev` {Function} Implementation for the
Expand Down
7 changes: 0 additions & 7 deletions lib/_stream_duplex.js
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,3 @@ Object.defineProperty(Duplex.prototype, 'destroyed', {
this._writableState.destroyed = value;
}
});

Duplex.prototype._destroy = function(err, cb) {
this.push(null);
this.end();

process.nextTick(cb, err);
};
6 changes: 5 additions & 1 deletion lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ function ReadableState(options, stream) {
this.readableListening = false;
this.resumeScheduled = false;

// Should close be emitted on destroy. Defaults to true.
this.emitClose = options.emitClose !== false;

// has it been destroyed
this.destroyed = false;

Expand Down Expand Up @@ -177,7 +180,6 @@ Object.defineProperty(Readable.prototype, 'destroyed', {
Readable.prototype.destroy = destroyImpl.destroy;
Readable.prototype._undestroy = destroyImpl.undestroy;
Readable.prototype._destroy = function(err, cb) {
this.push(null);
cb(err);
};

Expand Down Expand Up @@ -236,6 +238,8 @@ function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) {
addChunk(stream, state, chunk, true);
} else if (state.ended) {
stream.emit('error', new errors.Error('ERR_STREAM_PUSH_AFTER_EOF'));
} else if (state.destroyed) {
return false;
} else {
state.reading = false;
if (state.decoder && !encoding) {
Expand Down
3 changes: 1 addition & 2 deletions lib/_stream_transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ function Transform(options) {
}

function prefinish() {
if (typeof this._flush === 'function') {
if (typeof this._flush === 'function' && !this._readableState.destroyed) {
this._flush((er, data) => {
done(this, er, data);
});
Expand Down Expand Up @@ -194,7 +194,6 @@ Transform.prototype._read = function(n) {
Transform.prototype._destroy = function(err, cb) {
Duplex.prototype._destroy.call(this, err, (err2) => {
cb(err2);
this.emit('close');
});
};

Expand Down
10 changes: 7 additions & 3 deletions lib/_stream_writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,9 @@ function WritableState(options, stream) {
// True if the error was already emitted and should not be thrown again
this.errorEmitted = false;

// Should close be emitted on destroy. Defaults to true.
this.emitClose = options.emitClose !== false;

// count buffered requests
this.bufferedRequestCount = 0;

Expand Down Expand Up @@ -390,7 +393,9 @@ function doWrite(stream, state, writev, len, chunk, encoding, cb) {
state.writecb = cb;
state.writing = true;
state.sync = true;
if (writev)
if (state.destroyed)
state.onwrite(new errors.Error('ERR_STREAM_DESTROYED', 'write'));
else if (writev)
stream._writev(chunk, state.onwrite);
else
stream._write(chunk, encoding, state.onwrite);
Expand Down Expand Up @@ -604,7 +609,7 @@ function callFinal(stream, state) {
}
function prefinish(stream, state) {
if (!state.prefinished && !state.finalCalled) {
if (typeof stream._final === 'function') {
if (typeof stream._final === 'function' && !state.destroyed) {
state.pendingcb++;
state.finalCalled = true;
process.nextTick(callFinal, stream, state);
Expand Down Expand Up @@ -681,6 +686,5 @@ Object.defineProperty(Writable.prototype, 'destroyed', {
Writable.prototype.destroy = destroyImpl.destroy;
Writable.prototype._undestroy = destroyImpl.undestroy;
Writable.prototype._destroy = function(err, cb) {
this.end();
cb(err);
};
6 changes: 6 additions & 0 deletions lib/fs.js
Original file line number Diff line number Diff line change
Expand Up @@ -1929,6 +1929,9 @@ function ReadStream(path, options) {
if (options.highWaterMark === undefined)
options.highWaterMark = 64 * 1024;

// for backwards compat do not emit close on destroy.
options.emitClose = false;

Readable.call(this, options);

// path will be ignored when fd is specified, so it can be falsy
Expand Down Expand Up @@ -2084,6 +2087,9 @@ function WriteStream(path, options) {

options = copyObject(getOptions(options, {}));

// for backwards compat do not emit close on destroy.
options.emitClose = false;

Writable.call(this, options);

// path will be ignored when fd is specified, so it can be falsy
Expand Down
2 changes: 1 addition & 1 deletion lib/internal/errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -843,6 +843,7 @@ E('ERR_SOCKET_DGRAM_NOT_RUNNING', 'Not running', Error);
E('ERR_STDERR_CLOSE', 'process.stderr cannot be closed', Error);
E('ERR_STDOUT_CLOSE', 'process.stdout cannot be closed', Error);
E('ERR_STREAM_CANNOT_PIPE', 'Cannot pipe, not readable', Error);
E('ERR_STREAM_DESTROYED', 'Cannot call %s after a stream was destroyed');
E('ERR_STREAM_NULL_VALUES', 'May not write null values to stream', TypeError);
E('ERR_STREAM_PUSH_AFTER_EOF', 'stream.push() after EOF', Error);
E('ERR_STREAM_READ_NOT_IMPLEMENTED', '_read() is not implemented', Error);
Expand Down Expand Up @@ -908,7 +909,6 @@ E('ERR_VM_MODULE_NOT_LINKED',
E('ERR_VM_MODULE_NOT_MODULE',
'Provided module is not an instance of Module', Error);
E('ERR_VM_MODULE_STATUS', 'Module status %s', Error);
E('ERR_ZLIB_BINDING_CLOSED', 'zlib binding closed', Error);
E('ERR_ZLIB_INITIALIZATION_FAILED', 'Initialization failed', Error);

function sysError(code, syscall, path, dest,
Expand Down
1 change: 1 addition & 0 deletions lib/internal/http2/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -1475,6 +1475,7 @@ class Http2Stream extends Duplex {
constructor(session, options) {
options.allowHalfOpen = true;
options.decodeStrings = false;
options.emitClose = false;
super(options);
this[async_id_symbol] = -1;

Expand Down
9 changes: 9 additions & 0 deletions lib/internal/streams/destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ function destroy(err, cb) {
}

this._destroy(err || null, (err) => {
process.nextTick(emitCloseNT, this);
if (!cb && err) {
process.nextTick(emitErrorNT, this, err);
if (this._writableState) {
Expand All @@ -43,6 +44,14 @@ function destroy(err, cb) {
return this;
}

function emitCloseNT(self) {
if (self._writableState && !self._writableState.emitClose)
return;
if (self._readableState && !self._readableState.emitClose)
return;
self.emit('close');
}

function undestroy() {
if (this._readableState) {
this._readableState.destroyed = false;
Expand Down
5 changes: 5 additions & 0 deletions lib/net.js
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,11 @@ function Socket(options) {
options = { fd: options }; // Legacy interface.
else if (options === undefined)
options = {};
else
options = util._extend({}, options);

// For backwards compat do not emit close on destroy.
options.emitClose = false;

stream.Duplex.call(this, options);

Expand Down
9 changes: 2 additions & 7 deletions lib/zlib.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ const {
ERR_BUFFER_TOO_LARGE,
ERR_INVALID_ARG_TYPE,
ERR_OUT_OF_RANGE,
ERR_ZLIB_BINDING_CLOSED,
ERR_ZLIB_INITIALIZATION_FAILED
} = require('internal/errors').codes;
const Transform = require('_stream_transform');
Expand Down Expand Up @@ -392,7 +391,7 @@ Zlib.prototype.flush = function flush(kind, callback) {

Zlib.prototype.close = function close(callback) {
_close(this, callback);
process.nextTick(emitCloseNT, this);
this.destroy();
};

Zlib.prototype._transform = function _transform(chunk, encoding, cb) {
Expand Down Expand Up @@ -510,7 +509,7 @@ function processChunkSync(self, chunk, flushFlag) {
function processChunk(self, chunk, flushFlag, cb) {
var handle = self._handle;
if (!handle)
return cb(new ERR_ZLIB_BINDING_CLOSED());
assert(false, 'zlib binding closed');

handle.buffer = chunk;
handle.cb = cb;
Expand Down Expand Up @@ -603,10 +602,6 @@ function _close(engine, callback) {
engine._handle = null;
}

function emitCloseNT(self) {
self.emit('close');
}

// generic zlib
// minimal 2-byte header
function Deflate(opts) {
Expand Down
8 changes: 4 additions & 4 deletions test/parallel/test-net-socket-destroy-send.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@ server.listen(0, common.mustCall(function() {
// Test destroy returns this, even on multiple calls when it short-circuits.
assert.strictEqual(conn, conn.destroy().destroy());
conn.on('error', common.expectsError({
code: 'ERR_SOCKET_CLOSED',
message: 'Socket is closed',
code: 'ERR_STREAM_DESTROYED',
message: 'Cannot call write after a stream was destroyed',
type: Error
}));

conn.write(Buffer.from('kaboom'), common.expectsError({
code: 'ERR_SOCKET_CLOSED',
message: 'Socket is closed',
code: 'ERR_STREAM_DESTROYED',
message: 'Cannot call write after a stream was destroyed',
type: Error
}));
server.close();
Expand Down
14 changes: 8 additions & 6 deletions test/parallel/test-stream-duplex-destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ const { inherits } = require('util');

duplex.resume();

duplex.on('end', common.mustCall());
duplex.on('finish', common.mustCall());
duplex.on('end', common.mustNotCall());
duplex.on('finish', common.mustNotCall());
duplex.on('close', common.mustCall());

duplex.destroy();
assert.strictEqual(duplex.destroyed, true);
Expand All @@ -29,8 +30,8 @@ const { inherits } = require('util');

const expected = new Error('kaboom');

duplex.on('end', common.mustCall());
duplex.on('finish', common.mustCall());
duplex.on('end', common.mustNotCall());
duplex.on('finish', common.mustNotCall());
duplex.on('error', common.mustCall((err) => {
assert.strictEqual(err, expected);
}));
Expand Down Expand Up @@ -78,6 +79,7 @@ const { inherits } = require('util');

// error is swallowed by the custom _destroy
duplex.on('error', common.mustNotCall('no error event'));
duplex.on('close', common.mustCall());

duplex.destroy(expected);
assert.strictEqual(duplex.destroyed, true);
Expand Down Expand Up @@ -159,8 +161,8 @@ const { inherits } = require('util');
});
duplex.resume();

duplex.on('finish', common.mustCall());
duplex.on('end', common.mustCall());
duplex.on('finish', common.mustNotCall());
duplex.on('end', common.mustNotCall());

duplex.destroy();
assert.strictEqual(duplex.destroyed, true);
Expand Down
Loading

0 comments on commit 5e3f516

Please sign in to comment.