Skip to content

Commit

Permalink
Add @@asyncIterator to ReadableStream
Browse files Browse the repository at this point in the history
Enable using a ReadableStream with the JavaScript "for await" syntax.
  • Loading branch information
MattiasBuelens authored and ricea committed Feb 6, 2019
1 parent 1d3eee8 commit bf2cac8
Show file tree
Hide file tree
Showing 7 changed files with 253 additions and 66 deletions.
110 changes: 109 additions & 1 deletion index.bs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ spec:promises-guide; type:dfn;
<pre class="anchors">
urlPrefix: https://tc39.github.io/ecma262/; spec: ECMASCRIPT
text: %Uint8Array%; url: #sec-typedarray-objects; type: constructor
text: %AsyncIteratorPrototype%; url: #sec-asynciteratorprototype; type: interface
text: AsyncIterator; url: #sec-asynciterator-interface; type: interface
text: ArrayBuffer; url: #sec-arraybuffer-objects; type: interface
text: DataView; url: #sec-dataview-objects; type: interface
text: Number; url: #sec-ecmascript-language-types-number-type; type: interface
Expand Down Expand Up @@ -399,11 +401,14 @@ like
get <a href="#rs-locked">locked</a>()

<a href="#rs-cancel">cancel</a>(reason)
<a href="#rs-get-reader">getReader</a>()
<a href="#rs-get-iterator">getIterator</a>({ preventCancel } = {})
<a href="#rs-get-reader">getReader</a>({ mode } = {})
<a href="#rs-pipe-through">pipeThrough</a>({ writable, readable },
{ preventClose, preventAbort, preventCancel, signal } = {})
<a href="#rs-pipe-to">pipeTo</a>(dest, { preventClose, preventAbort, preventCancel, signal } = {})
<a href="#rs-tee">tee</a>()

<a href="#rs-asynciterator">[@@asyncIterator]</a>({ preventCancel } = {})
}
</code></pre>

Expand Down Expand Up @@ -602,6 +607,23 @@ option. If <code><a for="underlying source">type</a></code> is set to <code>unde
1. Return ! ReadableStreamCancel(*this*, _reason_).
</emu-alg>

<h5 id="rs-get-iterator" method for="ReadableStream">getIterator({ <var>preventCancel</var> } = {})</h5>

<div class="note">
The <code>getIterator</code> method returns an async iterator which can be used to consume the stream. The
{{ReadableStreamAsyncIteratorPrototype/return()}} method of this iterator object will, by default,
<a lt="cancel a readable stream">cancel</a> the stream; it will also release the reader.
</div>

<emu-alg>
1. If ! IsReadableStream(*this*) is *false*, throw a *TypeError* exception.
1. Let _reader_ be ? AcquireReadableStreamDefaultReader(*this*).
1. Let _iterator_ be ! ObjectCreate(`<a idl>ReadableStreamAsyncIteratorPrototype</a>`).
1. Set _iterator_.[[asyncIteratorReader]] to _reader_.
1. Set _iterator_.[[preventCancel]] to ! ToBoolean(_preventCancel_).
1. Return _iterator_.
</emu-alg>

<h5 id="rs-get-reader" method for="ReadableStream">getReader({ <var ignore>mode</var> } = {})</h5>

<div class="note">
Expand Down Expand Up @@ -792,6 +814,82 @@ option. If <code><a for="underlying source">type</a></code> is set to <code>unde
</code></pre>
</div>

<!-- Bikeshed doesn't let us mark this up correctly: https://github.com/tabatkins/bikeshed/issues/1344 -->
<h5 id="rs-asynciterator" iterator for="ReadableStream">[@@asyncIterator]({ <var>preventCancel</var> } = {})</h5>

<p class="note">
The <code>@@asyncIterator</code> method is an alias of {{ReadableStream/getIterator()}}.
</p>

The initial value of the <code>@@asyncIterator</code> method is the same function object as the initial value of the
{{ReadableStream/getIterator()}} method.

<h3 id="rs-asynciterator-prototype" interface
lt="ReadableStreamAsyncIteratorPrototype">ReadableStreamAsyncIteratorPrototype</h3>

{{ReadableStreamAsyncIteratorPrototype}} is an ordinary object that is used by {{ReadableStream/getIterator()}} to
construct the objects it returns. Instances of {{ReadableStreamAsyncIteratorPrototype}} implement the {{AsyncIterator}}
abstract interface from the JavaScript specification. [[!ECMASCRIPT]]

The {{ReadableStreamAsyncIteratorPrototype}} object must have its \[[Prototype]] internal slot set to
{{%AsyncIteratorPrototype%}}.

<h4 id="default-reader-asynciterator-prototype-internal-slots">Internal slots</h4>

Objects created by {{ReadableStream/getIterator()}}, using {{ReadableStreamAsyncIteratorPrototype}} as their
prototype, are created with the internal slots described in the following table:

<table>
<thead>
<tr>
<th>Internal Slot</th>
<th>Description (<em>non-normative</em>)</th>
</tr>
</thead>
<tr>
<td>\[[asyncIteratorReader]]
<td class="non-normative">A {{ReadableStreamDefaultReader}} instance
</tr>
<tr>
<td>\[[preventCancel]]
<td class="non-normative">A boolean value indicating if the stream will be <a lt="cancel a readable
stream">canceled</a> when the async iterator's {{ReadableStreamAsyncIteratorPrototype/return()}} method is called
</tr>
</table>

<h4 id="rs-asynciterator-prototype-next" method for="ReadableStreamAsyncIteratorPrototype">next()</h4>

<emu-alg>
1. If ! IsReadableStreamAsyncIterator(*this*) is *false*, return <a>a promise rejected with</a> a *TypeError* exception.
1. Let _reader_ be *this*.[[asyncIteratorReader]].
1. If _reader_.[[ownerReadableStream]] is *undefined*, return <a>a promise rejected with</a> a *TypeError* exception.
1. Return the result of <a>transforming</a> ! ReadableStreamDefaultReaderRead(_reader_) with a fulfillment handler
which takes the argument _result_ and performs the following steps:
1. Assert: Type(_result_) is Object.
1. Let _value_ be ? Get(_result_, `"value"`).
1. Let _done_ be ? Get(_result_, `"done"`).
1. Assert: Type(_done_) is Boolean.
1. If _done_ is *true*, perform ! ReadableStreamReaderGenericRelease(_reader_).
1. Return ! ReadableStreamCreateReadResult(_value_, _done_, *true*).
</emu-alg>

<h4 id="rs-asynciterator-prototype-return" method
for="ReadableStreamAsyncIteratorPrototype">return( <var>value</var> )</h4>

<emu-alg>
1. If ! IsReadableStreamAsyncIterator(*this*) is *false*, return <a>a promise rejected with</a> a *TypeError* exception.
1. Let _reader_ be *this*.[[asyncIteratorReader]].
1. If _reader_.[[ownerReadableStream]] is *undefined*, return <a>a promise rejected with</a> a *TypeError* exception.
1. If _reader_.[[readRequests]] is not empty, return <a>a promise rejected with</a> a *TypeError* exception.
1. If *this*.[[preventCancel]] is *false*, then:
1. Let _result_ be ! ReadableStreamReaderGenericCancel(_reader_, _value_).
1. Perform ! ReadableStreamReaderGenericRelease(_reader_).
1. Return the result of <a>transforming</a> _result_ by a fulfillment handler that returns !
ReadableStreamCreateReadResult(_value_, *true*, *true*).
1. Perform ! ReadableStreamReaderGenericRelease(_reader_).
1. Return <a>a promise resolved with</a> ! ReadableStreamCreateReadResult(_value_, *true*, *true*).
</emu-alg>

<h3 id="rs-abstract-ops">General readable stream abstract operations</h3>

The following abstract operations, unlike most in this specification, are meant to be generally useful by other
Expand Down Expand Up @@ -910,6 +1008,15 @@ readable stream is <a>locked to a reader</a>.
1. Return *true*.
</emu-alg>

<h4 id="is-readable-stream-asynciterator" aoid="IsReadableStreamAsyncIterator" nothrow
export>IsReadableStreamAsyncIterator ( <var>x</var> )</h4>

<emu-alg>
1. If Type(_x_) is not Object, return *false*.
1. If _x_ does not have a [[asyncIteratorReader]] internal slot, return *false*.
1. Return *true*.
</emu-alg>

<h4 id="readable-stream-tee" aoid="ReadableStreamTee" throws export>ReadableStreamTee ( <var>stream</var>,
<var>cloneForBranch2</var> )</h4>

Expand Down Expand Up @@ -5985,6 +6092,7 @@ Forbes Lindesay,
Forrest Norvell,
Gary Blackwood,
Gorgi Kosev,
Gus Caplan,
贺师俊 (hax),
Isaac Schlueter,
isonmad,
Expand Down
10 changes: 9 additions & 1 deletion reference-implementation/.eslintrc.json
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,15 @@
"id-blacklist": "off",
"id-length": "off",
"id-match": "off",
"indent": ["error", 2, { "SwitchCase": 1 }],
"indent": ["error", 2, {
"SwitchCase": 1,
"FunctionDeclaration": { "parameters": "first" },
"FunctionExpression": { "parameters": "first" },
"CallExpression": { "arguments": "first" },
"ArrayExpression": "first",
"ObjectExpression": "first",
"ImportDeclaration": "first"
}],
"jsx-quotes": "off",
"key-spacing": ["error", { "beforeColon": false, "afterColon": true, "mode": "strict" }],
"keyword-spacing": ["error", { "before": true, "after": true }],
Expand Down
131 changes: 103 additions & 28 deletions reference-implementation/lib/readable-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ class ReadableStream {
}
if (IsWritableStream(dest) === false) {
return Promise.reject(
new TypeError('ReadableStream.prototype.pipeTo\'s first argument must be a WritableStream'));
new TypeError('ReadableStream.prototype.pipeTo\'s first argument must be a WritableStream'));
}

preventClose = Boolean(preventClose);
Expand Down Expand Up @@ -158,8 +158,72 @@ class ReadableStream {
const branches = ReadableStreamTee(this, false);
return createArrayFromList(branches);
}

getIterator({ preventCancel = false } = {}) {
if (IsReadableStream(this) === false) {
throw streamBrandCheckException('getIterator');
}
const reader = AcquireReadableStreamDefaultReader(this);
const iterator = Object.create(ReadableStreamAsyncIteratorPrototype);
iterator._asyncIteratorReader = reader;
iterator._preventCancel = Boolean(preventCancel);
return iterator;
}
}

const AsyncIteratorPrototype = Object.getPrototypeOf(Object.getPrototypeOf(async function* () {}).prototype);
const ReadableStreamAsyncIteratorPrototype = Object.setPrototypeOf({
next() {
if (IsReadableStreamAsyncIterator(this) === false) {
return Promise.reject(streamAsyncIteratorBrandCheckException('next'));
}
const reader = this._asyncIteratorReader;
if (reader._ownerReadableStream === undefined) {
return Promise.reject(readerLockException('iterate'));
}
return ReadableStreamDefaultReaderRead(reader).then(result => {
assert(typeIsObject(result));
const value = result.value;
const done = result.done;
assert(typeof done === 'boolean');
if (done) {
ReadableStreamReaderGenericRelease(reader);
}
return ReadableStreamCreateReadResult(value, done, true);
});
},

return(value) {
if (IsReadableStreamAsyncIterator(this) === false) {
return Promise.reject(streamAsyncIteratorBrandCheckException('next'));
}
const reader = this._asyncIteratorReader;
if (reader._ownerReadableStream === undefined) {
return Promise.reject(readerLockException('finish iterating'));
}
if (reader._readRequests.length > 0) {
return Promise.reject(new TypeError(
'Tried to release a reader lock when that reader has pending read() calls un-settled'));
}
if (this._preventCancel === false) {
const result = ReadableStreamReaderGenericCancel(reader, value);
ReadableStreamReaderGenericRelease(reader);
return result.then(() => ReadableStreamCreateReadResult(value, true, true));
}
ReadableStreamReaderGenericRelease(reader);
return Promise.resolve(ReadableStreamCreateReadResult(value, true, true));
}
}, AsyncIteratorPrototype);
Object.defineProperty(ReadableStreamAsyncIteratorPrototype, 'next', { enumerable: false });
Object.defineProperty(ReadableStreamAsyncIteratorPrototype, 'return', { enumerable: false });

Object.defineProperty(ReadableStream.prototype, Symbol.asyncIterator, {
value: ReadableStream.prototype.getIterator,
enumerable: false,
writable: true,
configurable: true
});

module.exports = {
CreateReadableByteStream,
CreateReadableStream,
Expand Down Expand Up @@ -194,7 +258,7 @@ function CreateReadableStream(startAlgorithm, pullAlgorithm, cancelAlgorithm, hi
const controller = Object.create(ReadableStreamDefaultController.prototype);

SetUpReadableStreamDefaultController(
stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm
stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm
);

return stream;
Expand Down Expand Up @@ -255,6 +319,18 @@ function IsReadableStreamLocked(stream) {
return true;
}

function IsReadableStreamAsyncIterator(x) {
if (!typeIsObject(x)) {
return false;
}

if (!Object.prototype.hasOwnProperty.call(x, '_asyncIteratorReader')) {
return false;
}

return true;
}

function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventCancel, signal) {
assert(IsReadableStream(source) === true);
assert(IsWritableStream(dest) === true);
Expand Down Expand Up @@ -420,10 +496,9 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC

function doTheRest() {
action().then(
() => finalize(originalIsError, originalError),
newError => finalize(true, newError)
)
.catch(rethrowAssertionErrorRejection);
() => finalize(originalIsError, originalError),
newError => finalize(true, newError)
).catch(rethrowAssertionErrorRejection);
}
}

Expand Down Expand Up @@ -931,12 +1006,12 @@ function ReadableStreamReaderGenericRelease(reader) {

if (reader._ownerReadableStream._state === 'readable') {
defaultReaderClosedPromiseReject(
reader,
new TypeError('Reader was released and can no longer be used to monitor the stream\'s closedness'));
reader,
new TypeError('Reader was released and can no longer be used to monitor the stream\'s closedness'));
} else {
defaultReaderClosedPromiseResetToRejected(
reader,
new TypeError('Reader was released and can no longer be used to monitor the stream\'s closedness'));
reader,
new TypeError('Reader was released and can no longer be used to monitor the stream\'s closedness'));
}
reader._closedPromise.catch(() => {});

Expand Down Expand Up @@ -1098,8 +1173,7 @@ function ReadableStreamDefaultControllerCallPullIfNeeded(controller) {
e => {
ReadableStreamDefaultControllerError(controller, e);
}
)
.catch(rethrowAssertionErrorRejection);
).catch(rethrowAssertionErrorRejection);

return undefined;
}
Expand Down Expand Up @@ -1260,8 +1334,7 @@ function SetUpReadableStreamDefaultController(
r => {
ReadableStreamDefaultControllerError(controller, r);
}
)
.catch(rethrowAssertionErrorRejection);
).catch(rethrowAssertionErrorRejection);
}

function SetUpReadableStreamDefaultControllerFromUnderlyingSource(stream, underlyingSource, highWaterMark,
Expand Down Expand Up @@ -1533,8 +1606,7 @@ function ReadableByteStreamControllerCallPullIfNeeded(controller) {
e => {
ReadableByteStreamControllerError(controller, e);
}
)
.catch(rethrowAssertionErrorRejection);
).catch(rethrowAssertionErrorRejection);

return undefined;
}
Expand Down Expand Up @@ -1570,7 +1642,7 @@ function ReadableByteStreamControllerConvertPullIntoDescriptor(pullIntoDescripto
assert(bytesFilled % elementSize === 0);

return new pullIntoDescriptor.ctor(
pullIntoDescriptor.buffer, pullIntoDescriptor.byteOffset, bytesFilled / elementSize);
pullIntoDescriptor.buffer, pullIntoDescriptor.byteOffset, bytesFilled / elementSize);
}

function ReadableByteStreamControllerEnqueueChunkToQueue(controller, buffer, byteOffset, byteLength) {
Expand Down Expand Up @@ -1994,19 +2066,18 @@ function SetUpReadableByteStreamController(stream, controller, startAlgorithm, p

const startResult = startAlgorithm();
Promise.resolve(startResult).then(
() => {
controller._started = true;
() => {
controller._started = true;

assert(controller._pulling === false);
assert(controller._pullAgain === false);
assert(controller._pulling === false);
assert(controller._pullAgain === false);

ReadableByteStreamControllerCallPullIfNeeded(controller);
},
r => {
ReadableByteStreamControllerError(controller, r);
}
)
.catch(rethrowAssertionErrorRejection);
ReadableByteStreamControllerCallPullIfNeeded(controller);
},
r => {
ReadableByteStreamControllerError(controller, r);
}
).catch(rethrowAssertionErrorRejection);
}

function SetUpReadableByteStreamControllerFromUnderlyingSource(stream, underlyingByteSource, highWaterMark) {
Expand Down Expand Up @@ -2063,6 +2134,10 @@ function streamBrandCheckException(name) {
return new TypeError(`ReadableStream.prototype.${name} can only be used on a ReadableStream`);
}

function streamAsyncIteratorBrandCheckException(name) {
return new TypeError(`ReadableStreamAsyncIterator.${name} can only be used on a ReadableSteamAsyncIterator`);
}

// Helper functions for the readers.

function readerLockException(name) {
Expand Down
Loading

0 comments on commit bf2cac8

Please sign in to comment.