Update docs

1 parent abe4b73 commit eeb890f3b3109221028fb4a2179169f743c72b25, @isaacs committed Oct 2, 2012
Showing with 382 additions and 44 deletions.
  1. +382 −44 README.md
@@ -9,43 +9,21 @@ This is an abstract class designed to be extended. It also provides a
`wrap` method for old-style
streams that have the "readable stream" interface of Node 0.8 and
before.
-## Usage
-
-```javascript
-var Readable = require('readable-stream');
-var r = new Readable();
-
-r.read = function(n) {
- // your magic goes here.
- // return n bytes, or null if there is nothing to be read.
- // if you return null, then you MUST emit 'readable' at some
- // point in the future if there are bytes available, or 'end'
- // if you are not going to have any more data.
- //
- // You MUST NOT emit either 'end' or 'readable' before
- // returning from this function, but you MAY emit 'end' or
- // 'readable' in process.nextTick().
-};
-
-r.on('end', function() {
- // no more bytes will be provided.
-});
-
-r.on('readable', function() {
- // now is the time to call read() again.
-});
-```
+Note that Transform, Writable, and PassThrough streams are also
+provided as base classes. See the full API details below.
## Justification
-Writable streams in node are very straightforward to use and extend.
-The `write` method either returns `true` if the bytes could be
-completely handled and another `write` should be performed, or `false`
-if you would like the user to back off a bit, in which case a `drain`
-event at some point in the future will let them continue writing. The
-`end()` method lets the user indicate that no more bytes will be
-written. That's pretty much the entire required interface for
-writing.
+<!-- misc -->
+
+Writable streams in node are relatively straightforward to use and
+extend. The `write` method either returns `false` if you would like
+the user to back off a bit, in which case a `drain` event at some
+point in the future will let them continue writing, or anything other
+than `false` if the bytes could be completely handled and another
+`write` should be performed. The `end()` method lets the user
+indicate that no more bytes will be written. That's pretty much the
+entire required interface for writing.
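+
+For example, a minimal sketch of driving that interface (`dest` and
+`chunks` are hypothetical stand-ins for a writable stream and the
+data to be written):
+
+```javascript
+var i = 0;
+function writeSome() {
+  // keep writing until write() asks us to back off
+  while (i < chunks.length) {
+    if (dest.write(chunks[i++]) === false) {
+      // resume once the buffer drains
+      dest.once('drain', writeSome);
+      return;
+    }
+  }
+  dest.end(); // no more bytes will be written
+}
+writeSome();
+```
+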
However, readable streams in Node 0.8 and before are rather
complicated.
@@ -56,6 +34,9 @@ complicated.
2. If you extend the interface in userland programs, then you must
implement `pause()` and `resume()` methods, and take care of
buffering yourself.
+3. In many streams, `pause()` was purely advisory, so **even while
+   paused**, you still have to be careful that you might get some
+   data. This caused a lot of subtle bugs.
So, while writers only have to implement `write()`, `end()`, and
`drain`, readers have to implement (at minimum):
@@ -65,11 +46,14 @@ So, while writers only have to implement `write()`, `end()`, and
* `data` event
* `end` event
+And read consumers had to always be prepared for their backpressure
+advice to simply be ignored.
+
If you are using a readable stream, and want to just get the first 10
bytes, make a decision, and then pass the rest off to somewhere else,
-then you have to handle buffering, pausing, and so on. This is all
-rather brittle and easy to get wrong for all but the most trivial use
-cases.
+then you have to handle buffering, pausing, slicing, and so on. This
+is all rather brittle and easy to get wrong for all but the most
+trivial use cases.
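+
+For instance, a sketch of that old-style dance (`oldStream`,
+`decide`, and `handleRest` are hypothetical):
+
+```javascript
+var buffered = [];
+var length = 0;
+function ondata(chunk) {
+  buffered.push(chunk);
+  length += chunk.length;
+  if (length >= 10) {
+    // "pause" (advisory only!) and stop listening
+    oldStream.pause();
+    oldStream.removeListener('data', ondata);
+    var all = Buffer.concat(buffered, length);
+    decide(all.slice(0, 10));
+    // hand the rest, and the still-flowing stream, off elsewhere
+    handleRest(all.slice(10), oldStream);
+  }
+}
+oldStream.on('data', ondata);
+```
+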
Additionally, this all made the `reader.pipe(writer)` method
unnecessarily complicated and difficult to extend without breaking
@@ -78,34 +62,48 @@ and brittle.
### Solution
+<!-- misc -->
+
The reader does not have pause/resume methods. If you want to consume
the bytes, you call `read()`. If bytes are not being consumed, then
effectively the stream is in a paused state. It exerts backpressure
-on upstream connections, doesn't read from files, etc.
+on upstream connections, doesn't read from files, etc. Any data that
+was already in the process of being read will be placed in a buffer.
If `read()` returns `null`, then a future `readable` event will be
fired when there are more bytes ready to be consumed.
This is simpler and conceptually closer to the underlying mechanisms.
-The resulting `pipe()` method is much shorter and simpler.
+The resulting `pipe()` method is much shorter and simpler. The
+problems of data events happening while paused are alleviated.
### Compatibility
+<!-- misc -->
+
It's not particularly difficult to wrap older-style streams in this
new interface, or to wrap this type of stream in the older-style
interface.
-The `Readable` class takes an argument which is an old-style stream
-with `data` events and `pause()` and `resume()` methods, and uses that
-as the data source. For example:
+The `Readable` class provides a `wrap(oldStream)` method that takes an
+argument which is an old-style stream with `data` events and `pause()`
+and `resume()` methods, and uses that as the data source. For
+example:
```javascript
-var r = new Readable(oldReadableStream);
+var r = new Readable();
+r.wrap(oldReadableStream);
// now you can use r.read(), and it will emit 'readable' events
+// but the data is based on whatever oldReadableStream spits out of
+// its 'data' events.
```
-The `Readable` class will also automatically convert into an old-style
+In order to work with programs that use the older interface, some
+magic is unfortunately required. At some point in the future, this
+magic will be removed.
+
+The `Readable` class will automatically convert into an old-style
`data`-emitting stream if any listeners are added to the `data` event.
So, this works fine, though you of course lose a lot of the benefits of
the new interface:
@@ -115,8 +113,13 @@ var r = new ReadableThing();
r.on('data', function(chunk) {
// ...
+ // magic is happening! oh no! the animals are walking upright!
+ // the brooms are sweeping the floors all by themselves!
});
+// this will also turn on magic-mode:
+r.pause();
+
// now pause, resume, etc. are patched into place, and r will
// continually call read() until it returns null, emitting the
// returned chunks in 'data' events.
@@ -125,3 +128,338 @@ r.on('end', function() {
// ...
});
```
+
+## Class: Readable
+
+A base class for implementing Readable streams. Override the
+`_read(n,cb)` method to fetch data asynchronously and take advantage
+of the buffering built into the Readable class.
+
+### Example
+
+Extend the Readable class, and provide an implementation of the
+`_read(n,cb)` method.
+
+```javascript
+var Readable = require('readable-stream');
+var util = require('util');
+
+util.inherits(MyReadable, Readable);
+
+function MyReadable(options) {
+  Readable.call(this, options);
+}
+
+MyReadable.prototype._read = function(n, cb) {
+  // your magic goes here.
+  // call the cb at some time in the future with either up to n bytes,
+  // or an error, like cb(err, resultData)
+  //
+  // The code in the Readable class will call this to keep an internal
+  // buffer at a healthy level, as the user calls
+  // var chunk = stream.read(n) to consume chunks.
+};
+
+var r = new MyReadable();
+
+r.on('end', function() {
+  // no more bytes will be provided.
+});
+
+r.on('readable', function() {
+  // now is the time to call read() again.
+});
+
+// to get some bytes out of it:
+var data = r.read(optionalLengthArgument);
+// now data is either null, or a buffer of optionalLengthArgument
+// length. If you don't provide an argument, then it returns whatever
+// it has.
+
+// typically you'd just r.pipe() into some writable stream, but you
+// can of course do stuff like this, as well:
+function flow() {
+  var chunk;
+  while (null !== (chunk = r.read())) {
+    doSomethingWithData(chunk);
+  }
+  r.once('readable', flow);
+}
+flow();
+```
+
+### new Readable(options)
+
+* `options` {Object}
+ * `lowWaterMark` {Number} The minimum number of bytes before the
+ stream is considered 'readable'. Default = `1024`
+ * `bufferSize` {Number} The number of bytes to try to read from the
+ underlying `_read` function. Default = `16 * 1024`
+
+Make sure to call the `Readable` constructor in your extension
+classes, or else the stream will not be properly initialized.
+
+### readable.read([n])
+
+* `n` {Number} Optional number of bytes to read. If not provided,
+ then return however many bytes are available.
+* Returns: {Buffer | null}
+
+Pulls the requested number of bytes out of the internal buffer. If
+that many bytes are not available, then it returns `null`.
+
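+For example (a sketch; `r` is a Readable, and `tryAgain` is a
+hypothetical retry function):
+
+```javascript
+var chunk = r.read(10);
+if (chunk === null) {
+  // fewer than 10 bytes are buffered right now; wait for more
+  r.once('readable', tryAgain);
+}
+```
+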
+### readable.\_read(n, callback)
+
+* `n` {Number} Number of bytes to read from the underlying
+ asynchronous data source.
+* `callback` {Function} Callback function
+ * `error` {Error Object}
+ * `data` {Buffer | null}
+
+**Note: This function is not implemented in the Readable base class.**
+Rather, it is up to you to implement `_read` in your extension
+classes.
+
+`_read` should fetch the specified number of bytes, and call the
+provided callback with `cb(error, data)`, where `error` is any error
+encountered, and `data` is the returned data.
+
+This method is prefixed with an underscore because it is internal to
+the class that defines it, and should not be called directly by user
+programs. However, you **are** expected to override this method in
+your own extension classes.
+
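+For example, a sketch of a `_read` that serves bytes from an
+in-memory Buffer. The subclass is hypothetical, and it assumes that
+a `null` data argument (see the `data` type above) signals the end:
+
+```javascript
+util.inherits(SourceReadable, Readable);
+
+function SourceReadable(source, options) {
+  Readable.call(this, options);
+  this._source = source; // a Buffer to serve bytes from
+  this._offset = 0;
+}
+
+SourceReadable.prototype._read = function(n, cb) {
+  var self = this;
+  // pretend this is asynchronous I/O
+  process.nextTick(function() {
+    if (self._offset >= self._source.length)
+      return cb(null, null); // nothing left
+    var end = Math.min(self._offset + n, self._source.length);
+    var chunk = self._source.slice(self._offset, end);
+    self._offset = end;
+    cb(null, chunk);
+  });
+};
+```
+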
+### readable.pipe(destination)
+
+* `destination` {Writable Stream object}
+
+Continually `read()` data out of the readable stream, and `write()` it
+into the writable stream. When the `writable.write(chunk)` call
+returns `false`, then it will back off until the next `drain` event,
+to do backpressure.
+
+Piping to multiple destinations is supported. The slowest destination
+stream will limit the speed of the `pipe()` flow.
+
+Note that this puts the readable stream into a state where not very
+much can be done with it. You can no longer `read()` from the stream
+in other code, without upsetting the pipe() process. However, since
+multiple pipe destinations are supported, you can always create a
+`PassThrough` stream, and pipe the reader to that. For example:
+
+```javascript
+var r = new ReadableWhatever();
+var pt = new PassThrough();
+
+r.pipe(someWritableThing);
+r.pipe(pt);
+
+// now I can call pt.read() to my heart's content.
+// note that if I *don't* call pt.read(), then it'll back up and
+// prevent the pipe() from flowing!
+```
+
+### readable.unpipe([destination])
+
+* `destination` {Writable Stream object} Optional
+
+Remove the provided `destination` stream from the pipe flow. If no
+argument is provided, then it will unhook all piped destinations.
+
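+For example (with hypothetical streams `r`, `a`, and `b`):
+
+```javascript
+r.pipe(a);
+r.pipe(b);
+r.unpipe(a); // a stops receiving data; b keeps flowing
+r.unpipe();  // disconnects b (and any other destinations) too
+```
+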
+### readable.on('readable')
+
+An event that signals more data is now available to be read from the
+stream. Emitted when more data arrives, after previously calling
+`read()` and getting a null result.
+
+### readable.on('end')
+
+An event that signals that no more data will ever be available on this
+stream. It's over.
+
+### readable.\_readableState
+
+* {Object}
+
+An object that tracks the state of the stream. Buffer information,
+whether or not it has reached the end of the underlying data source,
+etc., are all tracked on this object.
+
+You are strongly encouraged not to modify this in any way, but it is
+often useful to read from.
+
+## Class: Writable
+
+A base class for creating Writable streams. Similar to Readable, you
+can create child classes by overriding the asynchronous
+`_write(chunk,cb)` method, and it will take care of buffering,
+backpressure, and so on.
+
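+### Example
+
+A minimal sketch of an extension class. (The export path shown here,
+`require('readable-stream').Writable`, is an assumption for the sake
+of illustration.)
+
+```javascript
+var Writable = require('readable-stream').Writable; // assumed export
+var util = require('util');
+
+util.inherits(MyWritable, Writable);
+
+function MyWritable(options) {
+  Writable.call(this, options);
+  this._chunks = [];
+}
+
+MyWritable.prototype._write = function(chunk, cb) {
+  // your magic goes here: write to a file, send along a socket, etc.
+  // here we just collect the chunks in memory.
+  this._chunks.push(chunk);
+  cb(); // call cb(err) once the chunk is fully handled
+};
+
+var w = new MyWritable();
+w.write(new Buffer('hello'));
+w.end();
+```
+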
+### new Writable(options)
+
+* `options` {Object}
+ * `highWaterMark` {Number} The number of bytes to store up before it
+ starts returning `false` from write() calls. Default = `16 * 1024`
+ * `lowWaterMark` {Number} The number of bytes that the buffer must
+ get down to before it emits `drain`. Default = `1024`
+
+Make sure to call the `Writable` constructor in your extension
+classes, or else the stream will not be properly initialized.
+
+### writable.write(chunk, [encoding])
+
+* `chunk` {Buffer | String}
+* `encoding` {String} An encoding argument to turn the string chunk
+ into a buffer. Only relevant if `chunk` is a string.
+ Default = `'utf8'`.
+* Returns `false` if you should not write until the next `drain`
+ event, or some other value otherwise.
+
+The basic write function.
+
+### writable.\_write(chunk, callback)
+
+* `chunk` {Buffer}
+* `callback` {Function}
+ * `error` {Error | null} Call with an error object as the first
+ argument to indicate that the write() failed for unfixable
+ reasons.
+
+**Note: This function is not implemented in the Writable base class.**
+Rather, it is up to you to implement `_write` in your extension
+classes.
+
+`_write` should do whatever has to be done in this specific Writable
+class, to handle the bytes being written. Write to a file, send along
+a socket, encode as an mp3, whatever needs to be done. Do your I/O
+asynchronously, and call the callback when it's complete.
+
+This method is prefixed with an underscore because it is internal to
+the class that defines it, and should not be called directly by user
+programs. However, you **are** expected to override this method in
+your own extension classes.
+
+### writable.end([chunk], [encoding])
+
+* `chunk` {Buffer | String}
+* `encoding` {String}
+
+If a chunk (and, optionally, an encoding) is provided, then that
+chunk is first passed to `this.write(chunk, encoding)`.
+
+This method is a way to signal to the writable stream that you will
+not be writing any more data. It should be called exactly once for
+every writable stream.
+
+Calling `write()` *after* calling `end()` will trigger an error.
+
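+For example (a sketch, where `w` is any writable stream):
+
+```javascript
+w.write('some data');
+w.end('last chunk'); // writes 'last chunk', then ends the stream
+w.write('oops');     // error: no writing after end()
+```
+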
+### writable.on('pipe', source)
+
+Emitted when calling `source.pipe(writable)`. See above for the
+description of the `readable.pipe()` method.
+
+### writable.on('unpipe', source)
+
+Emitted when calling `source.unpipe(writable)`. See above for the
+description of the `readable.unpipe()` method.
+
+### writable.on('drain')
+
+If a call to `writable.write()` returns false, then at some point in
+the future, this event will tell you to start writing again.
+
+### writable.on('finish')
+
+When the stream has been ended, and all the data in its internal
+buffer has been consumed, then it emits a `finish` event to let you
+know that it's completely done.
+
+This is particularly handy if you want to know when it is safe to shut
+down a socket or close a file descriptor. At this time, the writable
+stream may be safely disposed. Its mission in life has been
+accomplished.
+
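+For example, a sketch of waiting for `finish` (the cleanup step is
+hypothetical):
+
+```javascript
+w.on('finish', function() {
+  // everything written has been handled by _write;
+  // safe to release the underlying resource now.
+  cleanUp();
+});
+w.end();
+```
+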
+## Class: Transform
+
+A duplex (i.e., both readable and writable) stream that is designed to
**@Raynos** (Oct 2, 2012):

RW does not mean duplex. Transform is actually a through stream.

through: A call to write eventually causes the code belonging to the
writable side of the stream to mutate the underlying readable buffer.

duplex: A call to write does something. At some point later there is
data on the readable side. At a high-level description you can
understand there is some connection between the two. Almost always
this connection is through another stream or another process.

A duplex stream is not allowed to modify the readable buffer from the writable side. Not directly anyway.

**@isaacs** (Oct 2, 2012):

RW does exactly mean duplex. That's what duplex means, and it's all duplex means. The concept of "through" streams presented here in this comment is incomplete and flawed, and a subset of what we call "duplex".

The paradigm case of a duplex is a TCP socket. The paradigm case of a "through" stream is a null pass-through. Consider this:

```javascript
var net = require('net');

net.createServer(function(sock) {
  sock.pipe(sock);
}).listen(1234);

var duplexOrThroughYouBeTheJudge = net.connect(1234);
```

Is this a duplex stream? Clearly. Yet there is no semantic or effective difference between this and a "through" stream that has a slight time delay. It is only an implementation detail.

> A call to write eventually causes the code belonging to the writable side of the stream to mutate the underlying readable buffer

In the case of a zlib deflate stream, many writes will not cause the underlying readable buffer to be mutated, at least not directly, immediately, or with any sort of symmetry (assuming you are not writing random bytes, but actual data that can be compressed). And yet in the clearly duplex stream in the example above, whatever you write is the sole cause of what you can later read.

From a semantic and API point of view, a through stream is exactly a duplex stream. It is readable. It is writable. Reads will be causally connected to the writes, of course, but that gets us into a very philosophical territory. When you are on the phone with your mother, the words you say are causally connected to the words she says, but I'd still call that conversation duplex.

It seems almost like what you want to say is something like through streams happen "here", and duplex streams are connected to something "over there", in the sense of which program "controls" what happens. But that is an extremely fuzzy boundary! Every so-called "through stream" in node-core will be connected up to some underlying computation. In this day and age of SaaS and distributed workers and clusters of servers connecting via HTTP and job registries, the difference between "here" and "there" is very vague. Before we had zlib streams in core, I used a gzip child process to compress and decompress data. Consider this:

```javascript
// a naive cp-to-stream example for the purpose of discussion
// irl, you'd want to be a little more clever so that you do
// backpressure properly if no one is consuming the output.
util.inherits(Gzip, Transform);
function Gzip(options) {
  Transform.call(this, options);
  var self = this;
  this._cp = child_process.spawn('gzip', []);
  this._cp.stdout.on('data', function(chunk) {
    // forward the child's output to the readable side
    self._output(chunk);
  });
}
Gzip.prototype._transform = function(chunk, output, cb) {
  this._output = output; // save the output fn for the stdout handler
  var ret = this._cp.stdin.write(chunk);
  if (ret === false)
    this._cp.stdin.once('drain', cb);
  else
    cb();
};
```

From the outside, this would look exactly like a zlib "through" stream. But the implementation is a child process, a clearly duplex sort of thing!

**@Raynos** (Oct 2, 2012):

@isaacs what about this difference:

`through1.pipe(through2).pipe(through1)` is an infinite loop

`duplex1.pipe(duplex2).pipe(duplex1)` is not (unless duplex1 is an
echo duplex stream)

**@isaacs** (Oct 2, 2012):

It might be an infinite loop, you never know :) That's actually not a difference.

+make it easy to implement transform operations such as encryption,
+decryption, compression, and so on.
+
+Transform streams are `instanceof` Readable, but they have all of the
+methods and properties of both Readable and Writable streams. See
+above for the list of events and methods that Transform inherits from
+Writable and Readable.
+
+Override the `_transform(chunk, outputFn, callback)` method in
+your implementation classes to take advantage of it.
+
+### new Transform(options)
+
+* `options` {Object} Passed to both the Writable and Readable
+ constructors.
+
+Make sure to call the `Transform` constructor in your extension
+classes, or else the stream will not be properly initialized.
+
+### transform.\_transform(chunk, outputFn, callback)
+
+* `chunk` {Buffer} The chunk to be transformed.
+* `outputFn` {Function} Call this function with any output data to be
+ passed to the readable interface.
+* `callback` {Function} Call this function (optionally with an error
+ argument) when you are done processing the supplied chunk.
+
+**Note: This function is not implemented in the Transform base class.**
+Rather, it is up to you to implement `_transform` in your extension
+classes.
+
+`_transform` should do whatever has to be done in this specific
+Transform class, to handle the bytes being written, and pass them off
+to the readable portion of the interface. Do asynchronous I/O,
+process things, and so on.
+
+Call the callback function only when the current chunk is completely
+consumed. Note that this may mean that you call the `outputFn` zero
+or more times, depending on how much data you want to output as a
+result of this chunk.
+
+This method is prefixed with an underscore because it is internal to
+the class that defines it, and should not be called directly by user
+programs. However, you **are** expected to override this method in
+your own extension classes.
+
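+For example, a sketch of a trivial upper-casing transform. (The
+subclass, and the `Transform` export path, are assumptions for the
+sake of illustration.)
+
+```javascript
+var Transform = require('readable-stream').Transform; // assumed export
+var util = require('util');
+
+util.inherits(Upper, Transform);
+
+function Upper(options) {
+  Transform.call(this, options);
+}
+
+Upper.prototype._transform = function(chunk, outputFn, cb) {
+  // one output chunk for each input chunk, in this case
+  outputFn(new Buffer(chunk.toString('utf8').toUpperCase(), 'utf8'));
+  cb(); // this chunk is now fully consumed
+};
+```
+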
+### transform.\_flush(outputFn, callback)
+
+* `outputFn` {Function} Call this function with any output data to be
+ passed to the readable interface.
+* `callback` {Function} Call this function (optionally with an error
+ argument) when you are done flushing any remaining data.
+
+**Note: This function is not implemented in the Transform base class.**
+Rather, it is up to you to implement `_flush` in your extension
+classes optionally, if it applies to your use case.
+
+In some cases, your transform operation may need to emit a bit more
+data at the end of the stream. For example, a `Zlib` compression
+stream will store up some internal state so that it can optimally
+compress the output. At the end, however, it needs to do the best it
+can with what is left, so that the data will be complete.
+
+In those cases, you can implement a `_flush` method, which will be
+called at the very end, after all the written data is consumed, but
+before emitting `end` to signal the end of the readable side. Just
+like with `_transform`, call `outputFn` zero or more times, as
+appropriate, and call `callback` when the flush operation is complete.
+
+This method is prefixed with an underscore because it is internal to
+the class that defines it, and should not be called directly by user
+programs. However, you **are** expected to override this method in
+your own extension classes.
+
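+For example, a sketch of a `_flush` that emits leftover buffered data
+(`this._leftover` is a hypothetical buffer maintained by
+`_transform`):
+
+```javascript
+MyTransform.prototype._flush = function(outputFn, cb) {
+  if (this._leftover && this._leftover.length)
+    outputFn(this._leftover);
+  cb(); // flushing is complete; 'end' can now be emitted
+};
+```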
+
+## Class: PassThrough
+
+This is a trivial implementation of a `Transform` stream that simply
+passes the input bytes across to the output. Its purpose is mainly
+for examples and testing, but there are occasionally use cases where
+it can come in handy.
