Skip to content

Commit

Permalink
First draft reference implementation of ExclusiveStreamReader
Browse files Browse the repository at this point in the history
Currently fails the ReadableByteStream test.
  • Loading branch information
domenic committed Dec 10, 2014
1 parent f40e1ef commit e185eea
Show file tree
Hide file tree
Showing 4 changed files with 257 additions and 14 deletions.
99 changes: 99 additions & 0 deletions Locking Design Doc.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
# Locking a Stream for Exclusive Reading

In [#241](https://github.com/whatwg/streams/issues/241) we had a great conversation about the need for being able to "lock" a stream for exclusive use. This would be done implicitly while piping, but could also be useful for building user-facing abstractions, as we'll see below.

What emerged was the idea of a "stream reader," which has most of the readable stream interface, but while it exists you cannot read from the stream except through that reader.

This document represents some formative rationales for the design of the reader concept, approached from the perspective of a developer that uses increasingly complex features of the streams ecosystem.

## Developer usage

### Level 0: no reader usage

If the developer knows nothing about readers, they can continue using the stream just fine.

- `read()`, `state`, and `ready` all behave as they do now if used without `pipeTo`.
- `pipeTo` will cause the following side effects:
- `read()` will throw an informative error
- `state` will return `"ready"` until the pipe completes (successfully or otherwise)
- `ready` will return a promise that remains pending until the pipe completes

### Level 1: using readers directly

The developer might want to create their own abstractions that require exclusive access to the stream. For example, a read-to-end function would probably want to avoid others being able to call `.read()` in the middle.

Example code:

```js
function readAsJson(rs) {
var string = "";
var reader = new ExclusiveStreamReader(rs);

pump();

// These lines would be simpler with `Promise.prototype.finally` (or async functions).
return reader.closed.then(
() => {
reader.releaseLock();
return JSON.parse(string);
},
e => {
reader.releaseLock();
throw e;
}
);

function pump() {
while (reader.state === "readable") {
string += reader.read();
}
if (reader.state === "waiting") {
reader.ready.then(pump);
}
}
}
```

The stream would have the same behaviors after being passed to `readAsJson` that it would have after calling its `pipeTo` method.

The reader should have all of the non-piping-related public interface of the stream. This includes:

- `closed` getter, which is a pass-through
- `state` and `ready` getters, which reveal the "true" state and state transitions of the stream which the stream itself no longer reveals
- `read()` method, which has the same behavior as that of the stream's except that it works while the stream is locked
- `cancel()` method, which first calls `this.releaseLock()` before the pass-through

Additionally, it should be possible to check whether a stream is locked. A few candidate syntaxes:

```js
stream.isLocked
ExclusiveStreamReader.isLocked(stream)
```

Finally it is probably a good idea to be able to tell if a reader is still active/has been released. One of these two maybe:

```js
reader.isReleased
reader.isActive // inverse
```

(For now we have settled on `ExclusiveStreamReader.isLocked(stream)` and `reader.isActive`.)

### Level 2: subclassers of `ReadableStream`

Subclasses of `ReadableStream` should get locking support "for free." The same mechanisms for acquiring and using a lock should work flawlessly. More interestingly, if they wanted to support modifying the behavior of e.g. `read()` (or `state` or `ready` or `closed`), they should only have to override it in one location.

Which location is more friendly? Probably in `ReadableStream`, so that `ExclusiveStreamReader` still works for `ReadableStream` subclasses. Less work.

This means `ExclusiveStreamReader` should delegate to `ReadableStream`, and not the other way around.

### Level 3: custom readable stream implementations?

It is unclear whether this is necessary, but up until now we have a high level of support for anyone who wants to re-implement the entire `ReadableStream` interface with their own specific code. For example, if you implement `state`, `ready`, `closed`, `read()`, and `cancel()`, you can do `myCustomStream.pipeTo = ReadableStream.prototype.pipeTo` and it will continue to work.

If we encourage this kind of thing, we should make it easy for custom readable streams to be lockable as well. That basically means `ExclusiveStreamReader` should not require knowledge of `ReadableStream`'s internal slots.

We can work around this if necessary by passing `ExclusiveStreamReader` any capabilities it needs to manipulate `ReadableStream`'s internal state; then people reimplementing the readable stream interface can do e.g. `new ExclusiveStreamReader(this, { getLock, setLock })` or similar.

This would probably impact API, by switching us to a `rs.getReader()` interface that calls the constructor, instead of a `new ExclusiveStreamReader(stream)` interface.

105 changes: 105 additions & 0 deletions reference-implementation/lib/exclusive-stream-reader.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
var assert = require('assert');

export default class ExclusiveStreamReader {
constructor(stream) {
ensureIsRealStream(stream);

if (stream._reader !== undefined) {
throw new TypeError('This stream has already been locked for exclusive reading by another reader');
}

stream._reader = this;

this._stream = stream;

this._lockReleased = new Promise(resolve => {
this._lockReleased_resolve = resolve;
});
}

get ready() {
ensureStreamReaderIsExclusive(this);

this._stream._reader = undefined;
try {
return this._stream.ready;
} finally {
this._stream._reader = this;
}
}

get state() {
ensureStreamReaderIsExclusive(this);

this._stream._reader = undefined;
try {
return this._stream.state;
} finally {
this._stream._reader = this;
}
}

get closed() {
ensureStreamReaderIsExclusive(this);

return this._stream.closed;
}

get isActive() {
return this._stream !== undefined;
}

read(...args) {
ensureStreamReaderIsExclusive(this);

this._stream._reader = undefined;
try {
return this._stream.read(...args);
} finally {
this._stream._reader = this;
}
}

cancel(reason, ...args) {
ensureStreamReaderIsExclusive(this);

var stream = this._stream;
this.releaseLock();
return stream.cancel(reason, ...args);
}

releaseLock() {
if (this._stream === undefined) {
return;
}

this._stream._reader = undefined;
this._stream = undefined;
this._lockReleased_resolve(undefined);
}

static isLocked(stream) {
ensureIsRealStream(stream);

return stream._reader !== undefined;
}
}

// These do not appear in the spec (thus the lower-case names), since they're one-liners in spec text anyway, but we
// factor them out into helper functions in the reference implementation just for brevity's sake, and to emphasize that
// the error message is the same in all places they're called, and to give us the opportunity to add an assert.

function ensureStreamReaderIsExclusive(reader) {
if (reader._stream === undefined) {
throw new TypeError('This stream reader has released its lock on the stream and can no longer be used');
}

assert(reader._stream._reader === reader,
'If the reader has a [[stream]] then the stream\'s [[reader]] must be this reader');
}

function ensureIsRealStream(stream) {
if (!('_reader' in stream)) {
throw new TypeError('ExclusiveStreamReader can only be used with ReadableStream objects or subclasses');
}
}
21 changes: 20 additions & 1 deletion reference-implementation/lib/readable-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ var assert = require('assert');
import * as helpers from './helpers';
import { DequeueValue, EnqueueValueWithSize, GetTotalQueueSize } from './queue-with-sizes';
import CountQueuingStrategy from './count-queuing-strategy';
import ExclusiveStreamReader from './exclusive-stream-reader';

export default class ReadableStream {
constructor({
Expand Down Expand Up @@ -30,6 +31,7 @@ export default class ReadableStream {
this._started = false;
this._draining = false;
this._pulling = false;
this._reader = undefined;

this._enqueue = CreateReadableStreamEnqueueFunction(this);
this._close = CreateReadableStreamCloseFunction(this);
Expand All @@ -50,6 +52,10 @@ export default class ReadableStream {
}

get state() {
if (this._reader !== undefined) {
return 'waiting';
}

return this._state;
}

Expand Down Expand Up @@ -86,7 +92,7 @@ export default class ReadableStream {
}

pipeTo(dest, { preventClose, preventAbort, preventCancel } = {}) {
var source = this;
var source = new ExclusiveStreamReader(this);
preventClose = Boolean(preventClose);
preventAbort = Boolean(preventAbort);
preventCancel = Boolean(preventCancel);
Expand Down Expand Up @@ -137,12 +143,16 @@ export default class ReadableStream {

function cancelSource(reason) {
if (preventCancel === false) {
// implicitly releases the lock
source.cancel(reason);
} else {
source.releaseLock();
}
rejectPipeToPromise(reason);
}

function closeDest() {
source.releaseLock();
if (preventClose === false) {
dest.close().then(resolvePipeToPromise, rejectPipeToPromise);
} else {
Expand All @@ -151,6 +161,7 @@ export default class ReadableStream {
}

function abortDest(reason) {
source.releaseLock();
if (preventAbort === false) {
dest.abort(reason);
}
Expand All @@ -159,6 +170,10 @@ export default class ReadableStream {
}

read() {
if (this._reader !== undefined) {
throw new TypeError('This stream is locked to a single exclusive reader and cannot be read from directly');
}

if (this._state === 'waiting') {
throw new TypeError('no chunks available (yet)');
}
Expand Down Expand Up @@ -190,6 +205,10 @@ export default class ReadableStream {
}

get ready() {
if (this._reader !== undefined) {
return this._reader._lockReleased;
}

return this._readyPromise;
}

Expand Down
Loading

0 comments on commit e185eea

Please sign in to comment.