how to write node programs with streams
introduction

This document covers the basics of how to write node.js programs with streams.

"We should have some ways of connecting programs like garden hose--screw in
another segment when it becomes necessary to massage data in
another way. This is the way of IO also."

Doug McIlroy. October 11, 1964


Streams come to us from the earliest days of unix and have proven themselves over the decades as a dependable way to compose large systems out of small components that do one thing well. In unix, streams are implemented by the shell with | pipes. In node, the built-in stream module is used by the core libraries and can also be used by user-space modules. Similar to unix, the node stream module's primary composition operator is called .pipe() and you get a backpressure mechanism for free to throttle writes for slow consumers.

Streams can help to separate your concerns because they restrict the implementation surface area into a consistent interface that can be reused. You can then plug the output of one stream to the input of another and use libraries that operate abstractly on streams to institute higher-level flow control.

Streams are an important component of small-program design and unix philosophy but there are many other important abstractions worth considering. Just remember that technical debt is the enemy and to seek the best abstractions for the problem at hand.


why you should use streams

I/O in node is asynchronous, so interacting with the disk and network involves passing callbacks to functions. You might be tempted to write code that serves up a file from disk like this:

var http = require('http');
var fs = require('fs');

var server = http.createServer(function (req, res) {
    fs.readFile(__dirname + '/data.txt', function (err, data) {
        if (err) {
            res.statusCode = 500;
            res.end(String(err));
        }
        else res.end(data);
    });
});
server.listen(8000);

This code works but it's bulky and buffers up the entire data.txt file into memory for every request before writing the result back to clients. If data.txt is very large, your program could start eating a lot of memory as it serves lots of users concurrently. The latency will also be high as users will need to wait for the entire file to be read before they start receiving the contents.

Luckily both of the (req, res) arguments are streams, which means we can write this in a much better way using fs.createReadStream() instead of fs.readFile():

var http = require('http');
var fs = require('fs');

var server = http.createServer(function (req, res) {
    var stream = fs.createReadStream(__dirname + '/data.txt');
    stream.on('error', function (err) {
        res.statusCode = 500;
        res.end(String(err));
    });
    stream.pipe(res);
});
server.listen(8000);

Here .pipe() takes care of listening for 'data' and 'end' events from the fs.createReadStream(). This code is not only cleaner, but now the data.txt file will be written to clients one chunk at a time, immediately as the chunks are received from the disk.

Using .pipe() has other benefits too, like handling backpressure automatically so that node won't buffer chunks into memory needlessly when the remote client is on a really slow or high-latency connection.

But this example, while much better than the first one, is still rather verbose. The biggest benefit of streams is their versatility. We can use a module that operates on streams to make that example even simpler:

var http = require('http');
var filed = require('filed');

var server = http.createServer(function (req, res) {
    filed(__dirname + '/data.txt').pipe(res);
});
server.listen(8000);

With the filed module we get mime types, etag caching, and error handling for free in addition to a nice streaming API.

Want compression? There are streaming modules for that too!

var http = require('http');
var filed = require('filed');
var oppressor = require('oppressor');

var server = http.createServer(function (req, res) {
    filed(__dirname + '/data.txt')
        .pipe(oppressor(req))
        .pipe(res)
    ;
});
server.listen(8000);

Now our file is compressed for browsers that support gzip or deflate! We can just let oppressor handle all that content-encoding stuff.

Once you learn the stream api, you can just snap together these streaming modules like lego bricks or garden hoses instead of having to remember how to push data through wonky non-streaming custom APIs.

Streams make programming in node simple, elegant, and composable.

basics

Streams are just EventEmitters that have a .pipe() function and are expected to act in a certain way depending on whether the stream is readable, writable, or both (duplex).

To create a new stream, just do:

var Stream = require('stream');
var s = new Stream;

This new stream doesn't yet do anything because it is neither readable nor writable.

readable

To make that stream s into a readable stream, all we need to do is set the readable property to true:

s.readable = true

Readable streams emit many 'data' events and a single 'end' event. Your stream shouldn't emit any more 'data' events after it emits the 'end' event.

This simple readable stream emits one 'data' event per second for 5 seconds, then it ends. The data is piped to stdout so we can watch the results as they arrive:

var Stream = require('stream');

function createStream () {
    var s = new Stream;
    s.readable = true

    var times = 0;
    var iv = setInterval(function () {
        s.emit('data', times + '\n');
        if (++times === 5) {
            s.emit('end');
            clearInterval(iv);
        }
    }, 1000);

    return s;
}

createStream().pipe(process.stdout);
substack : ~ $ node rs.js
0
1
2
3
4
substack : ~ $ 

In this example the 'data' events have a string payload as the first argument. Buffers and strings are the most common types of data to stream but it's sometimes useful to emit other types of objects.

Just make sure that the types you're emitting as data are compatible with the types that the writable stream you're piping into expects. Otherwise you can pipe into an intermediary conversion or parsing stream before piping to your intended destination.
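For example, here is a minimal sketch of such a conversion stream, built with the same bare Stream approach used above. The jsonLines name and the object shape are just for illustration: it accepts arbitrary objects on its writable side and re-emits them as newline-separated JSON strings, which a string/buffer destination such as process.stdout can handle.

var Stream = require('stream');

function jsonLines () {
    var s = new Stream;
    s.readable = true;
    s.writable = true;

    // re-emit each object written to us as a JSON string plus a newline
    s.write = function (obj) {
        s.emit('data', JSON.stringify(obj) + '\n');
        return true;
    };

    s.end = function (obj) {
        if (obj !== undefined) s.write(obj);
        s.emit('end');
    };

    return s;
}

var converter = jsonLines();
converter.pipe(process.stdout);
converter.write({ beep: 'boop' });
converter.end();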

writable

Writable streams are streams you can .pipe() to but not from.
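Under the same old-style api used throughout this document, a writable stream is just a Stream with its writable property set to true plus write() and end() functions. This is only a sketch to show the shape of the interface; the createWritableStream name is made up here:

var Stream = require('stream');

function createWritableStream () {
    var s = new Stream;
    s.writable = true;

    // called for every chunk written or piped into the stream
    s.write = function (buf) {
        console.log('got chunk: ' + buf);
        return true; // returning false would signal backpressure to the source
    };

    // called when the source is finished; an optional final chunk may be passed
    s.end = function (buf) {
        if (buf !== undefined) s.write(buf);
        console.log('done');
    };

    return s;
}

var ws = createWritableStream();
ws.write('beep ');
ws.end('boop');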

duplex

Duplex streams are just streams that are both readable and writable.

pipe

.pipe() is the only member function of the built-in Stream prototype.

.pipe(target) returns the destination stream, target. This means you can chain .pipe() calls together like in the shell with |.
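For example, this chain gzips a file in the same spirit as cat data.txt | gzip > data.txt.gz in the shell (it assumes a data.txt file exists in the current directory):

var fs = require('fs');
var zlib = require('zlib');

// .pipe() returns its destination, so each .pipe() call below
// hands its output to the next stream in the chain
fs.createReadStream('data.txt')
    .pipe(zlib.createGzip())
    .pipe(fs.createWriteStream('data.txt.gz'))
;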

pause / resume / drain

backpressure

destroy

stream-spec

read more

the future

A big upgrade is planned for the stream api in node 0.9. The basic apis with .pipe() will be the same, only the internals are going to be different. The new api will also remain backwards compatible with the existing api documented here for a long time.

You can check the readable-stream repo to see what these future streams will look like.


built-in streams

These streams are built into node itself.

process.stdin

This readable stream contains the standard system input stream for your program.

It is paused by default but the first time you refer to it .resume() will be called implicitly on the next tick.

If process.stdin is a tty (check with tty.isatty()) then input events will be line-buffered. You can turn off line-buffering by calling process.stdin.setRawMode(true) BUT the default handlers for key combinations such as ^C and ^D will be removed.
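For example, here is a tiny program that echoes standard input back to standard output; calling .resume() explicitly makes sure the paused stream starts flowing:

process.stdin.resume();
process.stdin.pipe(process.stdout);

Running it with something like echo beep | node stdin.js should print beep back at you.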

process.stdout

process.stderr

child_process.spawn()

fs.createReadStream()

fs.createWriteStream()

net.connect()

This function returns a duplex stream that connects over tcp to a remote host.

You can start writing to the stream right away and the writes will be buffered until the 'connect' event fires.
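A small sketch, assuming some tcp service is listening on port 8000 of localhost:

var net = require('net');

// net.connect() hands back a duplex stream over tcp
var stream = net.connect(8000, 'localhost');

// this write happens before the connection is established,
// so it gets buffered until the 'connect' event fires
stream.write('beep boop\n');

// whatever the remote host sends back goes straight to stdout
stream.pipe(process.stdout);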

net.createServer()

http.request()

http.createServer()

zlib.createGzip()

zlib.createGunzip()


control streams

through

from

pause-stream

concat-stream

duplex

duplexer

emit-stream

invert-stream

map-stream

remote-events

buffer-stream

event-stream

auth-stream


meta streams

mux-demux

stream-router

multi-channel-mdm


state streams

cdrt

delta-stream


http streams

request

filed

oppressor

response-stream


io streams

reconnect

kv

discovery-network


parser streams

tar

trumpet

JSONStream

Use this module to parse and stringify json data from streams.

If you need to pass a large json collection through a slow connection or you have a json object that will populate slowly, this module will let you parse data incrementally as it arrives.
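For example, this sketch streams the rows out of a couchdb-style _all_docs response one at a time as they arrive instead of buffering the whole body first. The url and the ['rows', true] path are just assumptions for illustration:

var request = require('request');
var JSONStream = require('JSONStream');

// emit a 'data' event for every element of the top-level "rows" array
var parser = JSONStream.parse(['rows', true]);

parser.on('data', function (row) {
    console.log('row:', row);
});

request('http://localhost:5984/somedb/_all_docs?include_docs=true')
    .pipe(parser)
;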

json-scrape

stream-serializer


browser streams

shoe

domnode

sorta

graph-stream

arrow-keys

attribute

data-bind


rpc streams

dnode

rpc-stream


test streams

tap

stream-spec
