Permalink
Browse files

stream: added experimental support for for-await

Adds support for Symbol.asyncIterator into the Readable class.
The stream is destroyed when the loop terminates with break or throw.

Fixes: #15709

PR-URL: #17755
Fixes: #15709
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Reviewed-By: Anatoli Papirovski <apapirovski@mac.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Vse Mozhet Byt <vsemozhetbyt@gmail.com>
Reviewed-By: Michaël Zasso <targos@protonmail.com>
  • Loading branch information...
mcollina committed Dec 19, 2017
1 parent 4d96c17 commit 61b4d60c5d9694e79069b1680b3736c96a5de501
@@ -1165,6 +1165,31 @@ readable stream will release any internal resources.
Implementors should not override this method, but instead implement
[`readable._destroy`][readable-_destroy].
##### readable[@@asyncIterator]
<!-- YAML
added: REPLACEME
-->
> Stability: 1 - Experimental
Returns an [AsyncIterator][async-iterator] to fully consume the stream.
```js
async function print(readable) {
readable.setEncoding('utf8');
let data = '';
for await (const k of readable) {
data += k;
}
console.log(data);
}
print(fs.createReadStream('file')).catch(console.log);
```
If the loop terminates with a `break` or a `throw`, the stream will be destroyed.
In other terms, iterating over a stream will consume the stream fully.
### Duplex and Transform Streams
#### Class: stream.Duplex
@@ -2328,3 +2353,4 @@ contain multi-byte characters.
[readable-destroy]: #stream_readable_destroy_error
[writable-_destroy]: #stream_writable_destroy_err_callback
[writable-destroy]: #stream_writable_destroy_error
[async-iterator]: https://github.com/tc39/proposal-async-iteration
@@ -32,6 +32,8 @@ const debug = util.debuglog('stream');
const BufferList = require('internal/streams/BufferList');
const destroyImpl = require('internal/streams/destroy');
const errors = require('internal/errors');
const ReadableAsyncIterator = require('internal/streams/async_iterator');
const { emitExperimentalWarning } = require('internal/util');
var StringDecoder;
util.inherits(Readable, Stream);
@@ -922,6 +924,12 @@ Readable.prototype.wrap = function(stream) {
return this;
};
Readable.prototype[Symbol.asyncIterator] = function() {
emitExperimentalWarning('Readable[Symbol.asyncIterator]');
return new ReadableAsyncIterator(this);
};
Object.defineProperty(Readable.prototype, 'readableHighWaterMark', {
// making it explicit this property is not enumerable
// because otherwise some prototype manipulation in
@@ -0,0 +1,159 @@
'use strict';
const kLastResolve = Symbol('lastResolve');
const kLastReject = Symbol('lastReject');
const kError = Symbol('error');
const kEnded = Symbol('ended');
const kLastPromise = Symbol('lastPromise');
const kHandlePromise = Symbol('handlePromise');
const kStream = Symbol('stream');
const AsyncIteratorRecord = class AsyncIteratorRecord {
constructor(value, done) {
this.done = done;
this.value = value;
}
};
function readAndResolve(iter) {
const resolve = iter[kLastResolve];
if (resolve !== null) {
const data = iter[kStream].read();
// we defer if data is null
// we can be expecting either 'end' or
// 'error'
if (data !== null) {
iter[kLastPromise] = null;
iter[kLastResolve] = null;
iter[kLastReject] = null;
resolve(new AsyncIteratorRecord(data, false));
}
}
}
function onReadable(iter) {
// we wait for the next tick, because it might
// emit an error with process.nextTick
process.nextTick(readAndResolve, iter);
}
function onEnd(iter) {
const resolve = iter[kLastResolve];
if (resolve !== null) {
iter[kLastPromise] = null;
iter[kLastResolve] = null;
iter[kLastReject] = null;
resolve(new AsyncIteratorRecord(null, true));
}
iter[kEnded] = true;
}
function onError(iter, err) {
const reject = iter[kLastReject];
// reject if we are waiting for data in the Promise
// returned by next() and store the error
if (reject !== null) {
iter[kLastPromise] = null;
iter[kLastResolve] = null;
iter[kLastReject] = null;
reject(err);
}
iter.error = err;

This comment has been minimized.

@julien-f

julien-f Apr 26, 2018

Contributor

Shoudn't it be iter[kError]?

This comment has been minimized.

@mcollina

mcollina Apr 26, 2018

Member

Very likely! Would you like to send a PR?

This comment has been minimized.

@julien-f

julien-f Apr 26, 2018

Contributor

No problem :)

}
function wrapForNext(lastPromise, iter) {
return function(resolve, reject) {
lastPromise.then(function() {
iter[kHandlePromise](resolve, reject);
}, reject);
};
}
const ReadableAsyncIterator = class ReadableAsyncIterator {
constructor(stream) {
this[kStream] = stream;
this[kLastResolve] = null;
this[kLastReject] = null;
this[kError] = null;
this[kEnded] = false;
this[kLastPromise] = null;
stream.on('readable', onReadable.bind(null, this));
stream.on('end', onEnd.bind(null, this));
stream.on('error', onError.bind(null, this));
// the function passed to new Promise
// is cached so we avoid allocating a new
// closure at every run
this[kHandlePromise] = (resolve, reject) => {
const data = this[kStream].read();
if (data) {
this[kLastPromise] = null;
this[kLastResolve] = null;
this[kLastReject] = null;
resolve(new AsyncIteratorRecord(data, false));
} else {
this[kLastResolve] = resolve;
this[kLastReject] = reject;
}
};
}
get stream() {
return this[kStream];
}
next() {
// if we have detected an error in the meanwhile
// reject straight away
const error = this[kError];
if (error !== null) {
return Promise.reject(error);
}
if (this[kEnded]) {
return Promise.resolve(new AsyncIteratorRecord(null, true));
}
// if we have multiple next() calls
// we will wait for the previous Promise to finish
// this logic is optimized to support for await loops,
// where next() is only called once at a time
const lastPromise = this[kLastPromise];
let promise;
if (lastPromise) {
promise = new Promise(wrapForNext(lastPromise, this));
} else {
// fast path needed to support multiple this.push()
// without triggering the next() queue
const data = this[kStream].read();
if (data !== null) {
return Promise.resolve(new AsyncIteratorRecord(data, false));
}
promise = new Promise(this[kHandlePromise]);
}
this[kLastPromise] = promise;
return promise;
}
return() {
// destroy(err, cb) is a private API
// we can guarantee we have that here, because we control the
// Readable class this is attached to
return new Promise((resolve, reject) => {
this[kStream].destroy(null, (err) => {
if (err) {
reject(err);
return;
}
resolve(new AsyncIteratorRecord(null, true));
});
});
}
};
module.exports = ReadableAsyncIterator;
@@ -138,6 +138,7 @@
'lib/internal/v8_prof_polyfill.js',
'lib/internal/v8_prof_processor.js',
'lib/internal/streams/lazy_transform.js',
'lib/internal/streams/async_iterator.js',
'lib/internal/streams/BufferList.js',
'lib/internal/streams/legacy.js',
'lib/internal/streams/destroy.js',
Oops, something went wrong.

0 comments on commit 61b4d60

Please sign in to comment.