Skip to content

Commit

Permalink
stream: add destroy and _destroy methods.
Browse files Browse the repository at this point in the history
Adds destroy() and _destroy() methods to Readable, Writable, Duplex
and Transform. It also standardizes the behavior and the implementation
of destroy(), which has been inconsistent in userland and core.
This PR also updates all the subsystems of core to use the new
destroy().

PR-URL: #12925
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Calvin Metcalf <calvin.metcalf@gmail.com>
Reviewed-By: Colin Ihrig <cjihrig@gmail.com>
  • Loading branch information
mcollina authored and jasnell committed May 28, 2017
1 parent f2e3a67 commit b6e1d22
Show file tree
Hide file tree
Showing 18 changed files with 964 additions and 68 deletions.
48 changes: 47 additions & 1 deletion doc/api/stream.md
Expand Up @@ -499,6 +499,15 @@ write('hello', () => {


A Writable stream in object mode will always ignore the `encoding` argument. A Writable stream in object mode will always ignore the `encoding` argument.


##### writable.destroy([error])
<!-- YAML
added: REPLACEME
-->

Destroy the stream, and emit the passed error. After this call, the
writible stream has ended. Implementors should not override this method,
but instead implement [`writable._destroy`][writable-_destroy].

### Readable Streams ### Readable Streams


Readable streams are an abstraction for a *source* from which data is Readable streams are an abstraction for a *source* from which data is
Expand Down Expand Up @@ -1070,6 +1079,16 @@ myReader.on('readable', () => {
}); });
``` ```


##### readable.destroy([error])
<!-- YAML
added: REPLACEME
-->

Destroy the stream, and emit `'error'`. After this call, the
readable stream will release any internal resources.
Implementors should not override this method, but instead implement
[`readable._destroy`][readable-_destroy].

### Duplex and Transform Streams ### Duplex and Transform Streams


#### Class: stream.Duplex #### Class: stream.Duplex
Expand Down Expand Up @@ -1109,6 +1128,16 @@ Examples of Transform streams include:
* [zlib streams][zlib] * [zlib streams][zlib]
* [crypto streams][crypto] * [crypto streams][crypto]


##### transform.destroy([error])
<!-- YAML
added: REPLACEME
-->

Destroy the stream, and emit `'error'`. After this call, the
transform stream would release any internal resources.
implementors should not override this method, but instead implement
[`readable._destroy`][readable-_destroy].
The default implementation of `_destroy` for `Transform` also emit `'close'`.


## API for Stream Implementers ## API for Stream Implementers


Expand Down Expand Up @@ -1248,6 +1277,8 @@ constructor and implement the `writable._write()` method. The
[`stream._write()`][stream-_write] method. [`stream._write()`][stream-_write] method.
* `writev` {Function} Implementation for the * `writev` {Function} Implementation for the
[`stream._writev()`][stream-_writev] method. [`stream._writev()`][stream-_writev] method.
* `destroy` {Function} Implementation for the
[`stream._destroy()`][writable-_destroy] method.


For example: For example:


Expand Down Expand Up @@ -1358,6 +1389,15 @@ The `writable._writev()` method is prefixed with an underscore because it is
internal to the class that defines it, and should never be called directly by internal to the class that defines it, and should never be called directly by
user programs. user programs.


#### writable.\_destroy(err, callback)
<!-- YAML
added: REPLACEME
-->

* `err` {Error} An error.
* `callback` {Function} A callback function that takes an optional error argument
which is invoked when the writable is destroyed.

#### Errors While Writing #### Errors While Writing


It is recommended that errors occurring during the processing of the It is recommended that errors occurring during the processing of the
Expand Down Expand Up @@ -1428,6 +1468,8 @@ constructor and implement the `readable._read()` method.
a single value instead of a Buffer of size n. Defaults to `false` a single value instead of a Buffer of size n. Defaults to `false`
* `read` {Function} Implementation for the [`stream._read()`][stream-_read] * `read` {Function} Implementation for the [`stream._read()`][stream-_read]
method. method.
* `destroy` {Function} Implementation for the [`stream._destroy()`][readable-_destroy]
method.


For example: For example:


Expand Down Expand Up @@ -2079,4 +2121,8 @@ readable buffer so there is nothing for a user to consume.
[stream-read]: #stream_readable_read_size [stream-read]: #stream_readable_read_size
[stream-resume]: #stream_readable_resume [stream-resume]: #stream_readable_resume
[stream-write]: #stream_writable_write_chunk_encoding_callback [stream-write]: #stream_writable_write_chunk_encoding_callback
[zlib]: zlib.html [readable-_destroy]: #stream_readable_destroy_err_callback
[writable-_destroy]: #stream_writable_destroy_err_callback
[TCP sockets]: net.html#net_class_net_socket
[Transform]: #stream_class_stream_transform
[Writable]: #stream_class_stream_writable
30 changes: 30 additions & 0 deletions lib/_stream_duplex.js
Expand Up @@ -76,3 +76,33 @@ function onend() {
function onEndNT(self) { function onEndNT(self) {
self.end(); self.end();
} }

Object.defineProperty(Duplex.prototype, 'destroyed', {
get() {
if (this._readableState === undefined ||
this._writableState === undefined) {
return false;
}
return this._readableState.destroyed && this._writableState.destroyed;
},
set(value) {
// we ignore the value if the stream
// has not been initialized yet
if (this._readableState === undefined ||
this._writableState === undefined) {
return;
}

// backward compatibility, the user is explicitly
// managing destroyed
this._readableState.destroyed = value;
this._writableState.destroyed = value;
}
});

Duplex.prototype._destroy = function(err, cb) {
this.push(null);
this.end();

process.nextTick(cb, err);
};
40 changes: 38 additions & 2 deletions lib/_stream_readable.js
Expand Up @@ -30,6 +30,7 @@ const Buffer = require('buffer').Buffer;
const util = require('util'); const util = require('util');
const debug = util.debuglog('stream'); const debug = util.debuglog('stream');
const BufferList = require('internal/streams/BufferList'); const BufferList = require('internal/streams/BufferList');
const destroyImpl = require('internal/streams/destroy');
var StringDecoder; var StringDecoder;


util.inherits(Readable, Stream); util.inherits(Readable, Stream);
Expand Down Expand Up @@ -99,6 +100,9 @@ function ReadableState(options, stream) {
this.readableListening = false; this.readableListening = false;
this.resumeScheduled = false; this.resumeScheduled = false;


// has it been destroyed
this.destroyed = false;

// Crypto is kind of old and crusty. Historically, its default string // Crypto is kind of old and crusty. Historically, its default string
// encoding is 'binary' so we have to make this configurable. // encoding is 'binary' so we have to make this configurable.
// Everything else in the universe uses 'utf8', though. // Everything else in the universe uses 'utf8', though.
Expand Down Expand Up @@ -129,12 +133,44 @@ function Readable(options) {
// legacy // legacy
this.readable = true; this.readable = true;


if (options && typeof options.read === 'function') if (options) {
this._read = options.read; if (typeof options.read === 'function')
this._read = options.read;

if (typeof options.destroy === 'function')
this._destroy = options.destroy;
}


Stream.call(this); Stream.call(this);
} }


Object.defineProperty(Readable.prototype, 'destroyed', {
get() {
if (this._readableState === undefined) {
return false;
}
return this._readableState.destroyed;
},
set(value) {
// we ignore the value if the stream
// has not been initialized yet
if (!this._readableState) {
return;
}

// backward compatibility, the user is explicitly
// managing destroyed
this._readableState.destroyed = value;
}
});

Readable.prototype.destroy = destroyImpl.destroy;
Readable.prototype._undestroy = destroyImpl.undestroy;
Readable.prototype._destroy = function(err, cb) {
this.push(null);
cb(err);
};

// Manually shove something into the read() buffer. // Manually shove something into the read() buffer.
// This returns true if the highWaterMark has not been hit yet, // This returns true if the highWaterMark has not been hit yet,
// similar to how Writable.write() returns true if you should // similar to how Writable.write() returns true if you should
Expand Down
8 changes: 8 additions & 0 deletions lib/_stream_transform.js
Expand Up @@ -196,6 +196,14 @@ Transform.prototype._read = function(n) {
}; };




Transform.prototype._destroy = function(err, cb) {
Duplex.prototype._destroy.call(this, err, (err2) => {
cb(err2);
this.emit('close');
});
};


function done(stream, er, data) { function done(stream, er, data) {
if (er) if (er)
return stream.emit('error', er); return stream.emit('error', er);
Expand Down
34 changes: 34 additions & 0 deletions lib/_stream_writable.js
Expand Up @@ -32,6 +32,7 @@ const util = require('util');
const internalUtil = require('internal/util'); const internalUtil = require('internal/util');
const Stream = require('stream'); const Stream = require('stream');
const Buffer = require('buffer').Buffer; const Buffer = require('buffer').Buffer;
const destroyImpl = require('internal/streams/destroy');


util.inherits(Writable, Stream); util.inherits(Writable, Stream);


Expand Down Expand Up @@ -66,6 +67,9 @@ function WritableState(options, stream) {
// when 'finish' is emitted // when 'finish' is emitted
this.finished = false; this.finished = false;


// has it been destroyed
this.destroyed = false;

// should we decode strings into buffers before passing to _write? // should we decode strings into buffers before passing to _write?
// this is here so that some node-core streams can optimize string // this is here so that some node-core streams can optimize string
// handling at a lower level. // handling at a lower level.
Expand Down Expand Up @@ -192,6 +196,9 @@ function Writable(options) {


if (typeof options.writev === 'function') if (typeof options.writev === 'function')
this._writev = options.writev; this._writev = options.writev;

if (typeof options.destroy === 'function')
this._destroy = options.destroy;
} }


Stream.call(this); Stream.call(this);
Expand Down Expand Up @@ -563,3 +570,30 @@ function onCorkedFinish(corkReq, state, err) {
state.corkedRequestsFree = corkReq; state.corkedRequestsFree = corkReq;
} }
} }

Object.defineProperty(Writable.prototype, 'destroyed', {
get() {
if (this._writableState === undefined) {
return false;
}
return this._writableState.destroyed;
},
set(value) {
// we ignore the value if the stream
// has not been initialized yet
if (!this._writableState) {
return;
}

// backward compatibility, the user is explicitly
// managing destroyed
this._writableState.destroyed = value;
}
});

Writable.prototype.destroy = destroyImpl.destroy;
Writable.prototype._undestroy = destroyImpl.undestroy;
Writable.prototype._destroy = function(err, cb) {
this.end();
cb(err);
};
11 changes: 5 additions & 6 deletions lib/fs.js
Expand Up @@ -1986,11 +1986,10 @@ ReadStream.prototype._read = function(n) {
}; };




ReadStream.prototype.destroy = function() { ReadStream.prototype._destroy = function(err, cb) {
if (this.destroyed) this.close(function(err2) {
return; cb(err || err2);
this.destroyed = true; });
this.close();
}; };




Expand Down Expand Up @@ -2157,7 +2156,7 @@ WriteStream.prototype._writev = function(data, cb) {
}; };




WriteStream.prototype.destroy = ReadStream.prototype.destroy; WriteStream.prototype._destroy = ReadStream.prototype._destroy;
WriteStream.prototype.close = ReadStream.prototype.close; WriteStream.prototype.close = ReadStream.prototype.close;


// There is no shutdown() for files. // There is no shutdown() for files.
Expand Down
12 changes: 8 additions & 4 deletions lib/internal/process/stdio.js
Expand Up @@ -18,10 +18,12 @@ function setupStdio() {
function getStdout() { function getStdout() {
if (stdout) return stdout; if (stdout) return stdout;
stdout = createWritableStdioStream(1); stdout = createWritableStdioStream(1);
stdout.destroy = stdout.destroySoon = function(er) { stdout.destroySoon = stdout.destroy;
stdout._destroy = function(er, cb) {
// avoid errors if we already emitted
const errors = lazyErrors(); const errors = lazyErrors();
er = er || new errors.Error('ERR_STDOUT_CLOSE'); er = er || new errors.Error('ERR_STDOUT_CLOSE');
stdout.emit('error', er); cb(er);
}; };
if (stdout.isTTY) { if (stdout.isTTY) {
process.on('SIGWINCH', () => stdout._refreshSize()); process.on('SIGWINCH', () => stdout._refreshSize());
Expand All @@ -32,10 +34,12 @@ function setupStdio() {
function getStderr() { function getStderr() {
if (stderr) return stderr; if (stderr) return stderr;
stderr = createWritableStdioStream(2); stderr = createWritableStdioStream(2);
stderr.destroy = stderr.destroySoon = function(er) { stderr.destroySoon = stderr.destroy;
stderr._destroy = function(er, cb) {
// avoid errors if we already emitted
const errors = lazyErrors(); const errors = lazyErrors();
er = er || new errors.Error('ERR_STDERR_CLOSE'); er = er || new errors.Error('ERR_STDERR_CLOSE');
stderr.emit('error', er); cb(er);
}; };
if (stderr.isTTY) { if (stderr.isTTY) {
process.on('SIGWINCH', () => stderr._refreshSize()); process.on('SIGWINCH', () => stderr._refreshSize());
Expand Down
65 changes: 65 additions & 0 deletions lib/internal/streams/destroy.js
@@ -0,0 +1,65 @@
'use strict';

// undocumented cb() API, needed for core, not for public API
function destroy(err, cb) {
const readableDestroyed = this._readableState &&
this._readableState.destroyed;
const writableDestroyed = this._writableState &&
this._writableState.destroyed;

if (readableDestroyed || writableDestroyed) {
if (err && (!this._writableState || !this._writableState.errorEmitted)) {
process.nextTick(emitErrorNT, this, err);
}
return;
}

// we set destroyed to true before firing error callbacks in order
// to make it re-entrance safe in case destroy() is called within callbacks

if (this._readableState) {
this._readableState.destroyed = true;
}

// if this is a duplex stream mark the writable part as destroyed as well
if (this._writableState) {
this._writableState.destroyed = true;
}

this._destroy(err || null, (err) => {
if (!cb && err) {
process.nextTick(emitErrorNT, this, err);
if (this._writableState) {
this._writableState.errorEmitted = true;
}
} else if (cb) {
cb(err);
}
});
}

function undestroy() {
if (this._readableState) {
this._readableState.destroyed = false;
this._readableState.reading = false;
this._readableState.ended = false;
this._readableState.endEmitted = false;
}

if (this._writableState) {
this._writableState.destroyed = false;
this._writableState.ended = false;
this._writableState.ending = false;
this._writableState.finished = false;
this._writableState.errorEmitted = false;
}
}

function emitErrorNT(self, err) {
self.emit('error', err);
}

module.exports = {
destroy,
undestroy
};

0 comments on commit b6e1d22

Please sign in to comment.