
introduction

This document covers the basics of how to write node.js programs with streams.

"We should have some ways of connecting programs like garden hose--screw in
another segment when it becomes necessary to massage data in
another way. This is the way of IO also."

Doug McIlroy. October 11, 1964


Streams come to us from the earliest days of unix and have proven themselves over the decades as a dependable way to compose large systems out of small components that do one thing well. In unix, streams are implemented by the shell with | pipes. In node, the built-in stream module is used by the core libraries and can also be used by user-space modules. Similar to unix, the node stream module's primary composition operator is called .pipe() and you get a backpressure mechanism for free to throttle writes for slow consumers.

Streams can help to separate your concerns because they restrict the implementation surface area into a consistent interface that can be reused. You can then plug the output of one stream to the input of another and use libraries that operate abstractly on streams to institute higher-level flow control.

Streams are an important component of small-program design and unix philosophy but there are many other important abstractions worth considering. Just remember that technical debt is the enemy and to seek the best abstractions for the problem at hand.


why you should use streams

I/O in node is asynchronous, so interacting with the disk and network involves passing callbacks to functions. You might be tempted to write code that serves up a file from disk like this:

var http = require('http');
var fs = require('fs');

var server = http.createServer(function (req, res) {
    fs.readFile(__dirname + '/data.txt', function (err, data) {
        if (err) {
            res.statusCode = 500;
            res.end(String(err));
        }
        else res.end(data);
    });
});
server.listen(8000);

This code works but it's bulky and buffers up the entire data.txt file into memory for every request before writing the result back to clients. If data.txt is very large, your program could start eating a lot of memory as it serves lots of users concurrently. The latency will also be high as users will need to wait for the entire file to be read before they start receiving the contents.

Luckily both of the (req, res) arguments are streams, which means we can write this in a much better way using fs.createReadStream() instead of fs.readFile():

var http = require('http');
var fs = require('fs');

var server = http.createServer(function (req, res) {
    var stream = fs.createReadStream(__dirname + '/data.txt');
    stream.on('error', function (err) {
        res.statusCode = 500;
        res.end(String(err));
    });
    stream.pipe(res);
});
server.listen(8000);

Here .pipe() takes care of listening for 'data' and 'end' events from the fs.createReadStream(). This code is not only cleaner, but now the data.txt file will be written to clients one chunk at a time, as soon as each chunk is read from the disk.

Using .pipe() has other benefits too, like handling backpressure automatically so that node won't buffer chunks into memory needlessly when the remote client is on a really slow or high-latency connection.

Want compression? There are streaming modules for that too!

var http = require('http');
var fs = require('fs');
var oppressor = require('oppressor');

var server = http.createServer(function (req, res) {
    var stream = fs.createReadStream(__dirname + '/data.txt');
    stream.on('error', function (err) {
        res.statusCode = 500;
        res.end(String(err));
    });

    stream.pipe(oppressor(req)).pipe(res);
});
server.listen(8000);

Now our file is compressed for browsers that support gzip or deflate! We can just let oppressor handle all that content-encoding stuff.

Once you learn the stream api, you can just snap together these streaming modules like lego bricks or garden hoses instead of having to remember how to push data through wonky non-streaming custom APIs.

Streams make programming in node simple, elegant, and composable.

basics

Streams are just EventEmitters that have a .pipe() function and are expected to act in a certain way depending on whether the stream is readable, writable, or both (duplex).

To create a new stream, just do:

var Stream = require('stream');
var s = new Stream;

This new stream doesn't yet do anything because it is neither readable nor writable.

readable

To make that stream s into a readable stream, all we need to do is set the readable property to true:

s.readable = true

Readable streams emit many 'data' events and a single 'end' event. Your stream shouldn't emit any more 'data' events after it emits the 'end' event.

This simple readable stream emits one 'data' event per second for 5 seconds, then it ends. The data is piped to stdout so we can watch the results as they happen.

var Stream = require('stream');

function createStream () {
    var s = new Stream;
    s.readable = true

    var times = 0;
    var iv = setInterval(function () {
        s.emit('data', times + '\n');
        if (++times === 5) {
            s.emit('end');
            clearInterval(iv);
        }
    }, 1000);

    return s;
}

createStream().pipe(process.stdout);

substack : ~ $ node rs.js
0
1
2
3
4
substack : ~ $ 

In this example the 'data' events have a string payload as the first argument. Buffers and strings are the most common types of data to stream but it's sometimes useful to emit other types of objects.

Just make sure that the types you're emitting as data are compatible with the types that the writable stream you're piping into expects. Otherwise you can pipe into an intermediary conversion or parsing stream before piping to your intended destination.
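As a minimal sketch of such an intermediary conversion stream, written in the same classic style as the examples here: the source emits plain objects, the destination expects strings, and a small readable/writable stream in between does the conversion. The names stringify, sink, and lines are made up for this illustration.

```js
var Stream = require('stream');

// a writable stream that expects string chunks
var lines = [];
var sink = new Stream;
sink.writable = true;
sink.write = function (str) { lines.push(str.trim()); };
sink.end = function () { sink.writable = false; };

// hypothetical conversion stream: objects in, JSON lines out
function stringify () {
    var t = new Stream;
    t.readable = t.writable = true;
    t.write = function (obj) {
        t.emit('data', JSON.stringify(obj) + '\n');
    };
    t.end = function () { t.emit('end'); };
    return t;
}

// a readable stream that emits objects instead of strings
var src = new Stream;
src.readable = true;

src.pipe(stringify()).pipe(sink);

src.emit('data', { n: 1 });
src.emit('data', { n: 2 });
src.emit('end');

console.log(lines.join(' '));
```

Without the stringify() step, sink.write() would be handed objects and the call to str.trim() would throw.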

writable

Writable streams are streams that can accept input. To create a writable stream, set the writable attribute to true and define write(), end(), and destroy().

This writable stream will count all the bytes from an input stream and print the result on a clean end(). If the stream is destroyed it will do nothing.

var Stream = require('stream');
var s = new Stream;
s.writable = true;

var bytes = 0;

s.write = function (buf) {
    bytes += buf.length;
};

s.end = function (buf) {
    if (arguments.length) s.write(buf);

    s.writable = false;
    console.log(bytes + ' bytes written');
};

s.destroy = function () {
    s.writable = false;
};

If we pipe a file to this writable stream:

var fs = require('fs');
fs.createReadStream('/etc/passwd').pipe(s);

$ node writable.js
2447 bytes written

One thing to watch out for is the convention in node to treat end(buf) as a write(buf) then an end(). If you skip this it could lead to confusion because people expect end to behave the way it does in core.

backpressure

Backpressure is the mechanism that streams use to make sure that readable streams don't emit data faster than writable streams can consume data.

Note: the API for handling backpressure is changing substantially in future versions of node (> 0.8). pause(), resume(), and emit('drain') are scheduled for demolition. The notice has been on display in the local planning office for months.

In order to do backpressure correctly readable streams should implement pause() and resume(). Writable streams return false in .write() when they want the readable streams piped into them to slow down and emit 'drain' when they're ready for more data again.

writable stream backpressure

When a writable stream wants a readable stream to slow down it should return false in its .write() function. This causes the pause() to be called on each readable stream source.

When the writable stream is ready to start receiving data again, it should emit the 'drain' event. Emitting 'drain' causes the resume() function to be called on each readable stream source.
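Here is a small sketch of that handshake using the classic API described in this section. The slowWriter() helper, its limit argument, and its flush() method are invented for the example; flush() stands in for whatever slow I/O the real stream would be waiting on.

```js
var Stream = require('stream');

// hypothetical writable stream that can only hold `limit` chunks at once
function slowWriter (limit) {
    var s = new Stream;
    s.writable = true;
    s.pending = [];

    s.write = function (chunk) {
        s.pending.push(chunk);
        // returning false asks the sources to slow down
        return s.pending.length < limit;
    };

    // pretend the slow I/O finished: drop the backlog, ask for more data
    s.flush = function () {
        s.pending.length = 0;
        s.emit('drain');
    };

    s.end = function () { s.writable = false; };
    s.destroy = function () { s.writable = false; };
    return s;
}

// a bare readable stream that just records whether it has been paused
var src = new Stream;
src.readable = true;
src.paused = false;
src.pause = function () { src.paused = true; };
src.resume = function () { src.paused = false; };

var dst = slowWriter(2);
src.pipe(dst);

src.emit('data', 'a');
console.log(src.paused); // false: there is still room
src.emit('data', 'b');
console.log(src.paused); // true: write() returned false
dst.flush();
console.log(src.paused); // false: 'drain' triggered resume()
```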

readable stream backpressure

When pause() is called on a readable stream, it means that a downstream writable stream wants the upstream to slow down. The readable stream that pause() was called on should stop emitting data but that isn't always possible.

When the downstream is ready for more data, the readable stream's resume() function will be called.
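A readable stream that cooperates with this protocol might look roughly like the sketch below. The counter() helper is made up for illustration; it emits the numbers 0 through max - 1 and stops emitting whenever it has been paused.

```js
var Stream = require('stream');

// hypothetical readable stream that honors pause() and resume()
function counter (max) {
    var s = new Stream;
    s.readable = true;

    var n = 0;
    var paused = false;

    function flow () {
        // emit until somebody pauses us or we run out of numbers
        while (!paused && n < max) s.emit('data', n++);
        if (n === max && s.readable) {
            s.readable = false;
            s.emit('end');
        }
    }

    s.pause = function () { paused = true; };
    s.resume = function () { paused = false; flow(); };
    return s;
}

var seen = [];
var s = counter(5);
s.on('data', function (n) {
    seen.push(n);
    if (n === 2) s.pause(); // a slow consumer asks for a break
});

s.resume();
console.log(seen.join(',')); // 0,1,2
s.resume();
console.log(seen.join(',')); // 0,1,2,3,4
```

A stream wrapping a source it can't stop, such as incoming events, would have to buffer or drop data while paused instead.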

pipe

.pipe() is the glue that shuffles data from readable streams into writable streams and handles backpressure. The pipe api is just:

src.pipe(dst)

for a readable stream src and a writable stream dst. .pipe() returns the dst so if dst is also a readable stream, you can chain .pipe() calls together like:

a.pipe(b).pipe(c).pipe(d)

which resembles what you might do in the shell with the | operator:

a | b | c | d

The a.pipe(b).pipe(c).pipe(d) usage is the same as:

a.pipe(b);
b.pipe(c);
c.pipe(d);

The stream implementation in core is just an event emitter with a pipe function. pipe() is pretty short. You should read the source code.
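To give a feel for what that source code does, here is a heavily simplified sketch. simplePipe() is a made-up name and this is not the core implementation, which also handles errors, 'close' events, and listener cleanup.

```js
var Stream = require('stream');

// simplified sketch of pipe(): forward data, relay backpressure, end dst
function simplePipe (src, dst) {
    src.on('data', function (chunk) {
        // a false return value from write() means "slow down"
        if (dst.write(chunk) === false && src.pause) src.pause();
    });

    dst.on('drain', function () {
        if (src.resume) src.resume();
    });

    src.on('end', function () {
        dst.end();
    });

    return dst; // returning dst is what makes chaining possible
}

var src = new Stream;
src.readable = true;

var out = [];
var dst = new Stream;
dst.writable = true;
dst.write = function (chunk) { out.push(chunk); };
dst.end = function () { out.push('[end]'); };

simplePipe(src, dst);

src.emit('data', 'a');
src.emit('data', 'b');
src.emit('end');

console.log(out.join(' ')); // a b [end]
```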

terms

These terms are useful for talking about streams.

through

Through streams are simple readable/writable filters that transform input and produce output.
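As a rough sketch, a through stream in the classic style is a stream that is both readable and writable and re-emits whatever it decides to keep. The grep() helper below is invented for this example and only passes along chunks that match a pattern.

```js
var Stream = require('stream');

// hypothetical through stream: only lets matching chunks pass
function grep (pattern) {
    var t = new Stream;
    t.readable = t.writable = true;

    t.write = function (line) {
        if (pattern.test(line)) t.emit('data', line);
    };
    t.end = function () {
        t.emit('end');
    };
    return t;
}

var src = new Stream;
src.readable = true;

var kept = [];
src.pipe(grep(/oo/)).on('data', function (line) {
    kept.push(line);
});

src.emit('data', 'beep');
src.emit('data', 'boop');
src.emit('end');

console.log(kept.join(',')); // boop
```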

duplex

Duplex streams are readable/writable and both ends of the stream engage in a two-way interaction, sending messages back and forth like a telephone. An rpc exchange is a good example of a duplex stream. Any time you see something like:

a.pipe(b).pipe(a)

you're probably dealing with a duplex stream.
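A toy version of that two-way conversation, assuming nothing beyond the core stream module: the duplex() helper and its send() method are invented names, and each side simply calls a handler for every incoming message.

```js
var Stream = require('stream');

// hypothetical duplex stream: send() talks, write() listens
function duplex (onMessage) {
    var s = new Stream;
    s.readable = s.writable = true;

    s.send = function (msg) { s.emit('data', msg); };
    s.write = function (msg) { onMessage.call(s, msg); };
    s.end = function () { s.writable = false; };
    return s;
}

var log = [];

var a = duplex(function (msg) {
    log.push('a got ' + msg);
});

var b = duplex(function (msg) {
    log.push('b got ' + msg);
    this.send(msg.toUpperCase()); // reply to the other side
});

a.pipe(b).pipe(a);
a.send('beep');

console.log(log.join(', ')); // b got beep, a got BEEP
```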

read more

the future

A big upgrade is planned for the stream api in node 0.9. The basic apis with .pipe() will be the same, only the internals are going to be different. The new api will also be backwards compatible with the existing api documented here for a long time.

You can check the readable-stream repo to see what these future streams will look like.


built-in streams

These streams are built into node itself.

process

process.stdin

This readable stream contains the standard system input stream for your program.

It is paused by default but the first time you refer to it .resume() will be called implicitly on the next tick.

If process.stdin is a tty (check with tty.isatty()) then input events will be line-buffered. You can turn off line-buffering by calling process.stdin.setRawMode(true) BUT the default handlers for key combinations such as ^C and ^D will be removed.

process.stdout

This writable stream contains the standard system output stream for your program.

Write to it if you want to send data to stdout.

process.stderr

This writable stream contains the standard system error stream for your program.

Write to it if you want to send data to stderr.

child_process.spawn()

fs

fs.createReadStream()

fs.createWriteStream()

net

net.connect()

This function returns a duplex stream that connects over tcp to a remote host.

You can start writing to the stream right away and the writes will be buffered until the 'connect' event fires.

net.createServer()

http

http.request()

http.createServer()

zlib

zlib.createGzip()

zlib.createGunzip()

zlib.createDeflate()

zlib.createInflate()
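As a quick illustration of how these compose, the sketch below pipes a gzip stream straight into a gunzip stream and collects what comes out the other end. It only uses the core zlib module; the chunks variable is just for this example.

```js
var zlib = require('zlib');

var gzip = zlib.createGzip();
var gunzip = zlib.createGunzip();

// collect the decompressed output
var chunks = [];
gunzip.on('data', function (buf) {
    chunks.push(buf);
});
gunzip.on('end', function () {
    console.log(Buffer.concat(chunks).toString()); // beep boop
});

// compress, then immediately decompress again
gzip.pipe(gunzip);

gzip.write('beep ');
gzip.end('boop');
```

In a real program the two halves would usually sit on opposite ends of a file or network stream.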


control streams

through

from

pause-stream

concat-stream

concat-stream will buffer up stream contents into a single buffer. concat(cb) just takes a single callback cb(body) with the buffered body when the stream has finished.

For example, in this program, the concat callback fires with the body string "beep boop." once cs.end() is called. The program takes the body and upper-cases it, printing BEEP BOOP.

var concat = require('concat-stream');

var cs = concat(function (body) {
    console.log(body.toUpperCase());
});
cs.write('beep ');
cs.write('boop.');
cs.end();

$ node concat.js
BEEP BOOP.

Here's an example usage of concat-stream that will parse incoming url-encoded form data and reply with a stringified JSON version of the form parameters:

var http = require('http');
var qs = require('querystring');
var concat = require('concat-stream');

var server = http.createServer(function (req, res) {
    req.pipe(concat(function (body) {
        var params = qs.parse(body);
        res.end(JSON.stringify(params) + '\n');
    }));
});
server.listen(5005);

$ curl -X POST -d 'beep=boop&dinosaur=trex' http://localhost:5005
{"beep":"boop","dinosaur":"trex"}
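The buffering idea itself is small enough to sketch with a classic writable stream. This is not how concat-stream is implemented; collect() is a made-up helper that assumes string chunks (or chunks that convert cleanly to strings).

```js
var Stream = require('stream');

// hypothetical helper: buffer every chunk, call cb(body) once on end()
function collect (cb) {
    var s = new Stream;
    s.writable = true;

    var parts = [];
    s.write = function (chunk) {
        parts.push(String(chunk));
    };
    s.end = function (chunk) {
        if (arguments.length) s.write(chunk);
        s.writable = false;
        cb(parts.join(''));
    };
    s.destroy = function () {
        s.writable = false;
    };
    return s;
}

var result;
var cs = collect(function (body) {
    result = body.toUpperCase();
    console.log(result); // BEEP BOOP.
});
cs.write('beep ');
cs.write('boop.');
cs.end();
```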

duplex

duplexer

emit-stream

invert-stream

map-stream

remote-events

buffer-stream

event-stream

auth-stream


meta streams

mux-demux

stream-router

multi-channel-mdm


state streams

crdt

delta-stream

scuttlebutt

scuttlebutt can be used for peer-to-peer state synchronization with a mesh topology where nodes might only be connected through intermediaries and there is no node with an authoritative version of all the data.

The kind of distributed peer-to-peer network that scuttlebutt provides is especially useful when nodes on different sides of network barriers need to share and update the same state. An example of this kind of network might be browser clients that send messages through an http server to each other and backend processes that the browsers can't directly connect to. Another use-case might be systems that span internal networks since IPv4 addresses are scarce.

scuttlebutt uses a gossip protocol to pass messages between connected nodes so that state across all the nodes will eventually converge on the same value everywhere.

Using the scuttlebutt/model interface, we can create some nodes and pipe them to each other to create whichever sort of network we want:

var Model = require('scuttlebutt/model');
var am = new Model;
var as = am.createStream();

var bm = new Model;
var bs = bm.createStream();

var cm = new Model;
var cs = cm.createStream();

var dm = new Model;
var ds = dm.createStream();

var em = new Model;
var es = em.createStream();

as.pipe(bs).pipe(as);
bs.pipe(cs).pipe(bs);
bs.pipe(ds).pipe(bs);
ds.pipe(es).pipe(ds);

em.on('update', function (key, value, source) {
    console.log(key + ' => ' + value + ' from ' + source);
});

am.set('x', 555);

The network we've created is an undirected graph that looks like:

a <-> b <-> c
      ^
      |
      v
      d <-> e

Note that nodes a and e aren't directly connected, but when we run this script:

$ node model.js
x => 555 from 1347857300518

the value that node a set finds its way to node e by way of nodes b and d. Here all the nodes are in the same process but because scuttlebutt uses a simple streaming interface, the nodes can be placed on any process or server and connected with any streaming transport that can handle string data.

Next we can make a more realistic example that connects over the network and increments a counter variable.

Here's the server which will set the initial count value to 0 and increment count every 320 milliseconds, printing all updates to count:

var Model = require('scuttlebutt/model');
var net = require('net');

var m = new Model;
m.set('count', '0');
m.on('update', function (key, value) {
    console.log(key + ' = ' + m.get('count'));
});

var server = net.createServer(function (stream) {
    stream.pipe(m.createStream()).pipe(stream);
});
server.listen(8888);

setInterval(function () {
    m.set('count', Number(m.get('count')) + 1);
}, 320);

Now we can make a client that connects to this server, updates the count on an interval, and prints all the updates it receives:

var Model = require('scuttlebutt/model');
var net = require('net');

var m = new Model;
var s = m.createStream();

s.pipe(net.connect(8888, 'localhost')).pipe(s);

m.on('update', function cb (key) {
    // wait until we've gotten at least one count value from the network
    if (key !== 'count') return;
    m.removeListener('update', cb);

    setInterval(function () {
        m.set('count', Number(m.get('count')) + 1);
    }, 100);
});

m.on('update', function (key, value) {
    console.log(key + ' = ' + value);
});

The client is slightly trickier since it should wait until it has an update from somebody else to start updating the counter itself or else its counter would be zeroed.

Once we get the server and some clients running we should see a sequence like this:

count = 183
count = 184
count = 185
count = 186
count = 187
count = 188
count = 189

Occasionally on some of the nodes we might see a sequence with repeated values like:

count = 147
count = 148
count = 149
count = 149
count = 150
count = 151

These values are due to scuttlebutt's history-based conflict resolution algorithm which is hard at work ensuring that the state of the system across all nodes is eventually consistent.

Note that the server in this example is just another node with the same privileges as the clients connected to it. The terms "client" and "server" here don't affect how the state synchronization proceeds, just who initiates the connection. Protocols with this property are often called symmetric protocols. See dnode for another example of a symmetric protocol.

append-only


http streams

request

oppressor

response-stream


io streams

reconnect

kv

discovery-network


parser streams

tar

trumpet

JSONStream

Use this module to parse and stringify json data from streams.

If you need to pass a large json collection through a slow connection or you have a json object that will populate slowly this module will let you parse data incrementally as it arrives.
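To show what incremental parsing means in practice, here is a sketch of a much simpler technique than the one JSONStream uses: newline-delimited json, where each complete line is parsed as soon as it arrives. parseLines() is a hypothetical helper, it is not part of JSONStream, and it assumes string chunks.

```js
var Stream = require('stream');

// hypothetical through stream: string chunks in, one object per line out
function parseLines () {
    var s = new Stream;
    s.readable = s.writable = true;

    var buffered = '';
    s.write = function (chunk) {
        buffered += chunk;
        var lines = buffered.split('\n');
        buffered = lines.pop(); // hold on to the trailing partial line
        lines.forEach(function (line) {
            if (line) s.emit('data', JSON.parse(line));
        });
    };
    s.end = function () {
        if (buffered) s.emit('data', JSON.parse(buffered));
        s.emit('end');
    };
    return s;
}

var rows = [];
var p = parseLines();
p.on('data', function (row) { rows.push(row); });

// chunks can split a record anywhere
p.write('{"who":"sub');
p.write('stack"}\n{"who":"zol');
console.log(rows.length); // 1: the first row is usable already
p.write('tar"}\n');
p.end();
console.log(rows.length); // 2
```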

json-scrape

stream-serializer


browser streams

shoe

domnode

sorta

graph-stream

arrow-keys

attribute

data-bind


html streams

hyperstream

audio streams

baudio

rpc streams

dnode

dnode lets you call remote functions through any kind of stream.

Here's a basic dnode server:

var dnode = require('dnode');
var net = require('net');

var server = net.createServer(function (c) {
    var d = dnode({
        transform : function (s, cb) {
            cb(s.replace(/[aeiou]{2,}/, 'oo').toUpperCase())
        }
    });
    c.pipe(d).pipe(c);
});

server.listen(5004);

then you can hack up a simple client that calls the server's .transform() function:

var dnode = require('dnode');
var net = require('net');

var d = dnode();
d.on('remote', function (remote) {
    remote.transform('beep', function (s) {
        console.log('beep => ' + s);
        d.end();
    });
});

var c = net.connect(5004);
c.pipe(d).pipe(c);

Fire up the server, then when you run the client you should see:

$ node client.js
beep => BOOP

The client sent 'beep' to the server's transform() function and the server called the client's callback with the result, neat!

The streaming interface that dnode provides here is a duplex stream since both the client and server are piped to each other (c.pipe(d).pipe(c)) with requests and responses coming from both sides.

The craziness of dnode begins when you start to pass function arguments to stubbed callbacks. Here's an updated version of the previous server with a multi-stage callback passing dance:

var dnode = require('dnode');
var net = require('net');

var server = net.createServer(function (c) {
    var d = dnode({
        transform : function (s, cb) {
            cb(function (n, fn) {
                var oo = Array(n+1).join('o');
                fn(s.replace(/[aeiou]{2,}/, oo).toUpperCase());
            });
        }
    });
    c.pipe(d).pipe(c);
});

server.listen(5004);

Here's the updated client:

var dnode = require('dnode');
var net = require('net');

var d = dnode();
d.on('remote', function (remote) {
    remote.transform('beep', function (cb) {
        cb(10, function (s) {
            console.log('beep:10 => ' + s);
            d.end();
        });
    });
});

var c = net.connect(5004);
c.pipe(d).pipe(c);

After we spin up the server, when we run the client now we get:

$ node client.js
beep:10 => BOOOOOOOOOOP

It just works!™

The basic idea is that you just put functions in objects and call them from the other side of a stream. The functions are stubbed out on the other end, and calling a stub does a round-trip back to the side that had the original function in the first place. The best thing is that when you pass functions to a stubbed function as arguments, those functions get stubbed out on the other side!

This approach of stubbing function arguments recursively shall henceforth be known as the "turtles all the way down" gambit. The return values of any of your functions will be ignored and only enumerable properties on objects will be sent, json-style.

It's turtles all the way down!

Since dnode works in node or on the browser over any stream it's easy to call functions defined anywhere and especially useful when paired up with mux-demux to multiplex an rpc stream for control alongside some bulk data streams.
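The recursive stubbing trick can be sketched in a few dozen lines using classic streams. This is not dnode's wire protocol or API: endpoint(), invoke(), and the { fn: id } placeholder format are all invented for the illustration, and there is no error handling or cleanup of old callbacks.

```js
var Stream = require('stream');

// hypothetical helper: one end of a toy rpc link
function endpoint (methods) {
    var s = new Stream;
    s.readable = s.writable = true;
    var callbacks = [];

    // outgoing: swap each function argument for a { fn: id } placeholder
    function stub (args) {
        return args.map(function (arg) {
            if (typeof arg !== 'function') return arg;
            return { fn: callbacks.push(arg) - 1 };
        });
    }

    // incoming: swap each placeholder for a function that calls back
    // across the stream
    function unstub (args) {
        return args.map(function (arg) {
            if (!arg || typeof arg.fn !== 'number') return arg;
            return function () {
                s.invoke(arg.fn, [].slice.call(arguments));
            };
        });
    }

    s.invoke = function (method, args) {
        s.emit('data', JSON.stringify({ method: method, args: stub(args) }));
    };
    s.write = function (line) {
        var msg = JSON.parse(line);
        var fn = typeof msg.method === 'number'
            ? callbacks[msg.method]
            : methods[msg.method];
        fn.apply(null, unstub(msg.args));
    };
    s.end = function () { s.writable = false; };
    return s;
}

var server = endpoint({
    transform : function (s, cb) {
        cb(function (n, fn) {
            var oo = Array(n + 1).join('o');
            fn(s.replace(/[aeiou]{2,}/, oo).toUpperCase());
        });
    }
});
var client = endpoint({});
client.pipe(server).pipe(client);

var result;
client.invoke('transform', [ 'beep', function (cb) {
    cb(10, function (s) {
        result = s;
        console.log('beep:10 => ' + s);
    });
} ]);
```

Only strings cross the stream here, so the same two endpoints could just as well sit on opposite ends of a tcp connection.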

rpc-stream


test streams

tap

stream-spec


power combos

distributed partition-tolerant chat

The append-only module can give us a convenient append-only array on top of scuttlebutt which makes it really easy to write an eventually-consistent, distributed chat that can replicate with other nodes and survive network partitions.

TODO: the rest

roll your own socket.io

We can build a socket.io-style event emitter api over streams using some of the libraries mentioned earlier in this document.

First we can use shoe to create a new websocket handler server-side and emit-stream to turn an event emitter into a stream that emits objects. The object stream can then be fed into JSONStream to serialize the objects and from there the serialized stream can be piped into the remote browser.

var EventEmitter = require('events').EventEmitter;
var shoe = require('shoe');
var emitStream = require('emit-stream');
var JSONStream = require('JSONStream');

var sock = shoe(function (stream) {
    var ev = new EventEmitter;
    emitStream(ev)
        .pipe(JSONStream.stringify())
        .pipe(stream)
    ;
    ...
});

Inside the shoe callback we can emit events on the ev emitter. Here we'll just emit different kinds of events on intervals:

var intervals = [];

intervals.push(setInterval(function () {
    ev.emit('upper', 'abc');
}, 500));

intervals.push(setInterval(function () {
    ev.emit('lower', 'def');
}, 300));

stream.on('end', function () {
    intervals.forEach(clearInterval);
});

Finally the shoe instance just needs to be bound to an http server:

var http = require('http');
var server = http.createServer(require('ecstatic')(__dirname));
server.listen(8080);

sock.install(server, '/sock');

Meanwhile on the browser side of things just parse the json shoe stream and pass the resulting object stream to emitStream(). emitStream() just returns an event emitter that emits the server-side events:

var shoe = require('shoe');
var emitStream = require('emit-stream');
var JSONStream = require('JSONStream');

var parser = JSONStream.parse([true]);
var stream = shoe('/sock').pipe(parser);
var ev = emitStream(stream);

ev.on('lower', function (msg) {
    var div = document.createElement('div');
    div.textContent = msg.toLowerCase();
    document.body.appendChild(div);
});

ev.on('upper', function (msg) {
    var div = document.createElement('div');
    div.textContent = msg.toUpperCase();
    document.body.appendChild(div);
});

Use browserify to build this browser source code so that you can require() all these nifty modules browser-side:

$ browserify main.js -o bundle.js

Then drop a <script src="/bundle.js"></script> into some html and open it up in a browser to see server-side events streamed through to the browser side of things.

With this streaming approach you can rely more on tiny reusable components that only need to know how to talk streams. Instead of routing messages through a global event system socket.io-style, you can focus more on breaking up your application into tinier units of functionality that can do exactly one thing well.

For instance you can trivially swap out JSONStream in this example for stream-serializer to get a different take on serialization with a different set of tradeoffs. You could bolt layers over top of shoe to handle reconnections or heartbeats using simple streaming interfaces. You could even add a stream into the chain to use namespaced events with eventemitter2 instead of the EventEmitter in core.

If you want some different streams that act in different ways it would likewise be pretty simple to run the shoe stream in this example through mux-demux to create separate channels for each different kind of stream that you need.

As the requirements of your system evolve over time, you can swap out each of these streaming pieces as necessary without as many of the all-or-nothing risks that more opinionated framework approaches necessarily entail.
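The emitter-to-stream conversion at the heart of this example is small. The sketch below is not emit-stream's implementation: toStream(), fromStream(), and map() are hypothetical helpers built on core modules only, with JSON.stringify and JSON.parse standing in for the serialization step and no network in between.

```js
var EventEmitter = require('events').EventEmitter;
var Stream = require('stream');

// hypothetical: turn every emit() call into an array on a readable stream
function toStream (ev) {
    var s = new Stream;
    s.readable = true;
    var emit = ev.emit;
    ev.emit = function () {
        s.emit('data', [].slice.call(arguments));
        return emit.apply(ev, arguments);
    };
    return s;
}

// hypothetical: turn a stream of arrays back into an event emitter
function fromStream (s) {
    var ev = new EventEmitter;
    s.on('data', function (args) {
        ev.emit.apply(ev, args);
    });
    return ev;
}

// hypothetical through stream that applies fn to each chunk
function map (fn) {
    var t = new Stream;
    t.readable = t.writable = true;
    t.write = function (x) { t.emit('data', fn(x)); };
    t.end = function () { t.emit('end'); };
    return t;
}

var server = new EventEmitter;
var client = fromStream(
    toStream(server).pipe(map(JSON.stringify)).pipe(map(JSON.parse))
);

var seen = [];
client.on('upper', function (msg) { seen.push(msg.toUpperCase()); });
client.on('lower', function (msg) { seen.push(msg.toLowerCase()); });

server.emit('upper', 'abc');
server.emit('lower', 'DEF');

console.log(seen.join(',')); // ABC,def
```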

html streams for the browser and the server

We can use some streaming modules to reuse the same html rendering logic for the client and the server! This approach is indexable, SEO-friendly, and gives us realtime updates.

Our renderer takes lines of json as input and returns html strings as its output. Text, the universal interface!

render.js:

var through = require('through');
var hyperglue = require('hyperglue');
var fs = require('fs');
var html = fs.readFileSync(__dirname + '/static/row.html');

module.exports = function () {
    return through(function (line) {
        try { var row = JSON.parse(line) }
        catch (err) { return this.emit('error', err) }

        this.queue(hyperglue(html, {
            '.who': row.who,
            '.message': row.message
        }).outerHTML);
    });
};

We can use brfs to inline the fs.readFileSync() call for browser code and hyperglue to update html based on css selectors. You don't need to use hyperglue necessarily here; anything that can return a string with html in it will work.

The row.html used is just a really simple stub thing:

row.html:

<div class="row">
  <div class="who"></div>
  <div class="message"></div>
</div>

The server will just use slice-file to keep everything simple. slice-file is little more than a glorified tail/tail -f api but the interfaces map well to databases with regular results plus a changes feed like couchdb.

server.js:

var http = require('http');
var fs = require('fs');
var hyperstream = require('hyperstream');
var ecstatic = require('ecstatic')(__dirname + '/static');

var sliceFile = require('slice-file');
var sf = sliceFile(__dirname + '/data.txt');

var render = require('./render');

var server = http.createServer(function (req, res) {
    if (req.url === '/') {
        var hs = hyperstream({
            '#rows': sf.slice(-5).pipe(render())
        });
        hs.pipe(res);
        fs.createReadStream(__dirname + '/static/index.html').pipe(hs);
    }
    else ecstatic(req, res)
});
server.listen(8000);

var shoe = require('shoe');
var sock = shoe(function (stream) {
    sf.follow(-1,0).pipe(stream);
});
sock.install(server, '/sock');

The first part of the server handles the / route and streams the last 5 lines from data.txt into the #rows div.

The second part of the server handles realtime updates to #rows using shoe, a simple streaming websocket polyfill.

Next we can write some simple browser code to populate the realtime updates from shoe into the #rows div:

var through = require('through');
var render = require('./render');

var shoe = require('shoe');
var stream = shoe('/sock');

var rows = document.querySelector('#rows');
stream.pipe(render()).pipe(through(function (html) {
    rows.innerHTML += html;
}));

Just compile with browserify and brfs:

$ browserify -t brfs browser.js > static/bundle.js

And that's it! Now we can populate data.txt with some silly data:

$ echo '{"who":"substack","message":"beep boop."}' >> data.txt
$ echo '{"who":"zoltar","message":"COWER PUNY HUMANS"}' >> data.txt

then spin up the server:

$ node server.js

then navigate to localhost:8000 where we will see our content. If we add some more content:

$ echo '{"who":"substack","message":"oh hello."}' >> data.txt
$ echo '{"who":"zoltar","message":"HEAR ME!"}' >> data.txt

then the page updates automatically with the realtime updates, hooray!

We're now using exactly the same rendering logic on both the client and the server to serve up SEO-friendly, indexable realtime content. Hooray!
