From 3046648580c4b91602f3376a35acd3f6ea09f644 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sat, 11 Jan 2020 18:01:43 +0100 Subject: [PATCH] stream: implement throw for async iterator PR-URL: https://github.com/nodejs/node/pull/31316 Reviewed-By: Anna Henningsen Reviewed-By: Matteo Collina Reviewed-By: Rich Trott Reviewed-By: Minwoo Jung --- lib/internal/streams/async_iterator.js | 49 +++++++++++-------- .../test-stream-readable-async-iterators.js | 22 +++++++++ 2 files changed, 51 insertions(+), 20 deletions(-) diff --git a/lib/internal/streams/async_iterator.js b/lib/internal/streams/async_iterator.js index 5eefba517b97d2..6d798ec2ffe21f 100644 --- a/lib/internal/streams/async_iterator.js +++ b/lib/internal/streams/async_iterator.js @@ -71,6 +71,31 @@ function wrapForNext(lastPromise, iter) { const AsyncIteratorPrototype = ObjectGetPrototypeOf( ObjectGetPrototypeOf(async function* () {}).prototype); +function finish(self, err) { + return new Promise((resolve, reject) => { + const stream = self[kStream]; + + // TODO(ronag): Remove this check once finished() handles + // already ended and/or destroyed streams. + const ended = stream.destroyed || stream.readableEnded || + (stream._readableState && stream._readableState.endEmitted); + + if (ended) { + resolve(createIterResult(undefined, true)); + return; + } + + finished(stream, (err) => { + if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') { + reject(err); + } else { + resolve(createIterResult(undefined, true)); + } + }); + destroy(stream, err); + }); +} + const ReadableStreamAsyncIteratorPrototype = ObjectSetPrototypeOf({ get stream() { return this[kStream]; @@ -131,27 +156,11 @@ const ReadableStreamAsyncIteratorPrototype = ObjectSetPrototypeOf({ }, return() { - return new Promise((resolve, reject) => { - const stream = this[kStream]; - - // TODO(ronag): Remove this check once finished() handles - // already ended and/or destroyed streams. - const ended = stream.destroyed || stream.readableEnded || - (stream._readableState && stream._readableState.endEmitted); - if (ended) { - resolve(createIterResult(undefined, true)); - return; - } + return finish(this); + }, - finished(stream, (err) => { - if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') { - reject(err); - } else { - resolve(createIterResult(undefined, true)); - } - }); - destroy(stream); - }); + throw(err) { + return finish(this, err); }, }, AsyncIteratorPrototype); diff --git a/test/parallel/test-stream-readable-async-iterators.js b/test/parallel/test-stream-readable-async-iterators.js index b783b5f2ee8440..1da31b45e23ba9 100644 --- a/test/parallel/test-stream-readable-async-iterators.js +++ b/test/parallel/test-stream-readable-async-iterators.js @@ -282,6 +282,28 @@ async function tests() { assert.strictEqual(received, 1); } + { + // Iterator throw. + + const readable = new Readable({ + objectMode: true, + read() { + this.push('hello'); + } + }); + + readable.on('error', common.mustCall((err) => { + assert.strictEqual(err.message, 'kaboom'); + })); + + const it = readable[Symbol.asyncIterator](); + it.throw(new Error('kaboom')).catch(common.mustCall((err) => { + assert.strictEqual(err.message, 'kaboom'); + })); + + assert.strictEqual(readable.destroyed, true); + } + { console.log('destroyed by throw'); const readable = new Readable({