From 61b4d60c5d9694e79069b1680b3736c96a5de501 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Tue, 19 Dec 2017 13:33:31 +0100 Subject: [PATCH] stream: added experimental support for for-await MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds support for Symbol.asyncIterator into the Readable class. The stream is destroyed when the loop terminates with break or throw. Fixes: https://github.com/nodejs/node/issues/15709 PR-URL: https://github.com/nodejs/node/pull/17755 Fixes: https://github.com/nodejs/node/issues/15709 Reviewed-By: Benjamin Gruenbaum Reviewed-By: Anatoli Papirovski Reviewed-By: James M Snell Reviewed-By: Vse Mozhet Byt Reviewed-By: Michaël Zasso --- doc/api/stream.md | 26 ++ lib/_stream_readable.js | 8 + lib/internal/streams/async_iterator.js | 159 ++++++++++ node.gyp | 1 + .../test-stream-readable-async-iterators.js | 298 ++++++++++++++++++ 5 files changed, 492 insertions(+) create mode 100644 lib/internal/streams/async_iterator.js create mode 100644 test/parallel/test-stream-readable-async-iterators.js diff --git a/doc/api/stream.md b/doc/api/stream.md index 2166ae4eccc5f5..fe7a9a8f3036cb 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -1165,6 +1165,31 @@ readable stream will release any internal resources. Implementors should not override this method, but instead implement [`readable._destroy`][readable-_destroy]. +##### readable[@@asyncIterator] + + +> Stability: 1 - Experimental + +Returns an [AsyncIterator][async-iterator] to fully consume the stream. + +```js +async function print(readable) { + readable.setEncoding('utf8'); + let data = ''; + for await (const k of readable) { + data += k; + } + console.log(data); +} + +print(fs.createReadStream('file')).catch(console.log); +``` + +If the loop terminates with a `break` or a `throw`, the stream will be destroyed. +In other words, iterating over a stream will consume the stream fully. 
// Symbol keys for the iterator's private bookkeeping.
const kLastResolve = Symbol('lastResolve');
const kLastReject = Symbol('lastReject');
const kError = Symbol('error');
const kEnded = Symbol('ended');
const kLastPromise = Symbol('lastPromise');
const kHandlePromise = Symbol('handlePromise');
const kStream = Symbol('stream');

/**
 * Result object handed out by next() and return(): `{ done, value }`.
 */
const AsyncIteratorRecord = class AsyncIteratorRecord {
  constructor(value, done) {
    this.done = done;
    this.value = value;
  }
};

/**
 * Settles the pending next() promise, if there is one, with the next chunk
 * read from the stream.
 *
 * @param {ReadableAsyncIterator} iter iterator whose pending promise to settle
 */
function readAndResolve(iter) {
  const resolve = iter[kLastResolve];
  if (resolve !== null) {
    const data = iter[kStream].read();
    // we defer if data is null
    // we can be expecting either 'end' or
    // 'error'
    if (data !== null) {
      iter[kLastPromise] = null;
      iter[kLastResolve] = null;
      iter[kLastReject] = null;
      resolve(new AsyncIteratorRecord(data, false));
    }
  }
}

/**
 * 'readable' listener.
 *
 * @param {ReadableAsyncIterator} iter
 */
function onReadable(iter) {
  // we wait for the next tick, because it might
  // emit an error with process.nextTick
  process.nextTick(readAndResolve, iter);
}

/**
 * 'end' listener: completes the pending next() promise, if any, with a
 * `done: true` record and remembers that the stream ended.
 *
 * @param {ReadableAsyncIterator} iter
 */
function onEnd(iter) {
  const resolve = iter[kLastResolve];
  if (resolve !== null) {
    iter[kLastPromise] = null;
    iter[kLastResolve] = null;
    iter[kLastReject] = null;
    resolve(new AsyncIteratorRecord(null, true));
  }
  iter[kEnded] = true;
}

/**
 * 'error' listener: rejects the pending next() promise, if any, and stores
 * the error so later next() calls reject as well.
 *
 * @param {ReadableAsyncIterator} iter
 * @param {Error} err error emitted by the stream
 */
function onError(iter, err) {
  const reject = iter[kLastReject];
  // reject if we are waiting for data in the Promise
  // returned by next() and store the error
  if (reject !== null) {
    iter[kLastPromise] = null;
    iter[kLastResolve] = null;
    iter[kLastReject] = null;
    reject(err);
  }
  // The error must live under kError: that is the slot next() checks.
  // Storing it on a plain `error` property left next() waiting forever
  // when the error arrived while no promise was pending.
  iter[kError] = err;
}

/**
 * Builds a Promise executor that runs the iterator's read logic only after
 * `lastPromise` has fulfilled; a rejection of `lastPromise` is forwarded.
 *
 * @param {Promise} lastPromise promise returned by the previous next() call
 * @param {ReadableAsyncIterator} iter
 * @returns {Function} executor suitable for `new Promise(...)`
 */
function wrapForNext(lastPromise, iter) {
  return function(resolve, reject) {
    lastPromise.then(function() {
      iter[kHandlePromise](resolve, reject);
    }, reject);
  };
}

/**
 * Async iterator over a readable stream, driven by the stream's 'readable',
 * 'end' and 'error' events.
 */
const ReadableAsyncIterator = class ReadableAsyncIterator {
  /**
   * @param {object} stream stream to iterate; must provide on(), read() and
   *   destroy(err, cb)
   */
  constructor(stream) {
    this[kStream] = stream;
    this[kLastResolve] = null;
    this[kLastReject] = null;
    this[kError] = null;
    this[kEnded] = false;
    this[kLastPromise] = null;

    stream.on('readable', onReadable.bind(null, this));
    stream.on('end', onEnd.bind(null, this));
    stream.on('error', onError.bind(null, this));

    // the function passed to new Promise
    // is cached so we avoid allocating a new
    // closure at every run
    this[kHandlePromise] = (resolve, reject) => {
      const data = this[kStream].read();
      // Compare against null, like readAndResolve() and the fast path in
      // next(): a truthiness test would swallow falsy chunks such as 0 or ''
      // and leave this promise pending.
      if (data !== null) {
        this[kLastPromise] = null;
        this[kLastResolve] = null;
        this[kLastReject] = null;
        resolve(new AsyncIteratorRecord(data, false));
      } else {
        // nothing buffered: park the callbacks until 'readable', 'end'
        // or 'error' fires
        this[kLastResolve] = resolve;
        this[kLastReject] = reject;
      }
    };
  }

  /** The stream this iterator reads from. */
  get stream() {
    return this[kStream];
  }

  /**
   * Requests the next chunk.
   *
   * @returns {Promise<AsyncIteratorRecord>} fulfils with `{ value, done }`;
   *   rejects with the stream's error once one has been recorded
   */
  next() {
    // if we have detected an error in the meanwhile
    // reject straight away
    const error = this[kError];
    if (error !== null) {
      return Promise.reject(error);
    }

    if (this[kEnded]) {
      return Promise.resolve(new AsyncIteratorRecord(null, true));
    }

    // if we have multiple next() calls
    // we will wait for the previous Promise to finish
    // this logic is optimized to support for await loops,
    // where next() is only called once at a time
    const lastPromise = this[kLastPromise];
    let promise;

    if (lastPromise) {
      promise = new Promise(wrapForNext(lastPromise, this));
    } else {
      // fast path needed to support multiple this.push()
      // without triggering the next() queue
      const data = this[kStream].read();
      if (data !== null) {
        return Promise.resolve(new AsyncIteratorRecord(data, false));
      }

      promise = new Promise(this[kHandlePromise]);
    }

    this[kLastPromise] = promise;

    return promise;
  }

  /**
   * Ends the iteration early by destroying the stream.
   *
   * @returns {Promise<AsyncIteratorRecord>} fulfils with a `done: true`
   *   record, or rejects with the error reported by destroy()
   */
  return() {
    // destroy(err, cb) is a private API
    // we can guarantee we have that here, because we control the
    // Readable class this is attached to
    return new Promise((resolve, reject) => {
      this[kStream].destroy(null, (err) => {
        if (err) {
          reject(err);
          return;
        }
        resolve(new AsyncIteratorRecord(null, true));
      });
    });
  }
};
/**
 * Runs every Readable async-iterator scenario strictly in sequence.
 *
 * Each scenario is a [title, async function] pair; the title is logged
 * before the scenario starts and the next scenario only begins once the
 * current one has settled. A failing assertion rejects the returned promise.
 */
async function tests() {
  const scenarios = [
    ['read without for..await', async () => {
      const max = 5;
      const source = new Readable({
        objectMode: true,
        read() {}
      });

      const iterator = source[Symbol.asyncIterator]();
      assert.strictEqual(iterator.stream, source);

      // Queue all next() calls before any data has been pushed.
      const pending = Array.from({ length: max }, () => iterator.next());
      Promise.all(pending).then(common.mustCall((records) => {
        records.forEach(common.mustCall(
          (record, index) => assert.strictEqual(record.value, `hello-${index}`),
          5));
      }));

      for (let n = 0; n < 5; n++) {
        source.push(`hello-${n}`);
      }
      source.push(null);

      const { done } = await iterator.next();
      assert.strictEqual(done, true);
    }],

    ['read without for..await deferred', async () => {
      const source = new Readable({
        objectMode: true,
        read() {}
      });

      const iterator = source[Symbol.asyncIterator]();
      assert.strictEqual(iterator.stream, source);

      // Values must arrive in push order across both batches.
      let expectedIndex = 0;
      const checkInOrder = (record) => {
        const wanted = `hello-${expectedIndex}`;
        expectedIndex += 1;
        assert.strictEqual(record.value, wanted);
      };

      const firstBatch = [iterator.next(), iterator.next(), iterator.next()];

      source.push('hello-0');
      source.push('hello-1');
      source.push('hello-2');

      const firstResults = await Promise.all(firstBatch);
      firstResults.forEach(common.mustCall(checkInOrder, 3));

      const secondBatch = [iterator.next(), iterator.next()];

      source.push('hello-3');
      source.push('hello-4');
      source.push(null);

      const secondResults = await Promise.all(secondBatch);
      secondResults.forEach(common.mustCall(checkInOrder, 2));

      const { done } = await iterator.next();
      assert.strictEqual(done, true);
    }],

    ['read without for..await with errors', async () => {
      const max = 3;
      const source = new Readable({
        objectMode: true,
        read() {}
      });

      const iterator = source[Symbol.asyncIterator]();
      assert.strictEqual(iterator.stream, source);

      // The first `max` requests get data, the two after them get the error.
      const fulfilled = Array.from({ length: max }, () => iterator.next());
      const rejected = Array.from({ length: 2 }, () => iterator.next());

      source.push('hello-0');
      source.push('hello-1');
      source.push('hello-2');

      const records = await Promise.all(fulfilled);

      records.forEach(common.mustCall(
        (record, index) => assert.strictEqual(record.value, `hello-${index}`),
        max));

      for (const promise of rejected) {
        promise.catch(common.mustCall((err) => {
          assert.strictEqual(err.message, 'kaboom');
        }));
      }

      source.destroy(new Error('kaboom'));
    }],

    ['read object mode', async () => {
      const max = 42;
      let produced = 0;
      let consumed = 0;
      const source = new Readable({
        objectMode: true,
        read() {
          this.push('hello');
          produced += 1;
          if (produced === max) {
            this.push(null);
          }
        }
      });

      for await (const chunk of source) {
        consumed += 1;
        assert.strictEqual(chunk, 'hello');
      }

      assert.strictEqual(produced, consumed);
    }],

    ['destroy sync', async () => {
      const source = new Readable({
        objectMode: true,
        read() {
          this.destroy(new Error('kaboom from read'));
        }
      });

      let caught;
      try {
        // eslint-disable-next-line no-unused-vars
        for await (const chunk of source) {}
      } catch (e) {
        caught = e;
      }
      assert.strictEqual(caught.message, 'kaboom from read');
    }],

    ['destroy async', async () => {
      const source = new Readable({
        objectMode: true,
        read() {
          // Only the first read() pushes and schedules the destroy.
          if (this.pushed) {
            return;
          }
          this.push('hello');
          this.pushed = true;

          setImmediate(() => {
            this.destroy(new Error('kaboom'));
          });
        }
      });

      let consumed = 0;
      let caught = null;
      try {
        // eslint-disable-next-line no-unused-vars
        for await (const chunk of source) {
          consumed += 1;
        }
      } catch (e) {
        caught = e;
      }

      assert.strictEqual(caught.message, 'kaboom');
      assert.strictEqual(consumed, 1);
    }],

    ['destroyed by throw', async () => {
      const source = new Readable({
        objectMode: true,
        read() {
          this.push('hello');
        }
      });

      let caught = null;
      try {
        for await (const chunk of source) {
          assert.strictEqual(chunk, 'hello');
          throw new Error('kaboom');
        }
      } catch (e) {
        caught = e;
      }

      assert.strictEqual(caught.message, 'kaboom');
      assert.strictEqual(source.destroyed, true);
    }],

    ['destroyed sync after push', async () => {
      const source = new Readable({
        objectMode: true,
        read() {
          this.push('hello');
          this.destroy(new Error('kaboom'));
        }
      });

      let consumed = 0;
      let caught = null;
      try {
        for await (const chunk of source) {
          assert.strictEqual(chunk, 'hello');
          consumed += 1;
        }
      } catch (e) {
        caught = e;
      }

      assert.strictEqual(caught.message, 'kaboom');
      assert.strictEqual(consumed, 1);
    }],

    ['push async', async () => {
      const max = 42;
      let produced = 0;
      let consumed = 0;
      const source = new Readable({
        objectMode: true,
        read() {
          setImmediate(() => {
            this.push('hello');
            produced += 1;
            if (produced === max) {
              this.push(null);
            }
          });
        }
      });

      for await (const chunk of source) {
        consumed += 1;
        assert.strictEqual(chunk, 'hello');
      }

      assert.strictEqual(produced, consumed);
    }],

    ['push binary async', async () => {
      const max = 42;
      let produced = 0;
      const source = new Readable({
        read() {
          setImmediate(() => {
            this.push('hello');
            produced += 1;
            if (produced === max) {
              this.push(null);
            }
          });
        }
      });

      // A paused 'data' listener collects the text that the iteration
      // result is compared against.
      let expected = '';
      source.setEncoding('utf8');
      source.pause();
      source.on('data', (chunk) => {
        expected += chunk;
      });

      let collected = '';
      for await (const chunk of source) {
        collected += chunk;
      }

      assert.strictEqual(collected, expected);
    }]
  ];

  for (const [title, run] of scenarios) {
    console.log(title);
    await run();
  }
}