Skip to content
This repository has been archived by the owner on Apr 22, 2023. It is now read-only.

Commit

Permalink
stream: Simplify flowing, passive data listening
Browse files Browse the repository at this point in the history
Closes #5860

In streams2, there is an "old mode" for compatibility.  Once switched
into this mode, there is no going back.

With this change, there is a "flowing mode" and a "paused mode".  If you
add a data listener, then this will start the flow of data.  However,
hitting the `pause()` method will switch *back* into a non-flowing mode,
where the `read()` method will pull data out.

Every time `read()` returns a data chunk, it also emits a `data` event.
In this way, a passive data listener can be added, and the stream passed
off to some other reader, for use with progress bars and the like.

There is no API change beyond this added flexibility.
  • Loading branch information
isaacs committed Jul 22, 2013
1 parent 5fcd6e4 commit 0f8de5e
Show file tree
Hide file tree
Showing 11 changed files with 433 additions and 201 deletions.
91 changes: 60 additions & 31 deletions doc/api/stream.markdown
Expand Up @@ -104,11 +104,35 @@ Readable stream.
A Readable stream will not start emitting data until you indicate that
you are ready to receive it.

Readable streams have two "modes": a **flowing mode** and a **non-flowing
Readable streams have two "modes": a **flowing mode** and a **paused
mode**. When in flowing mode, data is read from the underlying system
and provided to your program as fast as possible. In non-flowing
mode, you must explicitly call `stream.read()` to get chunks of data
out.
and provided to your program as fast as possible. In paused mode, you
must explicitly call `stream.read()` to get chunks of data out.
Streams start out in paused mode.

**Note**: If no data event handlers are attached, and there are no
[`pipe()`][] destinations, and the stream is switched into flowing
mode, then data will be lost.

You can switch to flowing mode by doing any of the following:

* Adding a [`'data'` event][] handler to listen for data.
* Calling the [`resume()`][] method to explicitly open the flow.
* Calling the [`pipe()`][] method to send the data to a [Writable][].

You can switch back to paused mode by doing either of the following:

* If there are no pipe destinations, by calling the [`pause()`][]
method.
* If there are pipe destinations, by removing any [`'data'` event][]
handlers, and removing all pipe destinations by calling the
[`unpipe()`][] method.

Note that, for backwards compatibility reasons, removing `'data'`
event handlers will **not** automatically pause the stream. Also, if
there are piped destinations, then calling `pause()` will not
guarantee that the stream will *remain* paused once those
destinations drain and ask for more data.

Examples of readable streams include:

Expand Down Expand Up @@ -144,9 +168,9 @@ again when more data is available.

* `chunk` {Buffer | String} The chunk of data.

If you attach a `data` event listener, then it will switch the stream
into flowing mode, and data will be passed to your handler as soon as
it is available.
Attaching a `data` event listener to a stream that has not been
explicitly paused will switch the stream into flowing mode. Data will
then be passed as soon as it is available.

If you just want to get all the data out of the stream as fast as
possible, this is the best way to do so.
Expand Down Expand Up @@ -200,9 +224,9 @@ bytes. If `size` bytes are not available, then it will return `null`.
If you do not specify a `size` argument, then it will return all the
data in the internal buffer.

This method should only be called in non-flowing mode. In
flowing-mode, this method is called automatically until the internal
buffer is drained.
This method should only be called in paused mode. In flowing mode,
this method is called automatically until the internal buffer is
drained.

```javascript
var readable = getReadableStreamSomehow();
Expand All @@ -214,6 +238,9 @@ readable.on('readable', function() {
});
```

If this method returns a data chunk, then it will also trigger the
emission of a [`'data'` event][].

#### readable.setEncoding(encoding)

* `encoding` {String} The encoding to use.
Expand Down Expand Up @@ -244,9 +271,9 @@ readable.on('data', function(chunk) {
This method will cause the readable stream to resume emitting `data`
events.

This method will switch the stream into flowing-mode. If you do *not*
This method will switch the stream into flowing mode. If you do *not*
want to consume the data from a stream, but you *do* want to get to
its `end` event, you can call `readable.resume()` to open the flow of
its `end` event, you can call [`readable.resume()`][] to open the flow of
data.

```javascript
Expand All @@ -259,13 +286,9 @@ readable.on('end', function(chunk) {

#### readable.pause()

This method will cause a stream in flowing-mode to stop emitting
`data` events. Any data that becomes available will remain in the
internal buffer.

This method is only relevant in flowing mode. When called on a
non-flowing stream, it will switch into flowing mode, but remain
paused.
This method will cause a stream in flowing mode to stop emitting
`data` events, switching out of flowing mode. Any data that becomes
available will remain in the internal buffer.

```javascript
var readable = getReadableStreamSomehow();
Expand Down Expand Up @@ -414,7 +437,7 @@ entire Streams API as it is today. (See "Compatibility" below for
more information.)

If you are using an older Node library that emits `'data'` events and
has a `pause()` method that is advisory only, then you can use the
has a [`pause()`][] method that is advisory only, then you can use the
`wrap()` method to create a [Readable][] stream that uses the old stream
as its data source.

Expand Down Expand Up @@ -1298,23 +1321,23 @@ simpler, but also less powerful and less useful.
events would start emitting immediately. If you needed to do some
I/O to decide how to handle data, then you had to store the chunks
in some kind of buffer so that they would not be lost.
* The `pause()` method was advisory, rather than guaranteed. This
* The [`pause()`][] method was advisory, rather than guaranteed. This
meant that you still had to be prepared to receive `'data'` events
even when the stream was in a paused state.

In Node v0.10, the Readable class described below was added. For
backwards compatibility with older Node programs, Readable streams
switch into "flowing mode" when a `'data'` event handler is added, or
when the `pause()` or `resume()` methods are called. The effect is
that, even if you are not using the new `read()` method and
`'readable'` event, you no longer have to worry about losing `'data'`
chunks.
when the [`resume()`][] method is called. The effect is that, even if
you are not using the new `read()` method and `'readable'` event, you
no longer have to worry about losing `'data'` chunks.

Most programs will continue to function normally. However, this
introduces an edge case in the following conditions:

* No `'data'` event handler is added.
* The `pause()` and `resume()` methods are never called.
* No [`'data'` event][] handler is added.
* The [`resume()`][] method is never called.
* The stream is not piped to any writable destination.

For example, consider the following code:

Expand All @@ -1336,7 +1359,7 @@ simply discarded. However, in Node v0.10 and beyond, the socket will
remain paused forever.

The workaround in this situation is to call the `resume()` method to
trigger "old mode" behavior:
start the flow of data:

```javascript
// Workaround
Expand All @@ -1352,9 +1375,9 @@ net.createServer(function(socket) {
}).listen(1337);
```

In addition to new Readable streams switching into flowing-mode, pre-v0.10
style streams can be wrapped in a Readable class using the `wrap()`
method.
In addition to new Readable streams switching into flowing mode,
pre-v0.10 style streams can be wrapped in a Readable class using the
`wrap()` method.


### Object Mode
Expand Down Expand Up @@ -1494,3 +1517,9 @@ modify them.
[_write]: #stream_writable_write_chunk_encoding_callback_1
[`util.inherits`]: util.html#util_util_inherits_constructor_superconstructor
[`end()`]: #stream_writable_end_chunk_encoding_callback
[`'data'` event]: #stream_event_data
[`resume()`]: #stream_readable_resume
[`readable.resume()`]: #stream_readable_resume
[`pause()`]: #stream_readable_pause
[`unpipe()`]: #stream_readable_unpipe_destination
[`pipe()`]: #stream_readable_pipe_destination_options

0 comments on commit 0f8de5e

Please sign in to comment.