Skip to content

Commit

Permalink
Add back auto-releasing of readers
Browse files Browse the repository at this point in the history
  • Loading branch information
domenic committed Mar 16, 2015
1 parent 32f0447 commit 076ba75
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 37 deletions.
31 changes: 24 additions & 7 deletions index.bs
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,9 @@ A given readable stream only has at most one reader at a time. We say in this ca
<dfn lt="locked to a reader">locked to the reader</dfn>, and that the reader is <dfn lt="active reader">active</dfn>.

A reader also has the capability to <dfn lt="release a read lock">release its read lock</dfn>, which makes it no
longer active. At this point another reader can be acquired at will.
longer active. At this point another reader can be acquired at will. If the stream becomes closed or errored as a
result of the behavior of its <a>underlying source</a> or via <a lt="cancel a readable stream">cancellation</a>, its
reader (if one exists) will automatically release its lock.

<h2 id="rs">Readable Streams</h2>

Expand Down Expand Up @@ -414,6 +416,8 @@ Instances of <code>ReadableStream</code> are created with the internal slots des
This functionality is especially useful for creating abstractions that desire the ability to consume a stream in its
entirety. By getting a reader for the stream, you can ensure nobody else can interleave reads with yours or cancel
the stream, which would interfere with your abstraction.

Note that if a stream becomes closed or errored, any reader it is locked to is automatically released.
</div>

<ol>
Expand Down Expand Up @@ -446,7 +450,8 @@ Instances of <code>ReadableStream</code> are created with the internal slots des
</code></pre>

Note how the first thing it does is obtain a reader, and from then on it uses the reader exclusively. This ensures
that no other consumer can interfere with the stream, either by reading chunks (causing )
that no other consumer can interfere with the stream, either by reading chunks or by
<a lt="cancel a readable stream">canceling</a> the stream.
</div>

<h5 id="rs-pipe-through">pipeThrough({ writable, readable }, options)</h5>
Expand Down Expand Up @@ -607,7 +612,7 @@ Instances of <code>ReadableStreamReader</code> are created with the internal slo

<div class="note">
If the reader is <a lt="active reader">active</a>, the <code>cancel</code> method behaves the same as that for the
associated stream.
associated stream. When done, it automatically <a lt="release a read lock">releases the lock</a>.
</div>

<ol>
Expand Down Expand Up @@ -677,9 +682,7 @@ Instances of <code>ReadableStreamReader</code> are created with the internal slo
<li> If IsReadableStreamReader(<b>this</b>) is <b>false</b>, throw a <b>TypeError</b> exception.
<li> If <b>this</b>@\[[ownerReadableStream]] is <b>undefined</b>, return <b>undefined</b>.
<li> If <b>this</b>@\[[readRequests]] is not empty, throw a <b>TypeError</b> exception.
<li> Call-with-rethrow CloseReadableStreamReader(<b>this</b>).
<li> Set <b>this</b>@\[[ownerReadableStream]]@\[[reader]] to <b>undefined</b>.
<li> Set <b>this</b>@\[[ownerReadableStream]] to <b>undefined</b>.
<li> Return ReleaseReadableStreamReader(<b>this</b>).
</ol>

<h3 id="rs-abstract-ops">Readable Stream Abstract Operations</h3>
Expand Down Expand Up @@ -744,7 +747,7 @@ Instances of <code>ReadableStreamReader</code> are created with the internal slo
<li> Resolve <var>stream</var>@\[[closedPromise]] with <b>undefined</b>.
<li> Set <var>stream</var>@\[[state]] to <code>"closed"</code>.
<li> If IsReadableStreamLocked(<var>stream</var>) is <b>true</b>, return
CloseReadableStreamReader(<var>stream</var>).
ReleaseReadableStreamReader(<var>stream</var>).
<li> Return <b>undefined</b>.
</ol>

Expand Down Expand Up @@ -889,6 +892,20 @@ a variable <var>stream</var>, that performs the following steps:
<li> Return <b>true</b>.
</ol>

<h4 id="release-readable-stream-reader">ReleaseReadableStreamReader ( reader )</h4>

<ol>
<li> Assert: <var>reader</var>@\[[ownerReadableStream]] is not <b>undefined</b>.
<li> Repeat for each <var>readRequestPromise</var> that is an element of <var>reader</var>@\[[readRequests]],
<ol>
<li> Resolve <var>readRequestPromise</var> with CreateIterResultObject(<b>undefined</b>, <b>true</b>).
</ol>
<li> Set <var>reader</var>@\[[readRequests]] to a new empty List.
<li> Set <var>reader</var>@\[[ownerReadableStream]]@\[[reader]] to <b>undefined</b>.
<li> Set <var>reader</var>@\[[ownerReadableStream]] to <b>undefined</b>.
<li> Resolve <var>reader</var>@\[[closedPromise]] with <b>undefined</b>.
</ol>

<h4 id="should-readable-stream-apply-backpressure">ShouldReadableStreamApplyBackpressure ( stream )</h4>

<ol>
Expand Down
29 changes: 14 additions & 15 deletions reference-implementation/lib/readable-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,8 @@ export default class ReadableStream {
}

if (preventCancel === false) {
// cancelling automatically releases the lock (and that doesn't fail, since source is then closed)
reader.cancel(reason);
reader.releaseLock();
rejectFinishedPromise(reason);
} else {
// If we don't cancel, we need to wait for lastRead to finish before we're allowed to release.
Expand Down Expand Up @@ -312,10 +312,7 @@ class ReadableStreamReader {
throw new TypeError('Tried to release a reader lock when that reader has pending read() calls un-settled');
}

CloseReadableStreamReader(this);

this._ownerReadableStream._reader = undefined;
this._ownerReadableStream = undefined;
ReleaseReadableStreamReader(this);
}
}

Expand Down Expand Up @@ -375,21 +372,12 @@ function CloseReadableStream(stream) {
stream._state = 'closed';

if (IsReadableStreamLocked(stream) === true) {
return CloseReadableStreamReader(stream._reader);
return ReleaseReadableStreamReader(stream._reader);
}

return undefined;
}

function CloseReadableStreamReader(reader) {
for (const { _resolve } of reader._readRequests) {
_resolve(CreateIterResultObject(undefined, true));
}
reader._readRequests = [];

reader._closedPromise_resolve(undefined);
}

function CreateReadableStreamCloseFunction(stream) {
return () => {
if (stream._state !== 'readable') {
Expand Down Expand Up @@ -515,6 +503,17 @@ function IsReadableStreamReader(x) {
return true;
}

function ReleaseReadableStreamReader(reader) {
for (const { _resolve } of reader._readRequests) {
_resolve(CreateIterResultObject(undefined, true));
}
reader._readRequests = [];

reader._ownerReadableStream._reader = undefined;
reader._ownerReadableStream = undefined;
reader._closedPromise_resolve(undefined);
}

function ShouldReadableStreamApplyBackpressure(stream) {
const queueSize = GetTotalQueueSize(stream._queue);
let shouldApplyBackpressure = queueSize > 1;
Expand Down
23 changes: 10 additions & 13 deletions reference-implementation/test/readable-stream-reader.js
Original file line number Diff line number Diff line change
Expand Up @@ -79,23 +79,20 @@ test('Reading from a reader for an empty stream will wait until a chunk is avail
enqueue('a');
});

test('cancel() on a reader should leave the reader active', t => {
t.plan(4);
test('cancel() on a reader releases the reader before calling through', t => {
t.plan(3);

const passedReason = new Error('it wasn\'t the right time, sorry');
const rs = new ReadableStream({
cancel(reason) {
t.equal(reader.isActive, true, 'reader should still be active when underlying source cancel is called');
t.equal(reason, passedReason, 'the cancellation reason should be passed through to the underlying source');
t.equal(reader.isActive, false, 'reader should be released by the time underlying source cancel is called');
t.equal(reason, passedReason, 'the cancellation reason is passed through to the underlying source');
}
});

const reader = rs.getReader();
reader.cancel(passedReason).then(
() => {
t.pass('reader.cancel() should fulfill');
t.equal(reader.isActive, true, 'the reader should still be active after cancel() fulfills');
},
() => t.pass('reader.cancel() should fulfill'),
e => t.fail('reader.cancel() should not reject')
);
});
Expand All @@ -111,14 +108,14 @@ test('closed should be fulfilled after stream is closed (stream .closed access b
});

rs.closed.then(() => {
t.equal(reader.isActive, true, 'reader is still active when stream closed is fulfilled');
t.equal(reader.isActive, false, 'reader is no longer active when stream closed is fulfilled');
});

const reader = rs.getReader();
doClose();

reader.closed.then(() => {
t.equal(reader.isActive, true, 'reader is still active when reader closed is fulfilled');
t.equal(reader.isActive, false, 'reader is no longer active when reader closed is fulfilled');
});
});

Expand All @@ -136,7 +133,7 @@ test('closed should be fulfilled after reader releases its lock (multiple stream

rs.closed.then(() => {
t.equal(reader1.isActive, false, 'reader1 is no longer active when stream closed is fulfilled');
t.equal(reader2.isActive, true, 'reader2 is still active when stream closed is fulfilled');
t.equal(reader2.isActive, false, 'reader2 is no longer active when stream closed is fulfilled');
});

reader1.releaseLock();
Expand All @@ -146,12 +143,12 @@ test('closed should be fulfilled after reader releases its lock (multiple stream

reader1.closed.then(() => {
t.equal(reader1.isActive, false, 'reader1 is no longer active when reader1 closed is fulfilled');
t.equal(reader2.isActive, true, 'reader2 is active when reader1 closed is fulfilled');
t.equal(reader2.isActive, false, 'reader2 is no longer active when reader1 closed is fulfilled');
});

reader2.closed.then(() => {
t.equal(reader1.isActive, false, 'reader1 is no longer active when reader2 closed is fulfilled');
t.equal(reader2.isActive, true, 'reader2 is still active when reader2 closed is fulfilled');
t.equal(reader2.isActive, false, 'reader2 is no longer active when reader2 closed is fulfilled');
});
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,13 +110,13 @@ export default (label, factory) => {
stream.closed.then(() => t.fail('stream.closed got after release should not fulfill'));
});

test('canceling via the reader should leave the reader active', t => {
test('canceling via the reader should cause the reader to become inactive', t => {
t.plan(3);
const { reader } = factory();

t.equal(reader.isActive, true, 'the reader should be active before releasing it');
reader.cancel();
t.equal(reader.isActive, true, 'the reader should still be active');
t.equal(reader.isActive, false, 'the reader should no longer be active');
reader.read().then(r => t.deepEqual(r, { value: undefined, done: true },
'read()ing from the reader should give a done result'))
});
Expand Down

0 comments on commit 076ba75

Please sign in to comment.