Skip to content

Commit

Permalink
Fix bugs in reader releasing algorithm
Browse files Browse the repository at this point in the history
As illustrated by the new tests, there were previously issues around auto-releasing not always giving the right result for errored or closed streams, especially with regard to allowing future readers to be acquired. This commit simplifies the logic and ensures consistency across all readers obtained from errored or closed streams.
  • Loading branch information
domenic committed Mar 16, 2015
1 parent f4ac4d5 commit 0aabdda
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 44 deletions.
62 changes: 40 additions & 22 deletions index.bs
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,17 @@ Instances of <code>ReadableStreamReader</code> are created with the internal slo
<tr>
<td>\[[readRequests]]
<td>A List of promises returned by calls to the reader's <code>read()</code> method that have not yet been resolved,
due to the <a>consumer</a> requesting <a>chunks</a> sooner than they are available.
due to the <a>consumer</a> requesting <a>chunks</a> sooner than they are available
</tr>
<tr>
<td>\[[state]]
<td>A string containing the reader's current state, used internally; one of <code>"readable"</code>,
<code>"closed"</code>, or <code>"errored"</code>
</tr>
<tr>
<td>\[[storedError]]
<td>A value indicating how the reader's stream failed, to be given as a failure reason or exception when trying to
operate on the reader; applicable only when \[[state]] is <code>"errored"</code>
</tr>
</table>

Expand All @@ -556,12 +566,12 @@ Instances of <code>ReadableStreamReader</code> are created with the internal slo
<li> If IsReadableStreamLocked(<var>stream</var>) is <b>true</b>, throw a <b>TypeError</b> exception.
<li> Set <var>stream</var>@\[[reader]] to <b>this</b>.
<li> Set <b>this</b>@\[[ownerReadableStream]] to <var>stream</var>.
<li> Set <b>this</b>@\[[state]] to <code>"readable"</code>.
<li> Set <b>this</b>@\[[storedError]] to <b>undefined</b>.
<li> Set <b>this</b>@\[[readRequests]] to a new empty List.
<li> Set <b>this</b>@\[[closedPromise]] to a new promise.
<li> If <var>stream</var>@\[[state]] is <code>"closed"</code>, resolve <b>this</b>@\[[closedPromise]] with
<b>undefined</b>.
<li> If <var>stream</var>@\[[state]] is <code>"errored"</code>, reject <b>this</b>@\[[closedPromise]] with
<var>stream</bar>@\[[storedError]].
<li> If <var>stream</var>@\[[state]] is <code>"closed"</code> or <code>"errored"</code>, call-with-rethrow
ReleaseReadableStreamReader(<b>this</b>).
</ol>

<h4 id="reader-prototype">Properties of the <code>ReadableStreamReader</code> Prototype</h4>
Expand Down Expand Up @@ -614,11 +624,12 @@ Instances of <code>ReadableStreamReader</code> are created with the internal slo

<ol>
<li> If IsReadableStreamReader(<b>this</b>) is <b>false</b>, throw a <b>TypeError</b> exception.
<li> If <b>this</b>@\[[ownerReadableStream]] is <b>undefined</b> or
<b>this</b>@\[[ownerReadableStream]]@\[[state]] is <code>"closed"</code>, return a new promise resolved with
<li> If <b>this</b>@\[[state]] is <code>"closed"</code>, return a new promise resolved with
CreateIterResultObject(<b>undefined</b>, <b>true</b>).
<li> If <b>this</b>@\[[ownerReadableStream]]@\[[state]] is <code>"errored"</code>, return a new promise
rejected with <b>this</b>@\[[ownerReadableStream]]@\[[storedError]].
<li> If <b>this</b>@\[[state]] is <code>"errored"</code>, return a new promise rejected with
<b>this</b>@\[[storedError]].
<li> Assert: <b>this</b>@\[[ownerReadableStream]] is not <b>undefined</b>.
<li> Assert: <b>this</b>@\[[ownerReadableStream]]@\[[state]] is <code>"readable"</code>.
<li> If <b>this</b>@\[[ownerReadableStream]]@\[[queue]] is not empty,
<ol>
<li> Let <var>chunk</var> be DequeueValue(<b>this</b>@\[[ownerReadableStream]]@\[[queue]]).
Expand Down Expand Up @@ -820,16 +831,8 @@ a variable <var>stream</var>, that performs the following steps:
<li> Let <var>stream</var>@\[[queue]] be a new empty List.
<li> Set <var>stream</var>@\[[storedError]] to <var>e</var>.
<li> Set <var>stream</var>@\[[state]] to <code>"errored"</code>.
<li> If IsReadableStreamLocked(<var>stream</var>) is <b>true</b>,
<ol>
<li> Reject <var>stream</var>@\[[reader]]@\[[closedPromise]] with <var>e</var>.
<li> Repeat for each <var>readRequestPromise</var> that is an element of
<var>stream</var>@\[[reader]]@\[[readRequests]],
<ol>
<li> Reject <var>readRequestPromise</var> with <var>e</var>.
</ol>
<li> Set <var>stream</var>@\[[reader]]@\[[readRequests]] to a new empty List.
</ol>
<li> If IsReadableStreamLocked(<var>stream</var>) is <b>true</b>, return
ReleaseReadableStreamReader(<var>stream</var>@\[[reader]]).
</ol>

<h4 id="is-readable-stream">IsReadableStream ( x )</h4>
Expand Down Expand Up @@ -865,14 +868,29 @@ a variable <var>stream</var>, that performs the following steps:

<ol>
<li> Assert: <var>reader</var>@\[[ownerReadableStream]] is not <b>undefined</b>.
<li> Repeat for each <var>readRequestPromise</var> that is an element of <var>reader</var>@\[[readRequests]],
<li> If <var>reader</var>@\[[ownerReadableStream]]@\[[state]] is <code>"errored"</code>,
<ol>
<li> Resolve <var>readRequestPromise</var> with CreateIterResultObject(<b>undefined</b>, <b>true</b>).
<li> Set <var>reader</var>@\[[state]] to <code>"errored"</code>.
<li> Let <var>e</var> be <var>reader</var>@\[[ownerReadableStream]]@\[[storedError]].
<li> Set <var>reader</var>@\[[storedError]] to <var>e</var>.
<li> Reject <var>reader</var>@\[[closedPromise]] with <var>e</var>.
<li> Repeat for each <var>readRequestPromise</var> that is an element of <var>reader</var>@\[[readRequests]],
<ol>
<li> Reject <var>readRequestPromise</var> with <var>e</var>.
</ol>
</ol>
<li> Otherwise,
<ol>
<li> Set <var>reader</var>@\[[state]] to <code>"closed"</code>.
<li> Resolve <var>reader</var>@\[[closedPromise]] with <b>undefined</b>.
<li> Repeat for each <var>readRequestPromise</var> that is an element of <var>reader</var>@\[[readRequests]],
<ol>
<li> Resolve <var>readRequestPromise</var> with CreateIterResultObject(<b>undefined</b>, <b>true</b>).
</ol>
</ol>
<li> Set <var>reader</var>@\[[readRequests]] to a new empty List.
<li> Set <var>reader</var>@\[[ownerReadableStream]]@\[[reader]] to <b>undefined</b>.
<li> Set <var>reader</var>@\[[ownerReadableStream]] to <b>undefined</b>.
<li> Resolve <var>reader</var>@\[[closedPromise]] with <b>undefined</b>.
</ol>

<h4 id="should-readable-stream-apply-backpressure">ShouldReadableStreamApplyBackpressure ( stream )</h4>
Expand Down
52 changes: 32 additions & 20 deletions reference-implementation/lib/readable-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,8 @@ class ReadableStreamReader {

stream._reader = this;
this._ownerReadableStream = stream;
this._state = 'readable';
this._storedError = undefined;

this._readRequests = [];

Expand All @@ -162,12 +164,8 @@ class ReadableStreamReader {
this._closedPromise_reject = reject;
});

if (stream._state === 'closed') {
this._closedPromise_resolve(undefined);
}

if (stream._state === 'errored') {
this._closedPromise_reject(stream._storedError);
if (stream._state === 'closed' || stream._state === 'errored') {
ReleaseReadableStreamReader(this);
}
}

Expand Down Expand Up @@ -199,14 +197,17 @@ class ReadableStreamReader {
new TypeError('ReadableStreamReader.prototype.read can only be used on a ReadableStreamReader'));
}

if (this._ownerReadableStream === undefined || this._ownerReadableStream._state === 'closed') {
if (this._state === 'closed') {
return Promise.resolve(CreateIterResultObject(undefined, true));
}

if (this._ownerReadableStream._state === 'errored') {
return Promise.reject(this._ownerReadableStream._storedError);
if (this._state === 'errored') {
return Promise.reject(this._storedError);
}

assert(this._ownerReadableStream !== undefined);
assert(this._ownerReadableStream._state === 'readable');

if (this._ownerReadableStream._queue.length > 0) {
const chunk = DequeueValue(this._ownerReadableStream._queue);

Expand Down Expand Up @@ -242,7 +243,7 @@ class ReadableStreamReader {
throw new TypeError('Tried to release a reader lock when that reader has pending read() calls un-settled');
}

ReleaseReadableStreamReader(this);
return ReleaseReadableStreamReader(this);
}
}

Expand Down Expand Up @@ -387,12 +388,7 @@ function CreateReadableStreamErrorFunction(stream) {
stream._state = 'errored';

if (IsReadableStreamLocked(stream) === true) {
stream._reader._closedPromise_reject(e);

for (const { _reject } of stream._reader._readRequests) {
_reject(e);
}
stream._reader._readRequests = [];
return ReleaseReadableStreamReader(stream._reader);
}
};
}
Expand Down Expand Up @@ -432,14 +428,30 @@ function IsReadableStreamReader(x) {
}

function ReleaseReadableStreamReader(reader) {
for (const { _resolve } of reader._readRequests) {
_resolve(CreateIterResultObject(undefined, true));
assert(reader._ownerReadableStream !== undefined);

if (reader._ownerReadableStream._state === 'errored') {
reader._state = 'errored';

const e = reader._ownerReadableStream._storedError;
reader._storedError = e;
reader._closedPromise_reject(e);

for (const { _reject } of reader._readRequests) {
_reject(e);
}
} else {
reader._state = 'closed';
reader._closedPromise_resolve(undefined);

for (const { _resolve } of reader._readRequests) {
_resolve(CreateIterResultObject(undefined, true));
}
}
reader._readRequests = [];

reader._readRequests = [];
reader._ownerReadableStream._reader = undefined;
reader._ownerReadableStream = undefined;
reader._closedPromise_resolve(undefined);
}

function ShouldReadableStreamApplyBackpressure(stream) {
Expand Down
34 changes: 34 additions & 0 deletions reference-implementation/test/readable-stream-reader.js
Original file line number Diff line number Diff line change
Expand Up @@ -198,3 +198,37 @@ test('cancel() on a released reader is a no-op and does not pass through', t =>

setTimeout(() => t.end(), 50);
});

test('Getting a second reader after erroring the stream should succeed', t => {
t.plan(5);

let doError;
const theError = new Error('bad');
const rs = new ReadableStream({
start(enqueue, close, error) {
doError = error;
}
});

const reader1 = rs.getReader();

reader1.closed.catch(e => {
t.equal(e, theError, 'the first reader closed getter should be rejected with the error');
});

reader1.read().catch(e => {
t.equal(e, theError, 'the first reader read() should be rejected with the error');
});

t.throws(() => rs.getReader(), /TypeError/, 'trying to get another reader before erroring should throw');

doError(theError);

rs.getReader().closed.catch(e => {
t.equal(e, theError, 'the second reader closed getter should be rejected with the error');
});

rs.getReader().read().catch(e => {
t.equal(e, theError, 'the third reader read() should be rejected with the error');
});
});
15 changes: 13 additions & 2 deletions reference-implementation/test/templated/readable-stream-closed.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,21 @@ export default (label, factory) => {
startPromise.then(() => {
t.equal(ws.state, 'writable', 'writable stream should start in writable state');

rs.pipeTo(ws).then(() => {
return rs.pipeTo(ws).then(() => {
t.pass('pipeTo promise should be fulfilled');
t.equal(ws.state, 'closed', 'writable stream should become closed');
});
});
})
.catch(e => t.error(e));
});

test('should be able to acquire multiple readers, since they are all auto-released', t => {
const rs = factory();

rs.getReader();

t.doesNotThrow(() => rs.getReader(), 'getting a second reader should not throw');
t.doesNotThrow(() => rs.getReader(), 'getting a third reader should not throw');
t.end();
});
};
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,14 @@ export default (label, factory, error) => {
cancelPromise2.catch(e => t.equal(e, error, 'second cancel() call should reject with the error'));
t.notEqual(cancelPromise1, cancelPromise2, 'cancel() calls should return distinct promises');
});

test('should be able to acquire multiple readers, since they are all auto-released', t => {
const rs = factory();

rs.getReader();

t.doesNotThrow(() => rs.getReader(), 'getting a second reader should not throw');
t.doesNotThrow(() => rs.getReader(), 'getting a third reader should not throw');
t.end();
});
};

0 comments on commit 0aabdda

Please sign in to comment.