doc: Provide 2 examples of SimpleProtocol parser

The first example uses Readable, and shows the use of
readable.unshift().  The second uses the Transform class, showing that
it's much simpler in this case.
commit 6479405d545c8d7c24fbd05f16ba8a438840ba8d 1 parent 7410a93
@isaacs authored
Showing with 210 additions and 16 deletions.
  1. +210 −16 doc/api/stream.markdown
226 doc/api/stream.markdown
@@ -84,7 +84,7 @@ method.
A `Readable Stream` has the following methods, members, and events.
Note that `stream.Readable` is an abstract class designed to be
-extended with an underlying implementation of the `_read(size, cb)`
+extended with an underlying implementation of the `_read(size)`
method. (See below.)
### new stream.Readable([options])
@@ -105,32 +105,39 @@ In classes that extend the Readable class, make sure to call the
constructor so that the buffering settings can be properly
initialized.
-### readable.\_read(size, callback)
+### readable.\_read(size)
* `size` {Number} Number of bytes to read asynchronously
* `callback` {Function} Called with an error or with data
-All Readable stream implementations must provide a `_read` method
-to fetch data from the underlying resource.
-
-Note: **This function MUST NOT be called directly.** It should be
+Note: **This function should NOT be called directly.** It should be
implemented by child classes, and called by the internal Readable
class methods only.
-Call the callback using the standard `callback(error, data)` pattern.
-When no more data can be fetched, call `callback(null, null)` to
-signal the EOF.
+All Readable stream implementations must provide a `_read` method
+to fetch data from the underlying resource.
This method is prefixed with an underscore because it is internal to
the class that defines it, and should not be called directly by user
programs. However, you **are** expected to override this method in
your own extension classes.
+When data is available, put it into the read queue by calling
+`readable.push(chunk)`. If `push` returns false, then you should stop
+reading. When `_read` is called again, you should start pushing more
+data.
+
### readable.push(chunk)
* `chunk` {Buffer | null | String} Chunk of data to push into the read queue
* return {Boolean} Whether or not more pushes should be performed
+Note: **This function should be called by Readable implementors, NOT
+by consumers of Readable subclasses.** The `_read()` function will not
+be called again until at least one `push(chunk)` call is made. If no
+data is available, then you MAY call `push('')` (an empty string) to
+allow a future `_read` call, without adding any data to the queue.
+
The `Readable` class works by putting data into a read queue to be
pulled out later by calling the `read()` method when the `'readable'`
event fires.
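
The `_read`/`push` contract described above can be sketched as follows. This is a minimal illustration and not part of the commit: `Counter` is a hypothetical Readable subclass that emits the numbers 1 through `max`, one per line, and `util.inherits` is used only to keep the inheritance setup short.

```javascript
var Readable = require('stream').Readable;
var util = require('util');

// Hypothetical example class: emits "1\n", "2\n", ... up to `max`.
function Counter(max, options) {
  Readable.call(this, options);
  this._max = max;
  this._next = 1;
}
util.inherits(Counter, Readable);

Counter.prototype._read = function(size) {
  // Keep pushing until push() returns false (the queue is full)
  // or there is nothing left to produce.
  var more = true;
  while (more && this._next <= this._max) {
    more = this.push(String(this._next++) + '\n');
  }
  // Once every number has been pushed, signal EOF.
  if (this._next > this._max) this.push(null);
};
```

If `push` returns false before the counter is exhausted, the loop stops and the remaining numbers are produced on the next `_read` call.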
@@ -167,6 +174,115 @@ stream._read = function(size, cb) {
};
```
+### readable.unshift(chunk)
+
+* `chunk` {Buffer | null | String} Chunk of data to unshift onto the read queue
+* return {Boolean} Whether or not more pushes should be performed
+
+This is the counterpart of `readable.push(chunk)`. Rather than putting
+the data at the *end* of the read queue, it puts it at the *front* of
+the read queue.
+
+This is useful in certain use-cases where a stream is being consumed
+by a parser, which needs to "un-consume" some data that it has
+optimistically pulled out of the source.
+
+```javascript
+// A parser for a simple data protocol.
+// The "header" is a JSON object, followed by 2 \n characters, and
+// then a message body.
+//
+// Note: This can be done more simply as a Transform stream. See below.
+
+function SimpleProtocol(source, options) {
+ if (!(this instanceof SimpleProtocol))
+ return new SimpleProtocol(source, options);
+
+ Readable.call(this, options);
+ this._inBody = false;
+ this._sawFirstCr = false;
+
+ // source is a readable stream, such as a socket or file
+ this._source = source;
+
+ var self = this;
+ source.on('end', function() {
+ self.push(null);
+ });
+
+ // give it a kick whenever the source is readable
+ // read(0) will not consume any bytes
+ source.on('readable', function() {
+ self.read(0);
+ });
+
+ this._rawHeader = [];
+ this.header = null;
+}
+
+SimpleProtocol.prototype = Object.create(
+ Readable.prototype, { constructor: { value: SimpleProtocol }});
+
+SimpleProtocol.prototype._read = function(n) {
+ if (!this._inBody) {
+ var chunk = this._source.read();
+
+ // if the source doesn't have data, we don't have data yet.
+ if (chunk === null)
+ return this.push('');
+
+ // check if the chunk has a \n\n
+ var split = -1;
+ for (var i = 0; i < chunk.length; i++) {
+ if (chunk[i] === 10) { // '\n'
+ if (this._sawFirstCr) {
+ split = i;
+ break;
+ } else {
+ this._sawFirstCr = true;
+ }
+ } else {
+ this._sawFirstCr = false;
+ }
+ }
+
+ if (split === -1) {
+ // still waiting for the \n\n
+ // stash the chunk, and try again.
+ this._rawHeader.push(chunk);
+ this.push('');
+ } else {
+ this._inBody = true;
+ var h = chunk.slice(0, split);
+ this._rawHeader.push(h);
+ var header = Buffer.concat(this._rawHeader).toString();
+ try {
+ this.header = JSON.parse(header);
+ } catch (er) {
+ this.emit('error', new Error('invalid simple protocol data'));
+ return;
+ }
+ // now, because we got some extra data, unshift the rest of the chunk
+ // back into the read queue so that our consumer will see it.
+ this.unshift(chunk.slice(split));
+
+ // and let them know that we are done parsing the header.
+ this.emit('header', this.header);
+ }
+ } else {
+ // from here on, just provide the data to our consumer.
+ // careful not to push(null), since that would indicate EOF.
+ var chunk = this._source.read();
+ if (chunk) this.push(chunk);
+ }
+};
+
+// Usage:
+var parser = new SimpleProtocol(source);
+// Now parser is a readable stream that will emit 'header'
+// with the parsed header data.
+```
+
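
A smaller illustration of the same "un-consume" idea, separate from the parser above: the sketch below reads one chunk from a readable stream, returns the first line, and unshifts whatever followed the newline. `takeLine` is a hypothetical helper name, and the sketch assumes the source yields Buffers (no encoding set).

```javascript
// Hypothetical helper: pull one '\n'-terminated line off `source`,
// putting any over-read bytes back at the front of the read queue.
function takeLine(source) {
  var chunk = source.read();
  if (chunk === null) return null;   // nothing buffered yet

  var nl = chunk.indexOf(10);        // 10 is '\n'
  if (nl === -1) {
    // No complete line in this chunk: put it all back and report
    // that no line is available yet.
    source.unshift(chunk);
    return null;
  }

  // Return everything after the newline to the read queue.
  var rest = chunk.slice(nl + 1);
  if (rest.length) source.unshift(rest);

  return chunk.slice(0, nl).toString();
}
```

A caller would typically invoke `takeLine(source)` from a `'readable'` handler and go back to plain `source.read()` calls once the line has been obtained.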
### readable.wrap(stream)
* `stream` {Stream} An "old style" readable stream
@@ -232,6 +348,8 @@ constructor.
* `size` {Number | null} Optional number of bytes to read.
* Return: {Buffer | String | null}
+Note: **This function SHOULD be called by Readable stream users.**
+
Call this method to consume data once the `'readable'` event is
emitted.
@@ -243,8 +361,8 @@ If there is no data to consume, or if there are fewer bytes in the
internal buffer than the `size` argument, then `null` is returned, and
a future `'readable'` event will be emitted when more is available.
-Note that calling `stream.read(0)` will always return `null`, and will
-trigger a refresh of the internal buffer, but otherwise be a no-op.
+Calling `stream.read(0)` will always return `null`, and will trigger a
+refresh of the internal buffer, but otherwise be a no-op.
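
The `read(0)` behavior can be observed with a small sketch. `OneShot` is a hypothetical class, not part of the commit, that supplies a single chunk the first time `_read` is invoked. The sketch assumes that a synchronous `push` inside `_read` makes the data available to the next `read()` call.

```javascript
var Readable = require('stream').Readable;
var util = require('util');

// Hypothetical source: counts _read calls and pushes one chunk
// on the first call only.
function OneShot(options) {
  Readable.call(this, options);
  this.readCalls = 0;
}
util.inherits(OneShot, Readable);

OneShot.prototype._read = function(size) {
  this.readCalls++;
  if (this.readCalls === 1) this.push('abc');
};

var stream = new OneShot();
var first = stream.read(0);  // asks for a refill, consumes nothing
var second = stream.read();  // consumes whatever was buffered
```

Under these assumptions `first` is `null` even though `_read` has already run, and the chunk only comes out on the following `read()` call.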
### readable.pipe(destination, [options])
@@ -416,14 +534,14 @@ A "duplex" stream is one that is both Readable and Writable, such as a
TCP socket connection.
Note that `stream.Duplex` is an abstract class designed to be
-extended with an underlying implementation of the `_read(size, cb)`
+extended with an underlying implementation of the `_read(size)`
and `_write(chunk, callback)` methods as you would with a Readable or
Writable stream class.
Since JavaScript doesn't have multiple prototypal inheritance, this
class prototypally inherits from Readable, and then parasitically from
Writable. It is thus up to the user to implement both the lowlevel
-`_read(n,cb)` method as well as the lowlevel `_write(chunk,cb)` method
+`_read(n)` method as well as the lowlevel `_write(chunk,cb)` method
on extension duplex classes.
### new stream.Duplex(options)
@@ -471,13 +589,13 @@ initialized.
* `callback` {Function} Call this function (optionally with an error
argument) when you are done processing the supplied chunk.
-All Transform stream implementations must provide a `_transform`
-method to accept input and produce output.
-
Note: **This function MUST NOT be called directly.** It should be
implemented by child classes, and called by the internal Transform
class methods only.
+All Transform stream implementations must provide a `_transform`
+method to accept input and produce output.
+
`_transform` should do whatever has to be done in this specific
Transform class, to handle the bytes being written, and pass them off
to the readable portion of the interface. Do asynchronous I/O,
@@ -521,6 +639,82 @@ the class that defines it, and should not be called directly by user
programs. However, you **are** expected to override this method in
your own extension classes.
+### Example: `SimpleProtocol` parser
+
+The example above of a simple protocol parser can be implemented much
+more simply by using the higher level `Transform` stream class.
+
+In this example, rather than providing the input as an argument, it
+would be piped into the parser, which is a more idiomatic Node stream
+approach.
+
+```javascript
+function SimpleProtocol(options) {
+ if (!(this instanceof SimpleProtocol))
+ return new SimpleProtocol(options);
+
+ Transform.call(this, options);
+ this._inBody = false;
+ this._sawFirstCr = false;
+ this._rawHeader = [];
+ this.header = null;
+}
+
+SimpleProtocol.prototype = Object.create(
+ Transform.prototype, { constructor: { value: SimpleProtocol }});
+
+SimpleProtocol.prototype._transform = function(chunk, output, done) {
+ if (!this._inBody) {
+ // check if the chunk has a \n\n
+ var split = -1;
+ for (var i = 0; i < chunk.length; i++) {
+ if (chunk[i] === 10) { // '\n'
+ if (this._sawFirstCr) {
+ split = i;
+ break;
+ } else {
+ this._sawFirstCr = true;
+ }
+ } else {
+ this._sawFirstCr = false;
+ }
+ }
+
+ if (split === -1) {
+ // still waiting for the \n\n
+ // stash the chunk, and try again.
+ this._rawHeader.push(chunk);
+ } else {
+ this._inBody = true;
+ var h = chunk.slice(0, split);
+ this._rawHeader.push(h);
+ var header = Buffer.concat(this._rawHeader).toString();
+ try {
+ this.header = JSON.parse(header);
+ } catch (er) {
+ this.emit('error', new Error('invalid simple protocol data'));
+ return;
+ }
+ // and let them know that we are done parsing the header.
+ this.emit('header', this.header);
+
+ // now, because we got some extra data, emit the rest of the chunk first.
+ output(chunk.slice(split));
+ }
+ } else {
+ // from here on, just provide the data to our consumer as-is.
+ output(chunk);
+ }
+ done();
+};
+
+var parser = new SimpleProtocol();
+source.pipe(parser);
+
+// Now parser is a readable stream that will emit 'header'
+// with the parsed header data.
+```
+
## Class: stream.PassThrough