Skip to content
Permalink
Browse files

stream: add readableEnded

Adds a readableEnded property and improved finished compat with possible
stream-like objects.

PR-URL: #28814
Refs: #28813
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Rich Trott <rtrott@gmail.com>
  • Loading branch information...
ronag authored and targos committed Jul 23, 2019
1 parent f42eb01 commit a9f8b62b47588c78651bd0cf34fd1a9078b8d635
@@ -1126,6 +1126,15 @@ added: v12.7.0
Getter for the property `encoding` of a given `Readable` stream. The `encoding`
property can be set using the [`readable.setEncoding()`][] method.

##### readable.readableEnded
<!-- YAML
added: REPLACEME
-->

* {boolean}

Becomes `true` when [`'end'`][] event is emitted.

##### readable.readableHighWaterMark
<!-- YAML
added: v9.3.0
@@ -196,6 +196,16 @@ Object.defineProperty(Readable.prototype, 'destroyed', {
}
});

Object.defineProperty(Readable.prototype, 'readableEnded', {
// Making it explicit this property is not enumerable
// because otherwise some prototype manipulation in
// userland will fail
enumerable: false,
get() {
return this._readableState ? this._readableState.endEmitted : false;
}
});

Readable.prototype.destroy = destroyImpl.destroy;
Readable.prototype._undestroy = destroyImpl.undestroy;
Readable.prototype._destroy = function(err, cb) {
@@ -132,7 +132,7 @@ const createReadableStreamAsyncIterator = (stream) => {
[kLastReject]: { value: null, writable: true },
[kError]: { value: null, writable: true },
[kEnded]: {
value: stream._readableState.endEmitted,
value: stream.readableEnded || stream._readableState.endEmitted,
writable: true
},
// The function passed to new Promise is cached so we avoid allocating a new
@@ -42,7 +42,8 @@ function eos(stream, opts, callback) {
if (!readable) callback.call(stream);
};

var readableEnded = stream._readableState && stream._readableState.endEmitted;
var readableEnded = stream.readableEnded ||
(stream._readableState && stream._readableState.endEmitted);
const onend = () => {
readable = false;
readableEnded = true;
@@ -3,6 +3,7 @@
const common = require('../common');
const { Writable, Readable, Transform, finished } = require('stream');
const assert = require('assert');
const EE = require('events');
const fs = require('fs');
const { promisify } = require('util');

@@ -175,3 +176,11 @@ const { promisify } = require('util');
rs.push(null);
rs.resume();
}

{
const streamLike = new EE();
streamLike.readableEnded = true;
streamLike.readable = true;
finished(streamLike, common.mustCall);
streamLike.emit('close');
}
@@ -0,0 +1,33 @@
'use strict';

const common = require('../common');
const { Readable } = require('stream');
const assert = require('assert');

// basic
{
// Find it on Readable.prototype
assert(Readable.prototype.hasOwnProperty('readableEnded'));
}

// event
{
const readable = new Readable();

readable._read = () => {
// The state ended should start in false.
assert.strictEqual(readable.readableEnded, false);
readable.push('asd');
assert.strictEqual(readable.readableEnded, false);
readable.push(null);
assert.strictEqual(readable.readableEnded, false);
};

readable.on('end', common.mustCall(() => {
assert.strictEqual(readable.readableEnded, true);
}));

readable.on('data', common.mustCall(() => {
assert.strictEqual(readable.readableEnded, false);
}));
}

0 comments on commit a9f8b62

Please sign in to comment.
You can’t perform that action at this time.