Skip to content

Commit

Permalink
Tweak semantics of readers, locks, and cancellation
Browse files Browse the repository at this point in the history
See discussion in #297. This commit implements the following changes:

- Allow acquiring readers for closed or errored streams; they simply act closed or errored.
- Stop auto-releasing readers when streams close/error.
- Disallow canceling a stream that is locked to a reader (you should use the reader cancel).
- Piping from a closed or errored stream will close or abort the destination stream, instead of immediately failing the pipe.
  • Loading branch information
domenic committed Mar 16, 2015
1 parent 013f09e commit ec787d9
Show file tree
Hide file tree
Showing 11 changed files with 141 additions and 107 deletions.
61 changes: 32 additions & 29 deletions index.bs
Original file line number Diff line number Diff line change
Expand Up @@ -173,14 +173,14 @@ A queuing strategy is generally associated with a specific type of <a>underlying
A <dfn>readable stream reader</dfn> or simply reader is an object that allows direct reading of <a>chunks</a> from
a <a>readable stream</a>. Without a reader, a <a>consumer</a> can only perform high-level operations on the readable
stream: waiting for the stream to become closed or errored, <a lt="cancel a readable stream">canceling</a> the stream,
or <a>piping</a> the readable stream to a writable stream.
or <a>piping</a> the readable stream to a writable stream. Many of those high-level operations actually use a reader
themselves.

A given readable stream only has at most one reader at a time. We say in this case the stream is
<dfn lt="locked to a reader">locked to the reader</dfn>, and that the reader is <dfn lt="active reader">active</dfn>.

A reader also has the capability to <dfn lt="release a read lock">release its read lock</dfn>, which makes it no
longer active. At this point another reader can be acquired at will. If the stream becomes closed or errored as a
result of the behavior of its <a>underlying source</a>, its reader (if one exists) will automatically release its lock.
longer active. At this point another reader can be acquired at will.

<h2 id="rs">Readable Streams</h2>

Expand Down Expand Up @@ -399,6 +399,7 @@ Instances of <code>ReadableStream</code> are created with the internal slots des

<ol>
<li> If IsReadableStream(<b>this</b>) is <b>false</b>, return a promise rejected with a <b>TypeError</b> exception.
<li> If IsReadableStreamLocked(<b>this</b>) is <b>true</b>, return a promise rejected with a <b>TypeError</b> exception.
<li> Return CancelReadableStream(<b>this</b>, <var>reason</var>).
</ol>

Expand All @@ -407,13 +408,12 @@ Instances of <code>ReadableStream</code> are created with the internal slots des
<div class="note">
The <code>getReader</code> method creates an <a>readable stream reader</a> and
<a lt="locked to a reader">locks</a> the stream to the the new reader. While the stream is locked, no other reader
can be acquired until this one is <a lt="release a read lock">released</a>.
can be acquired until this one is <a lt="release a read lock">released</a>. The returned reader provides the ability
to directly read individual <a>chunks</a> from the stream via the reader's <code>read</code> method.

The returned reader provides the ability to directly read individual <a>chunks</a> from the stream via the reader's
<code>read</code> method. This design ensures that if you control the reader, nobody else can interleave reads with
yours, interfering with your code or observing its side-effects.

Note that when a stream is closed or errors, any reader it is locked to is automatically released.
This functionality is especially useful for creating abstractions that desire the ability to consume a stream in its
entirety. By getting a reader for the stream, you can ensure nobody else can interleave reads with yours or cancel
the stream, which would interfere with your abstraction.
</div>

<ol>
Expand Down Expand Up @@ -445,7 +445,8 @@ Instances of <code>ReadableStream</code> are created with the internal slots des
}
</code></pre>

Note how the first thing it does is obtain a reader, and from then on it uses the reader exclusively.
Note how the first thing it does is obtain a reader, and from then on it uses the reader exclusively. This ensures
that no other consumer can interfere with the stream, either by reading chunks (causing )
</div>

<h5 id="rs-pipe-through">pipeThrough({ writable, readable }, options)</h5>
Expand Down Expand Up @@ -563,13 +564,15 @@ Instances of <code>ReadableStreamReader</code> are created with the internal slo

<ol>
<li> If IsReadableStream(<var>stream</var>) is <b>false</b>, throw a <b>TypeError</b> exception.
<li> If <var>stream</var>@\[[state]] is <code>"closed"</code>, throw a <b>TypeError</b> exception.
<li> If <var>stream</var>@\[[state]] is <code>"errored"</code>, throw <var>stream</bar>@\[[storedError]].
<li> If IsReadableStreamLocked(<var>stream</var>) is <b>true</b>, throw a <b>TypeError</b> exception.
<li> Set <var>stream</var>@\[[reader]] to <b>this</b>.
<li> Set <b>this</b>@\[[ownerReadableStream]] to <var>stream</var>.
<li> Set <b>this</b>@\[[readRequests]] to a new empty List.
<li> Set <b>this</b>@\[[closedPromise]] to a new promise.
<li> If <var>stream</var>@\[[state]] is <code>"closed"</code>, resolve <b>this</b>@\[[closedPromise]] with
<b>undefined</b>.
<li> If <var>stream</var>@\[[state]] is <code>"errored"</code>, reject <b>this</b>@\[[closedPromise]] with
<var>stream</bar>@\[[storedError]].
</ol>

<h4 id="reader-prototype">Properties of the <code>ReadableStreamReader</code> Prototype</h4>
Expand Down Expand Up @@ -604,7 +607,7 @@ Instances of <code>ReadableStreamReader</code> are created with the internal slo

<div class="note">
If the reader is <a lt="active reader">active</a>, the <code>cancel</code> method behaves the same as that for the
associated stream. When done, it automatically <a lt="release a read lock">releases the lock</a>.
associated stream.
</div>

<ol>
Expand Down Expand Up @@ -674,7 +677,9 @@ Instances of <code>ReadableStreamReader</code> are created with the internal slo
<li> If IsReadableStreamReader(<b>this</b>) is <b>false</b>, throw a <b>TypeError</b> exception.
<li> If <b>this</b>@\[[ownerReadableStream]] is <b>undefined</b>, return <b>undefined</b>.
<li> If <b>this</b>@\[[readRequests]] is not empty, throw a <b>TypeError</b> exception.
<li> Return ReleaseReadableStreamReader(<b>this</b>).
<li> Call-with-rethrow CloseReadableStreamReader(<b>this</b>).
<li> Set <b>this</b>@\[[ownerReadableStream]]@\[[reader]] to <b>undefined</b>.
<li> Set <b>this</b>@\[[ownerReadableStream]] to <b>undefined</b>.
</ol>

<h3 id="rs-abstract-ops">Readable Stream Abstract Operations</h3>
Expand Down Expand Up @@ -739,7 +744,19 @@ Instances of <code>ReadableStreamReader</code> are created with the internal slo
<li> Resolve <var>stream</var>@\[[closedPromise]] with <b>undefined</b>.
<li> Set <var>stream</var>@\[[state]] to <code>"closed"</code>.
<li> If IsReadableStreamLocked(<var>stream</var>) is <b>true</b>, return
ReleaseReadableStreamReader(<var>stream</var>).
CloseReadableStreamReader(<var>stream</var>).
<li> Return <b>undefined</b>.
</ol>

<h4 id="close-readable-stream-reader">CloseReadableStreamReader ( reader )</h4>

<ol>
<li> Repeat for each <var>readRequestPromise</var> that is an element of <var>reader</var>@\[[readRequests]],
<ol>
<li> Resolve <var>readRequestPromise</var> with CreateIterResultObject(<b>undefined</b>, <b>true</b>).
</ol>
<li> Set <var>reader</var>@\[[readRequests]] to a new empty List.
<li> Resolve <var>reader</var>@\[[closedPromise]] with <b>undefined</b>.
<li> Return <b>undefined</b>.
</ol>

Expand Down Expand Up @@ -872,20 +889,6 @@ a variable <var>stream</var>, that performs the following steps:
<li> Return <b>true</b>.
</ol>

<h4 id="release-readable-stream-reader">ReleaseReadableStreamReader ( reader )</h4>

<ol>
<li> Assert: <var>reader</var>@\[[ownerReadableStream]] is not <b>undefined</b>.
<li> Repeat for each <var>readRequestPromise</var> that is an element of <var>reader</var>@\[[readRequests]],
<ol>
<li> Resolve <var>readRequestPromise</var> with CreateIterResultObject(<b>undefined</b>, <b>true</b>).
</ol>
<li> Set <var>reader</var>@\[[readRequests]] to a new empty List.
<li> Set <var>reader</var>@\[[ownerReadableStream]]@\[[reader]] to <b>undefined</b>.
<li> Set <var>reader</var>@\[[ownerReadableStream]] to <b>undefined</b>.
<li> Resolve <var>reader</var>@\[[closedPromise]] with <b>undefined</b>.
</ol>

<h4 id="should-readable-stream-apply-backpressure">ShouldReadableStreamApplyBackpressure ( stream )</h4>

<ol>
Expand Down
51 changes: 28 additions & 23 deletions reference-implementation/lib/readable-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ export default class ReadableStream {
return Promise.reject(new TypeError('ReadableStream.prototype.cancel can only be used on a ReadableStream'));
}

if (IsReadableStreamLocked(this) === true) {
return Promise.reject(new TypeError('Cannot cancel a stream that already has a reader'));
}

return CancelReadableStream(this, reason);
}

Expand Down Expand Up @@ -107,8 +111,8 @@ export default class ReadableStream {

function cancelSource(reason) {
if (preventCancel === false) {
// cancelling automatically releases the lock (and that doesn't fail, since source is then closed)
source.cancel(reason);
reader.cancel(reason);
reader.releaseLock();
rejectPipeToPromise(reason);
} else {
// If we don't cancel, we need to wait for lastRead to finish before we're allowed to release.
Expand Down Expand Up @@ -177,12 +181,6 @@ class ReadableStreamReader {
if (IsReadableStream(stream) === false) {
throw new TypeError('ReadableStreamReader can only be constructed with a ReadableStream instance');
}
if (stream._state === 'closed') {
throw new TypeError('The stream has already been closed, so a reader cannot be acquired');
}
if (stream._state === 'errored') {
throw stream._storedError;
}
if (IsReadableStreamLocked(stream) === true) {
throw new TypeError('This stream has already been locked for exclusive reading by another reader');
}
Expand All @@ -196,6 +194,14 @@ class ReadableStreamReader {
this._closedPromise_resolve = resolve;
this._closedPromise_reject = reject;
});

if (stream._state === 'closed') {
this._closedPromise_resolve(undefined);
}

if (stream._state === 'errored') {
this._closedPromise_reject(stream._storedError);
}
}

get closed() {
Expand Down Expand Up @@ -277,7 +283,10 @@ class ReadableStreamReader {
throw new TypeError('Tried to release a reader lock when that reader has pending read() calls un-settled');
}

ReleaseReadableStreamReader(this);
CloseReadableStreamReader(this);

this._ownerReadableStream._reader = undefined;
this._ownerReadableStream = undefined;
}
}

Expand Down Expand Up @@ -337,12 +346,21 @@ function CloseReadableStream(stream) {
stream._state = 'closed';

if (IsReadableStreamLocked(stream) === true) {
return ReleaseReadableStreamReader(stream._reader);
return CloseReadableStreamReader(stream._reader);
}

return undefined;
}

function CloseReadableStreamReader(reader) {
for (const { _resolve } of reader._readRequests) {
_resolve(CreateIterResultObject(undefined, true));
}
reader._readRequests = [];

reader._closedPromise_resolve(undefined);
}

function CreateReadableStreamCloseFunction(stream) {
return () => {
if (stream._state !== 'readable') {
Expand Down Expand Up @@ -468,19 +486,6 @@ function IsReadableStreamReader(x) {
return true;
}

function ReleaseReadableStreamReader(reader) {
assert(reader._ownerReadableStream !== undefined);

for (const { _resolve } of reader._readRequests) {
_resolve(CreateIterResultObject(undefined, true));
}
reader._readRequests = [];

reader._ownerReadableStream._reader = undefined;
reader._ownerReadableStream = undefined;
reader._closedPromise_resolve(undefined);
}

function ShouldReadableStreamApplyBackpressure(stream) {
const queueSize = GetTotalQueueSize(stream._queue);
let shouldApplyBackpressure = queueSize > 1;
Expand Down
29 changes: 27 additions & 2 deletions reference-implementation/test/readable-stream-cancel.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ test('ReadableStream cancellation: integration test on an infinite stream derive
}, 50));
}
});
const reader = rs.getReader();

readableStreamToArray(rs).then(
readableStreamToArray(rs, reader).then(
chunks => {
t.equal(cancellationFinished, false, 'it did not wait for the cancellation process to finish before closing');
t.ok(chunks.length > 0, 'at least one chunk should be read');
Expand All @@ -42,7 +43,7 @@ test('ReadableStream cancellation: integration test on an infinite stream derive
);

setTimeout(() => {
rs.cancel().then(() => {
reader.cancel().then(() => {
t.equal(cancellationFinished, true, 'it returns a promise that is fulfilled when the cancellation finishes');
t.end();
})
Expand All @@ -66,6 +67,30 @@ test('ReadableStream cancellation: cancel(reason) should pass through the given
t.end();
});

test('ReadableStream cancellation: cancel() on a locked stream should fail and not call the underlying source cancel',
t => {
t.plan(3);
const rs = new ReadableStream({
start(enqueue, close) {
enqueue('a');
close();
},
cancel() {
t.fail('underlying source cancel() should not have been called');
}
});

const reader = rs.getReader();

rs.cancel().catch(e => t.equal(e.constructor, TypeError, 'cancel() should be rejected with a TypeError'));

reader.read().then(
result => t.deepEqual(result, { value: 'a', done: false }, 'read() should still work after the attempted cancel')
);

rs.closed.then(() => t.pass('closed should fulfill without underlying source cancel ever being called'));
});

test('ReadableStream cancellation: returning a value from the underlying source\'s cancel should not affect the ' +
'fulfillment value of the promise returned by the stream\'s cancel', t => {
t.plan(1);
Expand Down
31 changes: 17 additions & 14 deletions reference-implementation/test/readable-stream-reader.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,19 @@ test('Constructing an ReadableStreamReader directly should fail if the stream is
t.end();
});

test('Constructing an ReadableStreamReader directly should fail if the stream is already closed',
test('Constructing an ReadableStreamReader directly should be OK if the stream is closed',
t => {
const rs = new ReadableStream({
start(enqueue, close) {
close();
}
});

t.throws(() => new ReadableStreamReader(rs), /TypeError/, 'constructing directly should fail');
t.doesNotThrow(() => new ReadableStreamReader(rs), 'constructing directly should not throw');
t.end();
});

test('Constructing an ReadableStreamReader directly should fail if the stream is already errored',
test('Constructing an ReadableStreamReader directly should be OK if the stream is errored',
t => {
const theError = new Error('don\'t say i didn\'t warn ya');
const rs = new ReadableStream({
Expand All @@ -55,7 +55,7 @@ test('Constructing an ReadableStreamReader directly should fail if the stream is
}
});

t.throws(() => new ReadableStreamReader(rs), /don't say i didn't warn ya/, 'getReader() threw the error');
t.doesNotThrow(() => new ReadableStreamReader(rs), 'constructing directly should not throw');
t.end();
});

Expand All @@ -79,20 +79,23 @@ test('Reading from a reader for an empty stream will wait until a chunk is avail
enqueue('a');
});

test('cancel() on a reader releases the reader before calling through', t => {
t.plan(3);
test('cancel() on a reader should leave the reader active', t => {
t.plan(4);

const passedReason = new Error('it wasn\'t the right time, sorry');
const rs = new ReadableStream({
cancel(reason) {
t.equal(reader.isActive, false, 'reader should be released by the time underlying source cancel is called');
t.equal(reason, passedReason, 'the cancellation reason is passed through to the underlying source');
t.equal(reader.isActive, true, 'reader should still be active when underlying source cancel is called');
t.equal(reason, passedReason, 'the cancellation reason should be passed through to the underlying source');
}
});

const reader = rs.getReader();
reader.cancel(passedReason).then(
() => t.pass('reader.cancel() should fulfill'),
() => {
t.pass('reader.cancel() should fulfill');
t.equal(reader.isActive, true, 'the reader should still be active after cancel() fulfills');
},
e => t.fail('reader.cancel() should not reject')
);
});
Expand All @@ -108,14 +111,14 @@ test('closed should be fulfilled after stream is closed (stream .closed access b
});

rs.closed.then(() => {
t.equal(reader.isActive, false, 'reader is no longer active when stream closed is fulfilled');
t.equal(reader.isActive, true, 'reader is still active when stream closed is fulfilled');
});

const reader = rs.getReader();
doClose();

reader.closed.then(() => {
t.equal(reader.isActive, false, 'reader is no longer active when reader closed is fulfilled');
t.equal(reader.isActive, true, 'reader is still active when reader closed is fulfilled');
});
});

Expand All @@ -133,7 +136,7 @@ test('closed should be fulfilled after reader releases its lock (multiple stream

rs.closed.then(() => {
t.equal(reader1.isActive, false, 'reader1 is no longer active when stream closed is fulfilled');
t.equal(reader2.isActive, false, 'reader2 is no longer active when stream closed is fulfilled');
t.equal(reader2.isActive, true, 'reader2 is still active when stream closed is fulfilled');
});

reader1.releaseLock();
Expand All @@ -143,12 +146,12 @@ test('closed should be fulfilled after reader releases its lock (multiple stream

reader1.closed.then(() => {
t.equal(reader1.isActive, false, 'reader1 is no longer active when reader1 closed is fulfilled');
t.equal(reader2.isActive, false, 'reader2 is no longer active when reader1 closed is fulfilled');
t.equal(reader2.isActive, true, 'reader2 is active when reader1 closed is fulfilled');
});

reader2.closed.then(() => {
t.equal(reader1.isActive, false, 'reader1 is no longer active when reader2 closed is fulfilled');
t.equal(reader2.isActive, false, 'reader2 is no longer active when reader2 closed is fulfilled');
t.equal(reader2.isActive, true, 'reader2 is still active when reader2 closed is fulfilled');
});
});

Expand Down
Loading

0 comments on commit ec787d9

Please sign in to comment.