# Stream

Stability: 2 - Unstable

A stream is an abstract interface implemented by various objects in
Node. For example a [request to an HTTP
server](http.html#http_http_incomingmessage) is a stream, as is
[stdout][]. Streams are readable, writable, or both. All streams are
instances of [EventEmitter][].

You can load the Stream base classes by doing `require('stream')`.
There are base classes provided for [Readable][] streams, [Writable][]
streams, [Duplex][] streams, and [Transform][] streams.

This document is split up into three sections. The first explains the
parts of the API that you need to be aware of to use streams in your
programs. If you never implement a streaming API yourself, you can
stop there.

The second section explains the parts of the API that you need to use
if you implement your own custom streams. The API is designed to make
this easy for you to do.

The third section goes into more depth about how streams work,
including some of the internal mechanisms and functions that you
should probably not modify unless you definitely know what you are
doing.


## API for Stream Consumers

<!--type=misc-->

Streams can be either [Readable][], [Writable][], or both ([Duplex][]).

All streams are EventEmitters, but they also have other custom methods
and properties depending on whether they are Readable, Writable, or
Duplex.

If a stream is both Readable and Writable, then it implements all of
the methods and events below. So, a [Duplex][] or [Transform][] stream is
fully described by this API, though their implementation may be
somewhat different.

It is not necessary to implement Stream interfaces in order to consume
streams in your programs. If you **are** implementing streaming
interfaces in your own program, please also refer to
[API for Stream Implementors][] below.

Almost all Node programs, no matter how simple, use Streams in some
way. Here is an example of using Streams in a Node program:

```javascript
var http = require('http');

var server = http.createServer(function (req, res) {
  // req is an http.IncomingMessage, which is a Readable Stream
  // res is an http.ServerResponse, which is a Writable Stream

  var body = '';
  // we want to get the data as utf8 strings
  // If you don't set an encoding, then you'll get Buffer objects
  req.setEncoding('utf8');

  // Readable streams emit 'data' events once a listener is added
  req.on('data', function (chunk) {
    body += chunk;
  });

  // the end event tells you that you have the entire body
  req.on('end', function () {
    var data;
    try {
      data = JSON.parse(body);
    } catch (er) {
      // uh oh! bad json!
      res.statusCode = 400;
      return res.end('error: ' + er.message);
    }

    // write back something interesting to the user:
    res.write(typeof data);
    res.end();
  });
});

server.listen(1337);

// $ curl localhost:1337 -d '{}'
// object
// $ curl localhost:1337 -d '"foo"'
// string
// $ curl localhost:1337 -d 'not json'
// error: Unexpected token o
```

### Class: stream.Readable

<!--type=class-->

The Readable stream interface is the abstraction for a *source* of
data that you are reading from. In other words, data comes *out* of a
Readable stream.

A Readable stream will not start emitting data until you indicate that
you are ready to receive it.

Readable streams have two "modes": a **flowing mode** and a **paused
mode**. When in flowing mode, data is read from the underlying system
and provided to your program as fast as possible. In paused mode, you
must explicitly call `stream.read()` to get chunks of data out.
Streams start out in paused mode.

**Note**: If no `'data'` event handlers are attached, and there are no
[`pipe()`][] destinations, and the stream is switched into flowing
mode, then data will be lost.

You can switch to flowing mode by doing any of the following:

* Adding a [`'data'` event][] handler to listen for data.
* Calling the [`resume()`][] method to explicitly open the flow.
* Calling the [`pipe()`][] method to send the data to a [Writable][].

You can switch back to paused mode by doing either of the following:

* If there are no pipe destinations, by calling the [`pause()`][]
  method.
* If there are pipe destinations, by removing any [`'data'` event][]
  handlers, and removing all pipe destinations by calling the
  [`unpipe()`][] method.

Note that, for backwards compatibility reasons, removing `'data'`
event handlers will **not** automatically pause the stream. Also, if
there are piped destinations, then calling `pause()` will not
guarantee that the stream will *remain* paused once those
destinations drain and ask for more data.

Examples of readable streams include:

* [http responses, on the client](http.html#http_http_incomingmessage)
* [http requests, on the server](http.html#http_http_incomingmessage)
* [fs read streams](fs.html#fs_class_fs_readstream)
* [zlib streams][]
* [crypto streams][]
* [tcp sockets][]
* [child process stdout and stderr][]
* [process.stdin][]

#### Event: 'readable'

When a chunk of data can be read from the stream, it will emit a
`'readable'` event.

In some cases, listening for a `'readable'` event will cause some data
to be read into the internal buffer from the underlying system, if it
hadn't already.

```javascript
var readable = getReadableStreamSomehow();
readable.on('readable', function() {
  // there is some data to read now
});
```

Once the internal buffer is drained, a `readable` event will fire
again when more data is available.

#### Event: 'data'

* `chunk` {Buffer | String} The chunk of data.

Attaching a `data` event listener to a stream that has not been
explicitly paused will switch the stream into flowing mode. Data will
then be passed as soon as it is available.

If you just want to get all the data out of the stream as fast as
possible, this is the best way to do so.

```javascript
var readable = getReadableStreamSomehow();
readable.on('data', function(chunk) {
  console.log('got %d bytes of data', chunk.length);
});
```

#### Event: 'end'

This event fires when no more data will be provided.

Note that the `end` event **will not fire** unless the data is
completely consumed. This can be done by switching into flowing mode,
or by calling `read()` repeatedly until you get to the end.

```javascript
var readable = getReadableStreamSomehow();
readable.on('data', function(chunk) {
  console.log('got %d bytes of data', chunk.length);
});
readable.on('end', function() {
  console.log('there will be no more data.');
});
```

#### Event: 'close'

Emitted when the underlying resource (for example, the backing file
descriptor) has been closed. Not all streams will emit this.

#### Event: 'error'

Emitted if there was an error receiving data.

#### readable.read([size])

* `size` {Number} Optional argument to specify how much data to read.
* Return {String | Buffer | null}

The `read()` method pulls some data out of the internal buffer and
returns it. If there is no data available, then it will return
`null`.

If you pass in a `size` argument, then it will return that many
bytes. If `size` bytes are not available, then it will return `null`.

If you do not specify a `size` argument, then it will return all the
data in the internal buffer.

This method should only be called in paused mode. In flowing mode,
this method is called automatically until the internal buffer is
drained.

```javascript
var readable = getReadableStreamSomehow();
readable.on('readable', function() {
  var chunk;
  while (null !== (chunk = readable.read())) {
    console.log('got %d bytes of data', chunk.length);
  }
});
```

If this method returns a data chunk, then it will also trigger the
emission of a [`'data'` event][].

#### readable.setEncoding(encoding)

* `encoding` {String} The encoding to use.
* Return: `this`

Call this function to cause the stream to return strings of the
specified encoding instead of Buffer objects. For example, if you do
`readable.setEncoding('utf8')`, then the output data will be
interpreted as UTF-8 data, and returned as strings. If you do
`readable.setEncoding('hex')`, then the data will be encoded in
hexadecimal string format.

This properly handles multi-byte characters that would otherwise be
potentially mangled if you simply pulled the Buffers directly and
called `buf.toString(encoding)` on them. If you want to read the data
as strings, always use this method.

```javascript
var readable = getReadableStreamSomehow();
readable.setEncoding('utf8');
readable.on('data', function(chunk) {
  assert.equal(typeof chunk, 'string');
  console.log('got %d characters of string data', chunk.length);
});
```

#### readable.resume()

* Return: `this`

This method will cause the readable stream to resume emitting `data`
events.

This method will switch the stream into flowing mode. If you do *not*
want to consume the data from a stream, but you *do* want to get to
its `end` event, you can call [`readable.resume()`][] to open the flow of
data.

```javascript
var readable = getReadableStreamSomehow();
readable.resume();
readable.on('end', function() {
  console.log('got to the end, but did not read anything');
});
```

#### readable.pause()

* Return: `this`

This method will cause a stream in flowing mode to stop emitting
`data` events, switching out of flowing mode. Any data that becomes
available will remain in the internal buffer.

```javascript
var readable = getReadableStreamSomehow();
readable.on('data', function(chunk) {
  console.log('got %d bytes of data', chunk.length);
  readable.pause();
  console.log('there will be no more data for 1 second');
  setTimeout(function() {
    console.log('now data will start flowing again');
    readable.resume();
  }, 1000);
});
```

#### readable.pipe(destination, [options])

* `destination` {[Writable][] Stream} The destination for writing data
* `options` {Object} Pipe options
  * `end` {Boolean} End the writer when the reader ends. Default = `true`

This method pulls all the data out of a readable stream, and writes it
to the supplied destination, automatically managing the flow so that
the destination is not overwhelmed by a fast readable stream.

Multiple destinations can be piped to safely.

```javascript
var readable = getReadableStreamSomehow();
var writable = fs.createWriteStream('file.txt');
// All the data from readable goes into 'file.txt'
readable.pipe(writable);
```

This function returns the destination stream, so you can set up pipe
chains like so:

```javascript
var r = fs.createReadStream('file.txt');
var z = zlib.createGzip();
var w = fs.createWriteStream('file.txt.gz');
r.pipe(z).pipe(w);
```

For example, emulating the Unix `cat` command:

```javascript
process.stdin.pipe(process.stdout);
```

By default [`end()`][] is called on the destination when the source stream
emits `end`, so that `destination` is no longer writable. Pass `{ end:
false }` as `options` to keep the destination stream open.

This keeps `writer` open so that "Goodbye" can be written at the
end:

```javascript
reader.pipe(writer, { end: false });
reader.on('end', function() {
  writer.end('Goodbye\n');
});
```

Note that `process.stderr` and `process.stdout` are never closed until
the process exits, regardless of the specified options.

#### readable.unpipe([destination])

* `destination` {[Writable][] Stream} Optional specific stream to unpipe

This method will remove the hooks set up for a previous `pipe()` call.

If the destination is not specified, then all pipes are removed.

If the destination is specified, but no pipe is set up for it, then
this is a no-op.

```javascript
var readable = getReadableStreamSomehow();
var writable = fs.createWriteStream('file.txt');
// All the data from readable goes into 'file.txt',
// but only for the first second
readable.pipe(writable);
setTimeout(function() {
  console.log('stop writing to file.txt');
  readable.unpipe(writable);
  console.log('manually close the file stream');
  writable.end();
}, 1000);
```

#### readable.unshift(chunk)

* `chunk` {Buffer | String} Chunk of data to unshift onto the read queue

This is useful in certain cases where a stream is being consumed by a
parser, which needs to "un-consume" some data that it has
optimistically pulled out of the source, so that the stream can be
passed on to some other party.

If you find that you must often call `stream.unshift(chunk)` in your
programs, consider implementing a [Transform][] stream instead. (See API
for Stream Implementors, below.)

```javascript
// Pull off a header delimited by \n\n
// use unshift() if we get too much
// Call the callback with (error, header, stream)
var StringDecoder = require('string_decoder').StringDecoder;
function parseHeader(stream, callback) {
  stream.on('error', callback);
  stream.on('readable', onReadable);
  var decoder = new StringDecoder('utf8');
  var header = '';
  function onReadable() {
    var chunk;
    while (null !== (chunk = stream.read())) {
      var str = decoder.write(chunk);
      if (str.match(/\n\n/)) {
        // found the header boundary
        var split = str.split(/\n\n/);
        header += split.shift();
        var remaining = split.join('\n\n');
        var buf = new Buffer(remaining, 'utf8');
        if (buf.length)
          stream.unshift(buf);
        stream.removeListener('error', callback);
        stream.removeListener('readable', onReadable);
        // now the body of the message can be read from the stream.
        callback(null, header, stream);
      } else {
        // still reading the header.
        header += str;
      }
    }
  }
}
```

#### readable.wrap(stream)

* `stream` {Stream} An "old style" readable stream

Versions of Node prior to v0.10 had streams that did not implement the
entire Streams API as it is today. (See "Compatibility" below for
more information.)

If you are using an older Node library that emits `'data'` events and
has a [`pause()`][] method that is advisory only, then you can use the
`wrap()` method to create a [Readable][] stream that uses the old stream
as its data source.

You will very rarely need to call this function, but it exists
as a convenience for interacting with old Node programs and libraries.

For example:

```javascript
var OldReader = require('./old-api-module.js').OldReader;
var oreader = new OldReader;
var Readable = require('stream').Readable;
var myReader = new Readable().wrap(oreader);

myReader.on('readable', function() {
  myReader.read(); // etc.
});
```


### Class: stream.Writable

<!--type=class-->

The Writable stream interface is an abstraction for a *destination*
that you are writing data *to*.

Examples of writable streams include:

* [http requests, on the client](http.html#http_class_http_clientrequest)
* [http responses, on the server](http.html#http_class_http_serverresponse)
* [fs write streams](fs.html#fs_class_fs_writestream)
* [zlib streams][]
* [crypto streams][]
* [tcp sockets][]
* [child process stdin](child_process.html#child_process_child_stdin)
* [process.stdout][], [process.stderr][]

#### writable.write(chunk, [encoding], [callback])

* `chunk` {String | Buffer} The data to write
* `encoding` {String} The encoding, if `chunk` is a String
* `callback` {Function} Callback for when this chunk of data is flushed
* Returns: {Boolean} True if the data was handled completely.

This method writes some data to the underlying system, and calls the
supplied callback once the data has been fully handled.

The return value indicates if you should continue writing right now.
If the data had to be buffered internally, then it will return
`false`. Otherwise, it will return `true`.

This return value is strictly advisory. You MAY continue to write,
even if it returns `false`. However, writes will be buffered in
memory, so it is best not to do this excessively. Instead, wait for
the `drain` event before writing more data.

#### Event: 'drain'

If a [`writable.write(chunk)`][] call returns false, then the `drain`
event will indicate when it is appropriate to begin writing more data
to the stream.

```javascript
// Write the data to the supplied writable stream one million times.
// Be attentive to back-pressure.
function writeOneMillionTimes(writer, data, encoding, callback) {
  var i = 1000000;
  write();
  function write() {
    var ok = true;
    do {
      i -= 1;
      if (i === 0) {
        // last time!
        writer.write(data, encoding, callback);
      } else {
        // see if we should continue, or wait
        // don't pass the callback, because we're not done yet.
        ok = writer.write(data, encoding);
      }
    } while (i > 0 && ok);
    if (i > 0) {
      // had to stop early!
      // write some more once it drains
      writer.once('drain', write);
    }
  }
}
```

#### writable.cork()

Forces buffering of all writes.

Buffered data will be flushed either when `.uncork()` or `.end()` is
called.

#### writable.uncork()

Flushes all data buffered since the last `.cork()` call.

#### writable.end([chunk], [encoding], [callback])

* `chunk` {String | Buffer} Optional data to write
* `encoding` {String} The encoding, if `chunk` is a String
* `callback` {Function} Optional callback for when the stream is finished

Call this method when no more data will be written to the stream. If
supplied, the callback is attached as a listener on the `finish` event.

Calling [`write()`][] after calling [`end()`][] will raise an error.

```javascript
// write 'hello, ' and then end with 'world!'
http.createServer(function (req, res) {
  res.write('hello, ');
  res.end('world!');
  // writing more now is not allowed!
});
```

#### Event: 'finish'

When the [`end()`][] method has been called, and all data has been flushed
to the underlying system, this event is emitted.

```javascript
var writer = getWritableStreamSomehow();
for (var i = 0; i < 100; i++) {
  writer.write('hello, #' + i + '!\n');
}
writer.end('this is the end\n');
writer.on('finish', function() {
  console.error('all writes are now complete.');
});
```

#### Event: 'pipe'

* `src` {[Readable][] Stream} source stream that is piping to this writable

This is emitted whenever the `pipe()` method is called on a readable
stream, adding this writable to its set of destinations.

```javascript
var writer = getWritableStreamSomehow();
var reader = getReadableStreamSomehow();
writer.on('pipe', function(src) {
  console.error('something is piping into the writer');
  assert.equal(src, reader);
});
reader.pipe(writer);
```

#### Event: 'unpipe'

* `src` {[Readable][] Stream} The source stream that [unpiped][] this writable

This is emitted whenever the [`unpipe()`][] method is called on a
readable stream, removing this writable from its set of destinations.

```javascript
var writer = getWritableStreamSomehow();
var reader = getReadableStreamSomehow();
writer.on('unpipe', function(src) {
  console.error('something has stopped piping into the writer');
  assert.equal(src, reader);
});
reader.pipe(writer);
reader.unpipe(writer);
```

### Class: stream.Duplex

Duplex streams are streams that implement both the [Readable][] and
[Writable][] interfaces. See above for usage.

Examples of Duplex streams include:

* [tcp sockets][]
* [zlib streams][]
* [crypto streams][]


### Class: stream.Transform

Transform streams are [Duplex][] streams where the output is in some way
computed from the input. They implement both the [Readable][] and
[Writable][] interfaces. See above for usage.

Examples of Transform streams include:

* [zlib streams][]
* [crypto streams][]


## API for Stream Implementors

<!--type=misc-->

To implement any sort of stream, the pattern is the same:

1. Extend the appropriate parent class in your own subclass. (The
   [`util.inherits`][] method is particularly helpful for this.)
2. Call the appropriate parent class constructor in your constructor,
   to be sure that the internal mechanisms are set up properly.
3. Implement one or more specific methods, as detailed below.

The class to extend and the method(s) to implement depend on the sort
of stream class you are writing:

<table>
  <thead>
    <tr>
      <th>
        <p>Use-case</p>
      </th>
      <th>
        <p>Class</p>
      </th>
      <th>
        <p>Method(s) to implement</p>
      </th>
    </tr>
  </thead>
  <tr>
    <td>
      <p>Reading only</p>
    </td>
    <td>
      <p>[Readable](#stream_class_stream_readable_1)</p>
    </td>
    <td>
      <p><code>[_read][]</code></p>
    </td>
  </tr>
  <tr>
    <td>
      <p>Writing only</p>
    </td>
    <td>
      <p>[Writable](#stream_class_stream_writable_1)</p>
    </td>
    <td>
      <p><code>[_write][]</code></p>
    </td>
  </tr>
  <tr>
    <td>
      <p>Reading and writing</p>
    </td>
    <td>
      <p>[Duplex](#stream_class_stream_duplex_1)</p>
    </td>
    <td>
      <p><code>[_read][]</code>, <code>[_write][]</code></p>
    </td>
  </tr>
  <tr>
    <td>
      <p>Operate on written data, then read the result</p>
    </td>
    <td>
      <p>[Transform](#stream_class_stream_transform_1)</p>
    </td>
    <td>
      <p><code>_transform</code>, <code>_flush</code></p>
    </td>
  </tr>
</table>

In your implementation code, it is very important to never call the
methods described in [API for Stream Consumers][] above. Otherwise, you
can potentially cause adverse side effects in programs that consume
your streaming interfaces.

### Class: stream.Readable

<!--type=class-->

`stream.Readable` is an abstract class designed to be extended with an
underlying implementation of the [`_read(size)`][] method.

Please see above under [API for Stream Consumers][] for how to consume
streams in your programs. What follows is an explanation of how to
implement Readable streams in your programs.

#### Example: A Counting Stream

<!--type=example-->

This is a basic example of a Readable stream. It emits the numerals
from 1 to 1,000,000 in ascending order, and then ends.

```javascript
var Readable = require('stream').Readable;
var util = require('util');
util.inherits(Counter, Readable);

function Counter(opt) {
  Readable.call(this, opt);
  this._max = 1000000;
  this._index = 1;
}

Counter.prototype._read = function() {
  var i = this._index++;
  if (i > this._max)
    this.push(null);
  else {
    var str = '' + i;
    var buf = new Buffer(str, 'ascii');
    this.push(buf);
  }
};
```

#### Example: SimpleProtocol v1 (Sub-optimal)

This is similar to the `parseHeader` function described above, but
implemented as a custom stream. Also, note that this implementation
does not convert the incoming data to a string.

However, this would be better implemented as a [Transform][] stream. See
below for a better implementation.

```javascript
// A parser for a simple data protocol.
// The "header" is a JSON object, followed by 2 \n characters, and
// then a message body.
//
// NOTE: This can be done more simply as a Transform stream!
// Using Readable directly for this is sub-optimal. See the
// alternative example below under the Transform section.

var Readable = require('stream').Readable;
var util = require('util');

util.inherits(SimpleProtocol, Readable);

function SimpleProtocol(source, options) {
  if (!(this instanceof SimpleProtocol))
    return new SimpleProtocol(source, options);

  Readable.call(this, options);
  this._inBody = false;
  this._sawFirstCr = false;

  // source is a readable stream, such as a socket or file
  this._source = source;

  var self = this;
  source.on('end', function() {
    self.push(null);
  });

  // give it a kick whenever the source is readable
  // read(0) will not consume any bytes
  source.on('readable', function() {
    self.read(0);
  });

  this._rawHeader = [];
  this.header = null;
}

SimpleProtocol.prototype._read = function(n) {
  if (!this._inBody) {
    var chunk = this._source.read();

    // if the source doesn't have data, we don't have data yet.
    if (chunk === null)
      return this.push('');

    // check if the chunk has a \n\n
    var split = -1;
    for (var i = 0; i < chunk.length; i++) {
      if (chunk[i] === 10) { // '\n'
        if (this._sawFirstCr) {
          split = i;
          break;
        } else {
          this._sawFirstCr = true;
        }
      } else {
        this._sawFirstCr = false;
      }
    }

    if (split === -1) {
      // still waiting for the \n\n
      // stash the chunk, and try again.
      this._rawHeader.push(chunk);
      this.push('');
    } else {
      this._inBody = true;
      var h = chunk.slice(0, split);
      this._rawHeader.push(h);
      var header = Buffer.concat(this._rawHeader).toString();
      try {
        this.header = JSON.parse(header);
      } catch (er) {
        this.emit('error', new Error('invalid simple protocol data'));
        return;
      }
      // now, because we got some extra data, unshift the rest
      // back into the read queue so that our consumer will see it.
      var b = chunk.slice(split);
      this.unshift(b);

      // and let them know that we are done parsing the header.
      this.emit('header', this.header);
    }
  } else {
    // from there on, just provide the data to our consumer.
    // careful not to push(null), since that would indicate EOF.
    var chunk = this._source.read();
    if (chunk) this.push(chunk);
  }
};

// Usage:
// var parser = new SimpleProtocol(source);
// Now parser is a readable stream that will emit 'header'
// with the parsed header data.
```


#### new stream.Readable([options])

* `options` {Object}
  * `highWaterMark` {Number} The maximum number of bytes to store in
    the internal buffer before ceasing to read from the underlying
    resource. Default=16KB, or 16 for `objectMode` streams
  * `encoding` {String} If specified, then buffers will be decoded to
    strings using the specified encoding. Default=null
  * `objectMode` {Boolean} Whether this stream should behave
    as a stream of objects, meaning that `stream.read(n)` returns
    a single value instead of a Buffer of size n. Default=false

In classes that extend the Readable class, make sure to call the
Readable constructor so that the buffering settings can be properly
initialized.
889
890 #### readable.\_read(size)
891
892 * `size` {Number} Number of bytes to read asynchronously
893
894 Note: **Implement this function, but do NOT call it directly.**
895
896 This function should NOT be called directly. It should be implemented
897 by child classes, and only called by the internal Readable class
898 methods.
899
900 All Readable stream implementations must provide a `_read` method to
901 fetch data from the underlying resource.
902
903 This method is prefixed with an underscore because it is internal to
904 the class that defines it, and should not be called directly by user
905 programs. However, you **are** expected to override this method in
906 your own extension classes.
907
908 When data is available, put it into the read queue by calling
909 `readable.push(chunk)`. If `push` returns false, then you should stop
910 reading. When `_read` is called again, you should start pushing more
911 data.
912
913 The `size` argument is advisory. Implementations where a "read" is a
914 single call that returns data can use this to know how much data to
915 fetch. Implementations where that is not relevant, such as TCP or
916 TLS, may ignore this argument, and simply provide data whenever it
917 becomes available. There is no need, for example to "wait" until
918 `size` bytes are available before calling [`stream.push(chunk)`][].

#### readable.push(chunk, [encoding])

* `chunk` {Buffer | null | String} Chunk of data to push into the read queue
* `encoding` {String} Encoding of String chunks. Must be a valid
  Buffer encoding, such as `'utf8'` or `'ascii'`
* return {Boolean} Whether or not more pushes should be performed

Note: **This function should be called by Readable implementors, NOT
by consumers of Readable streams.**

The `_read()` function will not be called again until at least one
`push(chunk)` call is made.

The `Readable` class works by putting data into a read queue to be
pulled out later by calling the `read()` method when the `'readable'`
event fires.

The `push()` method will explicitly insert some data into the read
queue. If it is called with `null` then it will signal the end of the
data (EOF).

This API is designed to be as flexible as possible. For example,
you may be wrapping a lower-level source which has some sort of
pause/resume mechanism, and a data callback. In those cases, you
could wrap the low-level source object by doing something like this:

```javascript
// source is an object with readStop() and readStart() methods,
// and an `ondata` member that gets called when it has data, and
// an `onend` member that gets called when the data is over.

util.inherits(SourceWrapper, Readable);

function SourceWrapper(options) {
  Readable.call(this, options);

  this._source = getLowlevelSourceObject();
  var self = this;

  // Every time there's data, we push it into the internal buffer.
  this._source.ondata = function(chunk) {
    // if push() returns false, then we need to stop reading from source
    if (!self.push(chunk))
      self._source.readStop();
  };

  // When the source ends, we push the EOF-signalling `null` chunk
  this._source.onend = function() {
    self.push(null);
  };
}

// _read will be called when the stream wants to pull more data in
// the advisory size argument is ignored in this case.
SourceWrapper.prototype._read = function(size) {
  this._source.readStart();
};
```


### Class: stream.Writable

<!--type=class-->

`stream.Writable` is an abstract class designed to be extended with an
underlying implementation of the [`_write(chunk, encoding, callback)`][] method.

Please see above under [API for Stream Consumers][] for how to consume
writable streams in your programs. What follows is an explanation of
how to implement Writable streams in your programs.

#### new stream.Writable([options])

* `options` {Object}
  * `highWaterMark` {Number} Buffer level when [`write()`][] starts
    returning false. Default=16kb, or 16 for `objectMode` streams
  * `decodeStrings` {Boolean} Whether or not to decode strings into
    Buffers before passing them to [`_write()`][]. Default=true

In classes that extend the Writable class, make sure to call the
constructor so that the buffering settings can be properly
initialized.

#### writable.\_write(chunk, encoding, callback)

* `chunk` {Buffer | String} The chunk to be written. Will always
  be a buffer unless the `decodeStrings` option was set to `false`.
* `encoding` {String} If the chunk is a string, then this is the
  encoding type. Ignore if chunk is a buffer. Note that chunk will
  **always** be a buffer unless the `decodeStrings` option is
  explicitly set to `false`.
* `callback` {Function} Call this function (optionally with an error
  argument) when you are done processing the supplied chunk.

All Writable stream implementations must provide a [`_write()`][]
method to send data to the underlying resource.

Note: **This function MUST NOT be called directly.** It should be
implemented by child classes, and called by the internal Writable
class methods only.

Call the callback using the standard `callback(error)` pattern to
signal that the write completed successfully or with an error.

If the `decodeStrings` flag is explicitly set to `false` in the
constructor options, then `chunk` may be a string rather than a
Buffer, and `encoding` will indicate the sort of string that it is.
This is to support implementations that have an optimized handling
for certain string data encodings. If you do not explicitly set the
`decodeStrings` option to `false`, then you can safely ignore the
`encoding` argument, and assume that `chunk` will always be a Buffer.

This method is prefixed with an underscore because it is internal to
the class that defines it, and should not be called directly by user
programs. However, you **are** expected to override this method in
your own extension classes.

#### writable.\_writev(chunks, callback)

* `chunks` {Array} The chunks to be written. Each chunk has the
  following format: `{ chunk: ..., encoding: ... }`.
* `callback` {Function} Call this function (optionally with an error
  argument) when you are done processing the supplied chunks.

Note: **This function MUST NOT be called directly.** It may be
implemented by child classes, and called by the internal Writable
class methods only.

This function is completely optional to implement. In most cases it is
unnecessary. If implemented, it will be called with all the chunks
that are buffered in the write queue.

### Class: stream.Duplex

<!--type=class-->

A "duplex" stream is one that is both Readable and Writable, such as a
TCP socket connection.

Note that `stream.Duplex` is an abstract class designed to be extended
with an underlying implementation of the `_read(size)` and
[`_write(chunk, encoding, callback)`][] methods as you would with a
Readable or Writable stream class.

Since JavaScript doesn't have multiple prototypal inheritance, this
class prototypally inherits from Readable, and then parasitically from
Writable. It is thus up to the user to implement both the low-level
`_read(n)` method as well as the low-level
[`_write(chunk, encoding, callback)`][] method on extension duplex classes.

#### new stream.Duplex(options)

* `options` {Object} Passed to both Writable and Readable
  constructors. Also has the following fields:
  * `allowHalfOpen` {Boolean} Default=true. If set to `false`, then
    the stream will automatically end the readable side when the
    writable side ends and vice versa.

In classes that extend the Duplex class, make sure to call the
constructor so that the buffering settings can be properly
initialized.

### Class: stream.Transform

A "transform" stream is a duplex stream where the output is causally
connected in some way to the input, such as a [zlib][] stream or a
[crypto][] stream.

There is no requirement that the output be the same size as the input,
the same number of chunks, or arrive at the same time. For example, a
Hash stream will only ever have a single chunk of output which is
provided when the input is ended. A zlib stream will produce output
that is either much smaller or much larger than its input.

Rather than implement the [`_read()`][] and [`_write()`][] methods, Transform
classes must implement the `_transform()` method, and may optionally
also implement the `_flush()` method. (See below.)

#### new stream.Transform([options])

* `options` {Object} Passed to both Writable and Readable
  constructors.

In classes that extend the Transform class, make sure to call the
constructor so that the buffering settings can be properly
initialized.

#### transform.\_transform(chunk, encoding, callback)

* `chunk` {Buffer | String} The chunk to be transformed. Will always
  be a buffer unless the `decodeStrings` option was set to `false`.
* `encoding` {String} If the chunk is a string, then this is the
  encoding type. (Ignore if chunk is a buffer.)
* `callback` {Function} Call this function (optionally with an error
  argument) when you are done processing the supplied chunk.

Note: **This function MUST NOT be called directly.** It should be
implemented by child classes, and called by the internal Transform
class methods only.

All Transform stream implementations must provide a `_transform`
method to accept input and produce output.

`_transform` should do whatever has to be done in this specific
Transform class, to handle the bytes being written, and pass them off
to the readable portion of the interface. Do asynchronous I/O,
process things, and so on.

Call `transform.push(outputChunk)` zero or more times to generate
output from this input chunk, depending on how much data you want to
output as a result of this chunk.

Call the callback function only when the current chunk is completely
consumed. Note that there may or may not be output as a result of any
particular input chunk.

This method is prefixed with an underscore because it is internal to
the class that defines it, and should not be called directly by user
programs. However, you **are** expected to override this method in
your own extension classes.

#### transform.\_flush(callback)

* `callback` {Function} Call this function (optionally with an error
  argument) when you are done flushing any remaining data.

Note: **This function MUST NOT be called directly.** It MAY be implemented
by child classes, and if so, will be called by the internal Transform
class methods only.

In some cases, your transform operation may need to emit a bit more
data at the end of the stream. For example, a `Zlib` compression
stream will store up some internal state so that it can optimally
compress the output. At the end, however, it needs to do the best it
can with what is left, so that the data will be complete.

In those cases, you can implement a `_flush` method, which will be
called at the very end, after all the written data is consumed, but
before emitting `end` to signal the end of the readable side. Just
like with `_transform`, call `transform.push(chunk)` zero or more
times, as appropriate, and call `callback` when the flush operation is
complete.

This method is prefixed with an underscore because it is internal to
the class that defines it, and should not be called directly by user
programs. However, you **are** expected to override this method in
your own extension classes.

#### Example: `SimpleProtocol` parser v2

The example above of a simple protocol parser can be implemented
simply by using the higher level [Transform][] stream class, similar to
the `parseHeader` and `SimpleProtocol v1` examples above.

In this example, rather than providing the input as an argument, it
would be piped into the parser, which is a more idiomatic Node stream
approach.

```javascript
var util = require('util');
var Transform = require('stream').Transform;
util.inherits(SimpleProtocol, Transform);

function SimpleProtocol(options) {
  if (!(this instanceof SimpleProtocol))
    return new SimpleProtocol(options);

  Transform.call(this, options);
  this._inBody = false;
  this._sawFirstCr = false;
  this._rawHeader = [];
  this.header = null;
}

SimpleProtocol.prototype._transform = function(chunk, encoding, done) {
  if (!this._inBody) {
    // check if the chunk has a \n\n
    var split = -1;
    for (var i = 0; i < chunk.length; i++) {
      if (chunk[i] === 10) { // '\n'
        if (this._sawFirstCr) {
          split = i;
          break;
        } else {
          this._sawFirstCr = true;
        }
      } else {
        this._sawFirstCr = false;
      }
    }

    if (split === -1) {
      // still waiting for the \n\n
      // stash the chunk, and try again.
      this._rawHeader.push(chunk);
    } else {
      this._inBody = true;
      var h = chunk.slice(0, split);
      this._rawHeader.push(h);
      var header = Buffer.concat(this._rawHeader).toString();
      try {
        this.header = JSON.parse(header);
      } catch (er) {
        this.emit('error', new Error('invalid simple protocol data'));
        return;
      }
      // and let them know that we are done parsing the header.
      this.emit('header', this.header);

      // now, because we got some extra data, emit this first.
      this.push(chunk.slice(split));
    }
  } else {
    // from there on, just provide the data to our consumer as-is.
    this.push(chunk);
  }
  done();
};

// Usage:
// var parser = new SimpleProtocol();
// source.pipe(parser)
// Now parser is a readable stream that will emit 'header'
// with the parsed header data.
```


### Class: stream.PassThrough

This is a trivial implementation of a [Transform][] stream that simply
passes the input bytes across to the output. Its purpose is mainly
for examples and testing, but there are occasionally use cases where
it can come in handy as a building block for novel sorts of streams.


## Streams: Under the Hood

<!--type=misc-->

### Buffering

<!--type=misc-->

Both Writable and Readable streams will buffer data on an internal
object called `_writableState.buffer` or `_readableState.buffer`,
respectively.

The amount of data that will potentially be buffered depends on the
`highWaterMark` option which is passed into the constructor.

Buffering in Readable streams happens when the implementation calls
[`stream.push(chunk)`][]. If the consumer of the Stream does not call
`stream.read()`, then the data will sit in the internal queue until it
is consumed.

Buffering in Writable streams happens when the user calls
[`stream.write(chunk)`][] repeatedly, even when `write()` returns `false`.

The purpose of streams, especially with the `pipe()` method, is to
limit the buffering of data to acceptable levels, so that sources and
destinations of varying speed will not overwhelm the available memory.

### `stream.read(0)`

There are some cases where you want to trigger a refresh of the
underlying readable stream mechanisms, without actually consuming any
data. In that case, you can call `stream.read(0)`, which will always
return null.

If the internal read buffer is below the `highWaterMark`, and the
stream is not currently reading, then calling `read(0)` will trigger
a low-level `_read` call.

There is almost never a need to do this. However, you will see some
cases in Node's internals where this is done, particularly in the
Readable stream class internals.

### `stream.push('')`

Pushing a zero-byte string or Buffer (when not in [Object mode][]) has an
interesting side effect. Because it *is* a call to
[`stream.push()`][], it will end the `reading` process. However, it
does *not* add any data to the readable buffer, so there's nothing for
a user to consume.

Very rarely, there are cases where you have no data to provide now,
but the consumer of your stream (or, perhaps, another bit of your own
code) will know when to check again, by calling `stream.read(0)`. In
those cases, you *may* call `stream.push('')`.

So far, the only use case for this functionality is in the
[tls.CryptoStream][] class, which is deprecated in Node v0.12. If you
find that you have to use `stream.push('')`, please consider another
approach, because it almost certainly indicates that something is
horribly wrong.

### Compatibility with Older Node Versions

<!--type=misc-->

In versions of Node prior to v0.10, the Readable stream interface was
simpler, but also less powerful and less useful.

* Rather than waiting for you to call the `read()` method, `'data'`
  events would start emitting immediately. If you needed to do some
  I/O to decide how to handle data, then you had to store the chunks
  in some kind of buffer so that they would not be lost.
* The [`pause()`][] method was advisory, rather than guaranteed. This
  meant that you still had to be prepared to receive `'data'` events
  even when the stream was in a paused state.

In Node v0.10, the Readable class described above was added. For
backwards compatibility with older Node programs, Readable streams
switch into "flowing mode" when a `'data'` event handler is added, or
when the [`resume()`][] method is called. The effect is that, even if
you are not using the new `read()` method and `'readable'` event, you
no longer have to worry about losing `'data'` chunks.

Most programs will continue to function normally. However, this
introduces an edge case in the following conditions:

* No [`'data'` event][] handler is added.
* The [`resume()`][] method is never called.
* The stream is not piped to any writable destination.

For example, consider the following code:

```javascript
// WARNING! BROKEN!
net.createServer(function(socket) {

  // we add an 'end' listener, but never consume the data
  socket.on('end', function() {
    // It will never get here.
    socket.end('I got your message (but didnt read it)\n');
  });

}).listen(1337);
```

In versions of Node prior to v0.10, the incoming message data would be
simply discarded. However, in Node v0.10 and beyond, the socket will
remain paused forever.

The workaround in this situation is to call the `resume()` method to
start the flow of data:

```javascript
// Workaround
net.createServer(function(socket) {

  socket.on('end', function() {
    socket.end('I got your message (but didnt read it)\n');
  });

  // start the flow of data, discarding it.
  socket.resume();

}).listen(1337);
```

In addition to new Readable streams switching into flowing mode,
pre-v0.10 style streams can be wrapped in a Readable class using the
`wrap()` method.


### Object Mode

<!--type=misc-->

Normally, Streams operate on Strings and Buffers exclusively.

Streams that are in **object mode** can emit generic JavaScript values
other than Buffers and Strings.

A Readable stream in object mode will always return a single item from
a call to `stream.read(size)`, regardless of what the size argument
is.

A Writable stream in object mode will always ignore the `encoding`
argument to `stream.write(data, encoding)`.

The special value `null` still retains its special value for object
mode streams. That is, for object mode readable streams, `null` as a
return value from `stream.read()` indicates that there is no more
data, and [`stream.push(null)`][] will signal the end of stream data
(`EOF`).

No streams in Node core are object mode streams. This pattern is only
used by userland streaming libraries.

You should set `objectMode` in your stream child class constructor on
the options object. Setting `objectMode` mid-stream is not safe.

### State Objects

[Readable][] streams have a member object called `_readableState`.
[Writable][] streams have a member object called `_writableState`.
[Duplex][] streams have both.

**These objects should generally not be modified in child classes.**
However, if you have a Duplex or Transform stream that should be in
`objectMode` on the readable side, and not in `objectMode` on the
writable side, then you may do this in the constructor by setting the
flag explicitly on the appropriate state object.

```javascript
var util = require('util');
var StringDecoder = require('string_decoder').StringDecoder;
var Transform = require('stream').Transform;
util.inherits(JSONParseStream, Transform);

// Gets \n-delimited JSON string data, and emits the parsed objects
function JSONParseStream(options) {
  if (!(this instanceof JSONParseStream))
    return new JSONParseStream(options);

  Transform.call(this, options);
  this._writableState.objectMode = false;
  this._readableState.objectMode = true;
  this._buffer = '';
  this._decoder = new StringDecoder('utf8');
}

JSONParseStream.prototype._transform = function(chunk, encoding, cb) {
  this._buffer += this._decoder.write(chunk);
  // split on newlines
  var lines = this._buffer.split(/\r?\n/);
  // keep the last partial line buffered
  this._buffer = lines.pop();
  for (var l = 0; l < lines.length; l++) {
    var line = lines[l];
    try {
      var obj = JSON.parse(line);
    } catch (er) {
      this.emit('error', er);
      return;
    }
    // push the parsed object out to the readable consumer
    this.push(obj);
  }
  cb();
};

JSONParseStream.prototype._flush = function(cb) {
  // Just handle any leftover
  var rem = this._buffer.trim();
  if (rem) {
    try {
      var obj = JSON.parse(rem);
    } catch (er) {
      this.emit('error', er);
      return;
    }
    // push the parsed object out to the readable consumer
    this.push(obj);
  }
  cb();
};
```

The state objects contain other useful information for debugging the
state of streams in your programs. It is safe to look at them, but
beyond setting option flags in the constructor, it is **not** safe to
modify them.


[EventEmitter]: events.html#events_class_events_eventemitter
[Object mode]: #stream_object_mode
[`stream.push(chunk)`]: #stream_readable_push_chunk_encoding
[`stream.push(null)`]: #stream_readable_push_chunk_encoding
[`stream.push()`]: #stream_readable_push_chunk_encoding
[`unpipe()`]: #stream_readable_unpipe_destination
[unpiped]: #stream_readable_unpipe_destination
[tcp sockets]: net.html#net_class_net_socket
[zlib streams]: zlib.html
[zlib]: zlib.html
[crypto streams]: crypto.html
[crypto]: crypto.html
[tls.CryptoStream]: tls.html#tls_class_cryptostream
[process.stdin]: process.html#process_process_stdin
[stdout]: process.html#process_process_stdout
[process.stdout]: process.html#process_process_stdout
[process.stderr]: process.html#process_process_stderr
[child process stdout and stderr]: child_process.html#child_process_child_stdout
[API for Stream Consumers]: #stream_api_for_stream_consumers
[API for Stream Implementors]: #stream_api_for_stream_implementors
[Readable]: #stream_class_stream_readable
[Writable]: #stream_class_stream_writable
[Duplex]: #stream_class_stream_duplex
[Transform]: #stream_class_stream_transform
[`_read(size)`]: #stream_readable_read_size_1
[`_read()`]: #stream_readable_read_size_1
[_read]: #stream_readable_read_size_1
[`writable.write(chunk)`]: #stream_writable_write_chunk_encoding_callback
[`write(chunk, encoding, callback)`]: #stream_writable_write_chunk_encoding_callback
[`write()`]: #stream_writable_write_chunk_encoding_callback
[`stream.write(chunk)`]: #stream_writable_write_chunk_encoding_callback
[`_write(chunk, encoding, callback)`]: #stream_writable_write_chunk_encoding_callback_1
[`_write()`]: #stream_writable_write_chunk_encoding_callback_1
[_write]: #stream_writable_write_chunk_encoding_callback_1
[`util.inherits`]: util.html#util_util_inherits_constructor_superconstructor
[`end()`]: #stream_writable_end_chunk_encoding_callback
[`'data'` event]: #stream_event_data
[`resume()`]: #stream_readable_resume
[`readable.resume()`]: #stream_readable_resume
[`pause()`]: #stream_readable_pause
[`pipe()`]: #stream_readable_pipe_destination_options