Change the model for ReadableStream to have async read() #296

Merged
merged 9 commits on Mar 17, 2015
Examples.md (23 additions, 40 deletions)
@@ -4,36 +4,6 @@ Many examples of using and creating streams are given in-line in the specification…

## Readable Streams

### Getting the Next Piece of Available Data

As another example, this helper function will return a promise for the next available piece of data from a given readable stream. This introduces an artificial delay if there is already data queued, but can provide a convenient interface for simple chunk-by-chunk consumption, as one might do e.g. when streaming database records. It uses an EOF sentinel to signal the end of the stream, and behaves poorly if called twice in parallel without waiting for the previously-returned promise to fulfill.

```js
const EOF = Symbol("ReadableStream getNext EOF");

function getNext(stream) {
  if (stream.state === "closed") {
    return Promise.resolve(EOF);
  }

  return stream.ready.then(() => {
    if (stream.state === "closed") {
      return EOF;
    }

    // If stream is "errored", this will throw, causing the promise to be rejected.
    return stream.read();
  });
}

// Usage with proposed ES2016 async/await keywords:
async function processStream(stream) {
  let chunk;
  while ((chunk = await getNext(stream)) !== EOF) {
    // do something with `chunk`.
  }
}
```

### Buffering the Entire Stream Into Memory

This function uses the reading APIs to buffer the entire stream in memory and give a promise for the results, defeating the purpose of streams but educating us while doing so:
@@ -42,19 +12,17 @@

```diff
 function readableStreamToArray(readable) {
   const chunks = [];
 
-  pump();
-  return readable.closed.then(() => chunks);
+  return pump();
 
   function pump() {
-    while (readable.state === "readable") {
-      chunks.push(readable.read());
-    }
-
-    if (readable.state === "waiting") {
-      readable.ready.then(pump);
-    }
-
-    // Otherwise the stream is "closed" or "errored", which will be handled above.
+    return readable.read().then(({ value, done }) => {
+      if (done) {
+        return chunks;
+      }
+
+      chunks.push(value);
+      return pump();
+    });
   }
 }

@@ -65,6 +33,21 @@ readableStreamToArray(myStream).then(chunks => {
})
```

We can also write this using the [async function syntax](https://github.com/lukehoban/ecmascript-asyncawait/) proposed for ES2016:

```js
async function readableStreamToArray(readable) {
  const chunks = [];

  let result;
  while (!(result = await readable.read()).done) {
    chunks.push(result.value);
  }

  return chunks;
}
```

## Writable Streams

### Reporting Incremental Progress
Locking Design Doc.md (0 additions, 92 deletions)

This file was deleted.

Requirements.md (27 additions, 8 deletions)
@@ -30,12 +30,6 @@ For example, Node.js's "streams1" (version 0.6 and below) presented a push-based…

The solution is to move the queuing logic into the stream primitive itself, removing the error-prone and easy-to-forget process it forces upon consumers. If you stop there, you end up with a push stream with `pause()` and `resume()` methods that are not advisory, but instead reliably stop the flow of `"data"`, `"end"`, and `"close"` events. However, you can take this further, and use your internal queue to unify both push- and pull-based data sources into a single pull-based streaming API.
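
As a rough sketch of that unification (not part of the spec), the wrapper below absorbs a hypothetical push-based source with `ondata`/`onend` callbacks into an internal queue, and exposes only a promise-returning, pull-based `read()`:

```js
// Sketch only: `source` is a hypothetical push-based object that invokes the
// `ondata` and `onend` callbacks assigned below; it is not part of the streams spec.
function makePullBased(source) {
  const queue = [];          // internal queue absorbs chunks pushed by the source
  const pendingReads = [];   // resolvers for reads that arrived before any data
  let ended = false;

  source.ondata = chunk => {
    if (pendingReads.length > 0) {
      pendingReads.shift()({ value: chunk, done: false });
    } else {
      queue.push(chunk);
    }
  };

  source.onend = () => {
    ended = true;
    while (pendingReads.length > 0) {
      pendingReads.shift()({ value: undefined, done: true });
    }
  };

  return {
    // Pull-based, promise-returning read(), regardless of how the source delivers data.
    read() {
      if (queue.length > 0) {
        return Promise.resolve({ value: queue.shift(), done: false });
      }
      if (ended) {
        return Promise.resolve({ value: undefined, done: true });
      }
      return new Promise(resolve => pendingReads.push(resolve));
    }
  };
}
```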

### You must not force an asynchronous reading API upon users.

Data is often available synchronously, for example because it has been previously queued into memory, or cached by the OS, or because it is being passed through a synchronous transform stream. Thus, even though the most natural mental model may consist of asynchronously pulling data or waiting for it to arrive, enforcing this upon consumers of the readable stream interface would be a mistake, as it prevents them from accessing the data as fast as possible, and imposes an unnecessary delay for every step along a stream chain.

A better model is to provide synchronous access to already-available data, plus the ability to install a handler to be called asynchronously when more data becomes available. A consumer then consults a state property of the readable stream, which tells them whether synchronously reading will be successful, or whether they should install an asynchronous handler to read later.
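
Under that (since-abandoned) model, a consumer might have looked roughly like the sketch below, where `handleChunk` is a hypothetical placeholder for application logic:

```js
// Sketch of the state-plus-ready model described above; `handleChunk` is hypothetical.
function drainAvailable(stream) {
  while (stream.state === "readable") {
    handleChunk(stream.read());   // synchronous read of already-available data
  }

  if (stream.state === "waiting") {
    stream.ready.then(() => drainAvailable(stream));  // resume when more data arrives
  }
  // "closed" and "errored" would be handled elsewhere, e.g. via stream.closed.
}
```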

## Creating Writable Streams

### You must shield the user from the complexity of queuing sequential writes.
@@ -48,9 +42,9 @@ Thus it is the duty of a writable stream API to provide an easy interface for writing…

This leads to a natural quantification of how "slow" a writable stream is in terms of how full its write queue is. This measure of slowness can be used to propagate backpressure signals to anyone writing data to the writable stream; see below for more details.

### You should not force an asynchronous writing API upon users.

It is often possible to perform a synchronous write, e.g. to an in-memory data source or to a synchronous transformation. Thus, the user should not be forced to wait for a previous write to complete before continuing to write.

This issue is actually solved by properly considering the previous one; they are two facets of the same thing. If you queue sequential writes for the user, they can synchronously write chunks with impunity.
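
A minimal sketch of both points together, assuming a hypothetical `sink.writeAsync(chunk)` that returns a promise for one underlying write: callers can write synchronously, back to back, the writer queues the chunks, and the queue length doubles as a crude measure of how slow the stream is:

```js
// Sketch only: `sink.writeAsync(chunk)` is a hypothetical promise-returning write;
// error handling is kept minimal for the sketch.
function makeQueuingWriter(sink) {
  const queue = [];
  let writing = false;

  function pump() {
    if (writing || queue.length === 0) return;
    writing = true;
    const { chunk, resolve, reject } = queue.shift();
    sink.writeAsync(chunk).then(
      () => { writing = false; resolve(); pump(); },
      err => { writing = false; reject(err); }
    );
  }

  return {
    // Callers may write synchronously, back to back; chunks are queued in order.
    write(chunk) {
      return new Promise((resolve, reject) => {
        queue.push({ chunk, resolve, reject });
        pump();
      });
    },
    // A rough "slowness" measure: how many writes are still outstanding.
    get queuedWrites() {
      return queue.length + (writing ? 1 : 0);
    }
  };
}
```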

@@ -192,3 +186,28 @@ This is commonly used for analytics or progress reporting. You wish to observe data…

A convenient interface for this is an evented one. However, marrying an evented API to a stream API presents many problems, and it was widely considered a huge mistake by the Node.js core team. (For example, there are then two sources of truth about what data the stream holds, and since traditional event emitters allow anyone to emit events on them, the second one is unreliable.) A better strategy may be using AOP-style "wrapping" of `read` or `write` calls to notify a separately-managed event emitter.
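
A sketch of that wrapping strategy, assuming a reader-like object with a promise-returning `read()` and a separately-managed `notify` callback standing in for the event emitter:

```js
// Sketch only: wraps a reader-like object so that every chunk read is reported to a
// separately-managed observer, without the stream itself becoming an event emitter.
function observeReads(reader, notify) {
  return {
    read() {
      return reader.read().then(result => {
        if (!result.done) {
          notify(result.value);   // e.g. progress reporting or analytics
        }
        return result;
      });
    }
  };
}
```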

## Byte Streams

### Byte streams must be naively substitutable wherever a non-byte stream is expected.

A good stream ecosystem will consist of a lot of code that operates on any readable, writable, or transform stream. This code should work on the byte-specialized versions of those constructs as well. This generally means that, for example, a readable byte stream should have an API that is a superset of a simple readable stream's API.

### You should have a way of "bringing your own buffer" to a readable byte stream.

When reading from a normal readable stream, the stream is responsible for allocating and giving you new objects each time you read. In the context where a readable byte stream is treated just like a readable stream, this will necessarily continue to be the case: each read will allocate a new `ArrayBuffer`, which the consumer will use until it is eventually garbage-collected.

For predictable performance and memory usage, however, it is ideal to allow "bring your own buffer" code, where instead of allocating a new buffer, the byte stream reads directly into a supplied buffer. This would allow e.g. reuse of a single pre-allocated memory region.

Given the substitutability constraint, this API must exist alongside the normal auto-allocating one, so that consumers which know they are dealing with a readable byte stream can use it, but agnostic consumers do not need to.
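
A sketch of what "bring your own buffer" consumption could look like; the exact API shape was still being designed at this point, so the `read(view)` signature and `{ view, done }` result below are assumptions:

```js
// Hypothetical BYOB-style consumption: a single pre-allocated region is reused for
// every read. `byteStream.read(view)` is an assumed signature that fills the supplied
// view and resolves with { view, done }, where `view` covers the bytes actually read.
async function copyToSink(byteStream, sink) {
  let view = new Uint8Array(16 * 1024);         // one pre-allocated memory region

  while (true) {
    const result = await byteStream.read(view);
    if (result.done) {
      return;
    }
    sink.write(result.view);                    // `sink.write` is assumed synchronous here
    view = new Uint8Array(result.view.buffer);  // reuse the same memory for the next read
  }
}
```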

### You must not allow observable data races.

While reading into a buffer, possibly from another thread, the consumer must not be able to observe the buffer filling up. (That is, `typedArray[0] !== typedArray[0]` should never occur, even if another thread is mutating the beginning of the backing buffer.) The general solution here is detaching the array buffer.
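
Detaching is the same mechanism used when transferring an `ArrayBuffer` to a worker; a short illustration, using a worker purely as a stand-in for "another thread":

```js
// Illustration of detaching via transfer: here the buffer is transferred to a worker,
// but a BYOB read would detach the caller's buffer in the same spirit.
const worker = new Worker("worker.js");   // any worker will do for the illustration
const buffer = new ArrayBuffer(1024);
const view = new Uint8Array(buffer);

worker.postMessage({ buffer }, [buffer]); // the transfer list detaches `buffer`

console.log(buffer.byteLength);           // 0: the memory is gone from this side
console.log(view.byteLength);             // 0: `view` can no longer observe the bytes
```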

### You must have a way of specifying an upper limit on the number of bytes read from a readable byte stream.

Several use cases demand precise control over the maximum number of bytes read from a byte stream. For example, reading a file header before deciding how to process the rest of the file. For general readable streams this is not really feasible, but for byte streams it is perfectly possible.
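
With a BYOB-style read, the cap falls out naturally: the length of the supplied view bounds how many bytes can be consumed. A sketch, under the same assumed `read(view)` signature as above:

```js
// Read at most 16 header bytes before deciding how to process the rest of the file.
// Uses the assumed BYOB-style read(view) signature from the earlier sketch; a robust
// version would loop until the view is full.
async function readHeader(byteStream) {
  const { view, done } = await byteStream.read(new Uint8Array(16));
  if (done) {
    throw new Error("stream ended before a full header could be read");
  }
  return view;   // never more than 16 bytes, and nothing beyond them has been consumed
}
```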

### You must allow in-another-thread BYOB underlying source APIs to be wrapped in a readable byte stream.

I discuss this scenario in more detail [in a blog post](https://blog.domenic.me/reading-from-files/). The essential result is that the buffer for BYOB reads must be supplied up front, before any async waiting occurs. This argues in favor of an async read API, as we came to conclude in the (very long) [#253](https://github.com/whatwg/streams/issues/253), culminating in [#296](https://github.com/whatwg/streams/pull/296).
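
Put differently, the destination buffer has to be available when the read is initiated, not when the data later arrives, and an asynchronous `read()` expresses that directly. A rough sketch, where `threadReadInto` is a hypothetical stand-in for such an in-another-thread API:

```js
// Sketch only: `threadReadInto(view)` stands in for an in-another-thread API that fills
// the supplied view and resolves with the number of bytes written (0 at end of data).
function wrapThreadedSource(threadReadInto) {
  return {
    read(view) {
      // The destination buffer must be supplied here, before any asynchronous waiting
      // occurs, which is exactly what an async read() signature allows.
      return threadReadInto(view).then(bytesRead => ({
        view: view.subarray(0, bytesRead),
        done: bytesRead === 0
      }));
    }
  };
}
```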