Skip to content

Commit

Permalink
stream: implement throw for async iterator
Browse files Browse the repository at this point in the history
PR-URL: #31316
Reviewed-By: Anna Henningsen <anna@addaleax.net>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Rich Trott <rtrott@gmail.com>
Reviewed-By: Minwoo Jung <nodecorelab@gmail.com>
  • Loading branch information
ronag authored and codebytere committed Feb 17, 2020
1 parent 5a95fa4 commit 3046648
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 20 deletions.
49 changes: 29 additions & 20 deletions lib/internal/streams/async_iterator.js
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,31 @@ function wrapForNext(lastPromise, iter) {
const AsyncIteratorPrototype = ObjectGetPrototypeOf(
ObjectGetPrototypeOf(async function* () {}).prototype);

function finish(self, err) {
return new Promise((resolve, reject) => {
const stream = self[kStream];

// TODO(ronag): Remove this check once finished() handles
// already ended and/or destroyed streams.
const ended = stream.destroyed || stream.readableEnded ||
(stream._readableState && stream._readableState.endEmitted);

if (ended) {
resolve(createIterResult(undefined, true));
return;
}

finished(stream, (err) => {
if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') {
reject(err);
} else {
resolve(createIterResult(undefined, true));
}
});
destroy(stream, err);
});
}

const ReadableStreamAsyncIteratorPrototype = ObjectSetPrototypeOf({
get stream() {
return this[kStream];
Expand Down Expand Up @@ -131,27 +156,11 @@ const ReadableStreamAsyncIteratorPrototype = ObjectSetPrototypeOf({
},

return() {
return new Promise((resolve, reject) => {
const stream = this[kStream];

// TODO(ronag): Remove this check once finished() handles
// already ended and/or destroyed streams.
const ended = stream.destroyed || stream.readableEnded ||
(stream._readableState && stream._readableState.endEmitted);
if (ended) {
resolve(createIterResult(undefined, true));
return;
}
return finish(this);
},

finished(stream, (err) => {
if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') {
reject(err);
} else {
resolve(createIterResult(undefined, true));
}
});
destroy(stream);
});
throw(err) {
return finish(this, err);
},
}, AsyncIteratorPrototype);

Expand Down
22 changes: 22 additions & 0 deletions test/parallel/test-stream-readable-async-iterators.js
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,28 @@ async function tests() {
assert.strictEqual(received, 1);
}

{
// Iterator throw.

const readable = new Readable({
objectMode: true,
read() {
this.push('hello');
}
});

readable.on('error', common.mustCall((err) => {
assert.strictEqual(err.message, 'kaboom');
}));

const it = readable[Symbol.asyncIterator]();
it.throw(new Error('kaboom')).catch(common.mustCall((err) => {
assert.strictEqual(err.message, 'kaboom');
}));

assert.strictEqual(readable.destroyed, true);
}

{
console.log('destroyed by throw');
const readable = new Readable({
Expand Down

0 comments on commit 3046648

Please sign in to comment.