Skip to content

Commit

Permalink
stream: add errored and closed props
Browse files Browse the repository at this point in the history
PR-URL: #40696
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
  • Loading branch information
ronag committed Nov 13, 2021
1 parent 3e5a5e8 commit f217025
Show file tree
Hide file tree
Showing 10 changed files with 112 additions and 15 deletions.
44 changes: 43 additions & 1 deletion doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,16 @@ further errors except from `_destroy()` may be emitted as `'error'`.
Implementors should not override this method,
but instead implement [`writable._destroy()`][writable-_destroy].

##### `writable.closed`

<!-- YAML
added: REPLACEME
-->

* {boolean}

Is `true` after `'close'` has been emitted.

##### `writable.destroyed`

<!-- YAML
Expand Down Expand Up @@ -611,6 +621,17 @@ added:
Number of times [`writable.uncork()`][stream-uncork] needs to be
called in order to fully uncork the stream.

##### `writable.writableErrored`

<!-- YAML
added:
REPLACEME
-->

* {Error}

Returns error if the stream has been destroyed with an error.

##### `writable.writableFinished`

<!-- YAML
Expand Down Expand Up @@ -1080,14 +1101,24 @@ further errors except from `_destroy()` may be emitted as `'error'`.
Implementors should not override this method, but instead implement
[`readable._destroy()`][readable-_destroy].

##### `readable.destroyed`
##### `readable.closed`

<!-- YAML
added: v8.0.0
-->

* {boolean}

Is `true` after `'close'` has been emitted.

##### `readable.destroyed`

<!-- YAML
added: REPLACEME
-->

* {boolean}

Is `true` after [`readable.destroy()`][readable-destroy] has been called.

##### `readable.isPaused()`
Expand Down Expand Up @@ -1346,6 +1377,17 @@ added: v12.9.0

Becomes `true` when [`'end'`][] event is emitted.

##### `readable.readableErrored`

<!-- YAML
added:
REPLACEME
-->

* {Error}

Returns error if the stream has been destroyed with an error.

##### `readable.readableFlowing`

<!-- YAML
Expand Down
6 changes: 0 additions & 6 deletions lib/internal/fs/streams.js
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,9 @@ const FileHandleOperations = (handle) => {

function close(stream, err, cb) {
if (!stream.fd) {
// TODO(ronag)
// stream.closed = true;
cb(err);
} else {
stream[kFs].close(stream.fd, (er) => {
stream.closed = true;
cb(er || err);
});
stream.fd = null;
Expand Down Expand Up @@ -186,7 +183,6 @@ function ReadStream(path, options) {
this.end = options.end;
this.pos = undefined;
this.bytesRead = 0;
this.closed = false;
this[kIsPerformingIO] = false;

if (this.start !== undefined) {
Expand Down Expand Up @@ -358,10 +354,8 @@ function WriteStream(path, options) {
this.start = options.start;
this.pos = undefined;
this.bytesWritten = 0;
this.closed = false;
this[kIsPerformingIO] = false;


if (this.start !== undefined) {
validateInteger(this.start, 'start', 0);

Expand Down
4 changes: 3 additions & 1 deletion lib/internal/streams/end-of-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ const {
isReadable,
isReadableNodeStream,
isReadableFinished,
isReadableErrored,
isWritable,
isWritableNodeStream,
isWritableFinished,
isWritableErrored,
isNodeStream,
willEmitClose: _willEmitClose,
} = require('internal/streams/utils');
Expand Down Expand Up @@ -110,7 +112,7 @@ function eos(stream, options, callback) {
const onclose = () => {
closed = true;

const errored = wState?.errored || rState?.errored;
const errored = isWritableErrored(stream) || isReadableErrored(stream);

if (errored && typeof errored !== 'boolean') {
return callback.call(stream, errored);
Expand Down
18 changes: 14 additions & 4 deletions lib/internal/streams/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -1239,13 +1239,23 @@ ObjectDefineProperties(Readable.prototype, {
}
},

readableErrored: {
enumerable: false,
get() {
return this._readableState ? this._readableState.errored : null;
}
},

closed: {
get() {
return this._readableState ? this._readableState.closed : false;
}
},

destroyed: {
enumerable: false,
get() {
if (this._readableState === undefined) {
return false;
}
return this._readableState.destroyed;
return this._readableState ? this._readableState.destroyed : false;
},
set(value) {
// We ignore the value if the stream
Expand Down
30 changes: 30 additions & 0 deletions lib/internal/streams/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,39 @@ function isFinished(stream, opts) {
return true;
}

function isWritableErrored(stream) {
if (!isNodeStream(stream)) {
return null;
}

if (stream.writableErrored) {
return stream.writableErrored;
}

return stream._writableState?.errored ?? null;
}

function isReadableErrored(stream) {
if (!isNodeStream(stream)) {
return null;
}

if (stream.readableErrored) {
return stream.readableErrored;
}

return stream._readableState?.errored ?? null;
}

function isClosed(stream) {
if (!isNodeStream(stream)) {
return null;
}

if (typeof stream.closed === 'boolean') {
return stream.closed;
}

const wState = stream._writableState;
const rState = stream._readableState;

Expand Down Expand Up @@ -226,11 +254,13 @@ module.exports = {
isReadableNodeStream,
isReadableEnded,
isReadableFinished,
isReadableErrored,
isNodeStream,
isWritable,
isWritableNodeStream,
isWritableEnded,
isWritableFinished,
isWritableErrored,
isServerRequest,
isServerResponse,
willEmitClose,
Expand Down
15 changes: 14 additions & 1 deletion lib/internal/streams/writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -768,6 +768,12 @@ function finish(stream, state) {

ObjectDefineProperties(Writable.prototype, {

closed: {
get() {
return this._writableState ? this._writableState.closed : false;
}
},

destroyed: {
get() {
return this._writableState ? this._writableState.destroyed : false;
Expand Down Expand Up @@ -846,7 +852,14 @@ ObjectDefineProperties(Writable.prototype, {
get() {
return this._writableState && this._writableState.length;
}
}
},

writableErrored: {
enumerable: false,
get() {
return this._writableState ? this._writableState.errored : null;
}
},
});

const destroy = destroyImpl.destroy;
Expand Down
2 changes: 1 addition & 1 deletion test/parallel/test-fs-read-stream-inherit.js
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ const rangeFile = fixtures.path('x.txt');
file.on('error', common.mustCall());

process.on('exit', function() {
assert(!file.closed);
assert(file.closed);
assert(file.destroyed);
});
}
2 changes: 1 addition & 1 deletion test/parallel/test-fs-read-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ if (!common.isWindows) {
file.on('error', common.mustCall());

process.on('exit', function() {
assert(!file.closed);
assert(file.closed);
assert(file.destroyed);
});
}
4 changes: 4 additions & 0 deletions test/parallel/test-stream-finished.js
Original file line number Diff line number Diff line change
Expand Up @@ -612,8 +612,10 @@ testClosed((opts) => new Writable({ write() {}, ...opts }));
const w = new Writable();
const _err = new Error();
w.destroy(_err);
assert.strictEqual(w.writableErrored, _err);
finished(w, common.mustCall((err) => {
assert.strictEqual(_err, err);
assert.strictEqual(w.closed, true);
finished(w, common.mustCall((err) => {
assert.strictEqual(_err, err);
}));
Expand All @@ -623,7 +625,9 @@ testClosed((opts) => new Writable({ write() {}, ...opts }));
{
const w = new Writable();
w.destroy();
assert.strictEqual(w.writableErrored, null);
finished(w, common.mustCall((err) => {
assert.strictEqual(w.closed, true);
assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
finished(w, common.mustCall((err) => {
assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
Expand Down
2 changes: 2 additions & 0 deletions test/parallel/test-stream-readable-destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const assert = require('assert');
read.on('close', common.mustCall());

read.destroy();
assert.strictEqual(read.readableErrored, null);
assert.strictEqual(read.destroyed, true);
}

Expand All @@ -31,6 +32,7 @@ const assert = require('assert');
}));

read.destroy(expected);
assert.strictEqual(read.readableErrored, expected);
assert.strictEqual(read.destroyed, true);
}

Expand Down

0 comments on commit f217025

Please sign in to comment.