how to write node programs with streams
Switch branches/tags
Nothing to show
Clone or download
Pull request Compare This branch is 10 commits ahead, 80 commits behind substack:master.
Fetching latest commit…
Cannot retrieve the latest commit at this time.
Permalink
Type Name Latest commit message Commit time
Failed to load latest commit information.
example
readme.markdown

readme.markdown

イントロダクション

このドキュメントはストリームを用いたnode.jsプログラムの基本的な書き方について説明します。

プログラム同士をガーデニング用のホースのようにつなぎ合わせる仕組み、すなわち、データを何か別の方法で操作する必要が出てきたときに、他の部分につなぎ込めるような仕組みがあるべきではないか。IOについても同じことが言える。(ダグラス・マッキルロイ 1964年10月11日

doug mcilroy


Unixの草創期に登場したストリームは、1つのことをうまくやる小さなコンポーネントを組み合わせて巨大なシステムを作り上げる際の信頼できる手法であることを、数十年に渡って証明してきました。Unixにおいては、ストリームはpipe|としてシェルに実装されています。Nodeではビルトインのストリームモジュールはコアライブラリで利用されているほか、ユーザ領域のモジュールでも使用できます。Unixと同様に、Nodeのストリームモジュールの基礎となる合成演算子は .pipe() と呼ばれ、遅い消費者への書き込み速度を自由に調整するための背圧メカニズムも備わっています。

ストリームは、実装の表面を再利用しやすい一貫性したインタフェースとして限定することで、関心の分離を促進します。だからこそ、1つのストリームの出力を他のストリームの入力へとつなぐことができるのです。また、抽象的にストリームを操作するライブラリを用いることで、高レベルなフローコントロールを導入することもできます。

ストリームは、小さなコード設計Unix哲学において重要なコンポーネントです。しかし、他にも考慮すべき抽象的概念はたくさんあります。技術的負債は敵であることと、目の前にある問題に対する最適な抽象的概念を探し求めることを忘れないでください。

brian kernighan


なぜストリームを使うべきなのか

NodeのI/Oは非同期なので、ハードディスクやネットワークとやり取りする際には、関数にコールバックを渡す必要があります。ファイルを配信するサーバーを以下のように書きたい衝動に駆られるでしょう。

var http = require('http');
var fs = require('fs');

var server = http.createServer(function (req, res) {
    fs.readFile(__dirname + '/data.txt', function (err, data) {
        if (err) {
            res.statusCode = 500;
            res.end(String(err));
        }
        else res.end(data);
    });
});
server.listen(8000);

このコードは動きはしますが、長ったらしい上に、毎回のリクエストごとにdata.txtファイルの全体をバッファをメモリに書き込み、その後でクライアントに結果を返します。もし、data.txtが非常に大きい場合は、このプログラムはユーザー数と同じだけの大量のメモリを消費しはじめるでしょう。また、ユーザーがコンテンツを受け取る前には、ファイルの全てが読み込まれるのを待たなければならず、レイテンシは非常に高いものになるでしょう。

ラッキーなことに(req, res)の両方の引数はStreamです。これはファイルの書き込みにfs.readFile()よりも良い方法であるfs.createReadStream()を使えるという事です。

var http = require('http');
var fs = require('fs');

var server = http.createServer(function (req, res) {
    var stream = fs.createReadStream(__dirname + '/data.txt');
    stream.on('error', function (err) {
        res.statusCode = 500;
        res.end(String(err));
    });
    stream.pipe(res);
});
server.listen(8000);

この.pipe()fs.createReadStream()から発生した'data''end'イベントの面倒を見てくれます。このコードはクリーンなだけではなく、いまやdata.txtファイルがハードディスクから読み込まれれば即時に、クライアントに1つのチャンクとして書き込まれるようになりました。

.pipe()のメリットは他にもあります。自動的に背圧の制御をしてくれるので、リモートクライアントが非常に遅い時や、高レイテンシで接続している時などに、Nodeが必要もないのにチャンクをバッファとしてメモリに書き込んだりしなくてもよいという点です。

このサンプルは最初のものよりもずっと良いのですが、まだ冗長です。Streamの一番大きい利点は、Streamが何にでも融通をきかせられる点です。Streamを扱うモジュールを使ってもっとシンプルなサンプルを作ってみましょう。

var http = require('http');
var filed = require('filed');

var server = http.createServer(function (req, res) {
    filed(__dirname + '/data.txt').pipe(res);
});
server.listen(8000);

filedモジュールを使うと、mimeタイプ・etagキャシュや例外処理などの素晴しいStream APIを苦労無く使うことができます。

圧縮がしたいですか? それもストリームモジュールにあります!

var http = require('http');
var filed = require('filed');
var oppressor = require('oppressor');

var server = http.createServer(function (req, res) {
    filed(__dirname + '/data.txt')
        .pipe(oppressor(req))
        .pipe(res)
    ;
});
server.listen(8000);

これでgzipかdeflateがサポートされているブラウザ用にファイルを圧縮することができました! ただoppressorにコンテンツのエンコードを全て任せてしまっただけです。

一度ストリームAPIを使いこなせるようになれば、レゴブロックやガーデニング用のホースのように、ストリームのモジュールをピタっと合わせていくだけでよいのです。ふぞろいな非ストリームAPIを使って、データをどんな風にプッシュするかを覚えておく必要もなくなります。

ストリームはNodeのプログラミングをシンプル、エレガントで、組み立てやすいものにします。

基本

ストリームは単なるEventEmitter.pipe()メソッドをつけたもので、readableか、writableか、それともその両方か(duplex)に応じて、決められた動作を期待されます。

新しいストリームを作成するには以下のようにするだけです。

var Stream = require('stream');
var s = new Stream;

新しいストリームは何もしません。というのも、readableでもwritableでもないからです。

readable

このストリームsをreadableなものにするには、単にreaaableプロパティをtrueにするだけで大丈夫です。

s.readable = true

readableなストリームはdataイベントを何回も発行し、endイベントを1回だけ発行します。endイベントを発行したら、それ以上dataイベントを発行すべきではありません。

この単純なreadableストリームは1秒に1回dataイベントを発行するのを5秒間繰り返した後、終了します。データは標準出力にパイプされているので、そのふるまいを見てとることができます。

var Stream = require('stream');

function createStream () {
    var s = new Stream;
    s.readable = true

    var times = 0;
    var iv = setInterval(function () {
        s.emit('data', times + '\n');
        if (++times === 5) {
            s.emit('end');
            clearInterval(iv);
        }
    }, 1000);
    
    return s;
}

createStream().pipe(process.stdout);
substack : ~ $ node rs.js
0
1
2
3
4
substack : ~ $ 

この例ではdataイベントは第1引数に文字列を格納しています。ストリームで扱うデータはバッファや文字列が最も一般的ですが、他のオブジェクトをemitとするのが有効な場合もあるでしょう。

emitしようとしているデータが、パイプしようとしているwritableストリームが期待するデータと互換性があるかどうかには注意してください。 そうでない場合は、もともと想定している相手にパイプする前に、中継・変換用、パース用のストリームをパイプすることもできます。

writable

writableストリームとは、入力を受け取れるストリームのことです。writableストリームを作るには、writableプロパティをtrueにして、write()end()、そしてdestroy()を定義します。

以下のwritableストリームは、受け取ったストリームからバイト数を数えて、結果をシンプルなend()で出力します。もしストリームが破棄(destroy)された場合には何もしません。

var Stream = require('stream');
var s = new Stream;
s.writable = true;

var bytes = 0;

s.write = function (buf) {
    bytes += buf.length;
};

s.end = function (buf) {
    if (arguments.length) s.write(buf);
    
    s.writable = false;
    console.log(bytes + ' bytes written');
};

s.destroy = function () {
    s.writable = false;
};

このwritableストリームにファイルをパイプしてみます。

var fs = require('fs');
fs.createReadStream('/etc/passwd').pipe(s);
$ node writable.js
2447 bytes written

ここで注意すべきなのは、end(buf)write(buf)end()の組み合わせとして扱うNodeの慣習です。他の人はendがコアモジュールで用いられているものと同様にふるまうと期待するので、これを省略するのは混乱の原因となります。

背圧

背圧は、writableストリームがデータを消費するよりも速いスピードで、readableストリームからのデータがemitされないようにするための仕組みです。

背圧を管理するAPIは、Nodeのバージョン0.8以降で大幅な変更が加えられつつあります。pause()resume()emit('drain')は廃止される予定です。

背圧を正しく実行するには、readableストリームがpause()resume()を実装している必要があります。パイプされているreadableストリームのペースを緩めてほしい場合に、writableストリームの.write()はfalseを返します。そして再びデータを受け取る準備ができたときに'drain'を発行します。

wriableストリームの背圧

readableストリームの流れを緩やかにしたいときには、writableストリームの.write()メソッドはfalseを返すようにするべきです。こうすることで、readableストリームにおいてpause()が呼ばれます。

writableストリームがデータを受け取る準備ができたときには、'drain'イベントを発行すべきです。'drain'イベントが発行されることで、readableストリームにおいてresume()が呼ばれます。

readableストリームの背圧

readableストリームのpause()が呼ばれたということは、下流にあるwritableストリームが上流のreadableストリームの速さを緩やかにするよう望んでいることを意味します。pause()が呼ばれたら、readableストリームはデータのemitを止めるべきです。ただ、これは必ず可能なわけではありません。

下流のストリームがデータを受け入れる準備ができたら、readableストリームのresume()が呼ばれます。

パイプ

.pipe()は、readaleストリームのデータをwritableストリームに混ぜ込む糊の役割を果たします。また、背圧の管理も行います。パイプのAPIは単に以下のようにするだけです。

src.pipe(dst)

readableストリームであるsrcと、writableストリームのdstに対して、.pipe()dstを返します。なので、このdstがreadableストリームでもあるときには、以下のように.pipe()をつなぎ合わせることができます、

a.pipe(b).pipe(c).pipe(d)

これはシェルで|演算子を使うのと似ています。

a | b | c | d

a.pipe(b).pipe(c).pipe(d)という使い方は、以下と同等です。

a.pipe(b);
b.pipe(c);
c.pipe(d);

コアモジュールでのストリームの実装は、単なるパイプ関数つきのEventEmitterです。pipe()ソースコードはごく短いので、目を通しておくべきでしょう。

用語

以下の用語はストリームについて語る上で便利です。

through

throughストリームは入力を変換して出力する、簡単なreadableかつwritableなフィルタを指します。

duplex

duplexストリームはreadableかつwritabeで、たとえば電話でメッセージが行ったり来たりするように、お互いのストリームの端が双方向のインタラクションを受け持ちます。rpcのやり取りはduplexなストリームの好例と言えます。以下のようなコードを見かけたら、おそらくそれはduplexストリームを扱ったものでしょう。

a.pipe(b).pipe(a)

さらなる読み物として

ストリームの将来

バージョン0.9にて大幅なアップグレードが予定されています。ただ、.pipe()による基本的なAPIはそのままで、内部の実装に変更がある見込みです。新しいAPIとここに書かれた既存のAPIとの互換性は、当分の間、維持される予定です。

将来、ストリームがどうなるのかはreadable-streamのリポジトリから見ることができます。


組み込みのストリーム

以下のストリームはNode自体に組み込まれています。

process

process.stdin

このreadableストリームにはプログラムの標準入力が含まれています。

このストリームは初期状態では停止していますが、初めて参照したときにnext tickとして.resume()が暗黙の内に実行されます。

標準入力がtty(tty.isatty()を参照のこと)の場合、入力のイベントにはラインバッファが用いられます。process.stdin.setRawMode(true)を呼び出すことで、ラインバッファを無効化することができます。けれども^C^Dといったキーの組み合わせに対応するハンドラは削除されてしまいます。

process.stdout

このwritableストリームにはプログラムの標準出力が含まれています。

標準出力にデータを送りたいときには、ここにwriteしてください。

process.stderr

このwritableストリームにはプログラムの標準エラー出力が含まれています。

標準エラー出力にデータを送りたいときには、ここにwriteしてください。

child_process.spawn()

fs

fs.createReadStream()

fs.createWriteStream()

net

net.connect()

この関数はtcpを使ってリモートホストへ接続する[duplexストリーム]を返します。

ストリームへの書き込みはすぐに行うことができて、connectイベントが発行されるまでバッファされます。

net.createServer()

http

http.request()

http.createServer()

zlib

zlib.createGzip()

zlib.createGunzip()

zlib.createDeflate()

zlib.createInflate()


ストリームの制御

through

from

pause-stream

concat-stream

duplex

duplexer

emit-stream

invert-stream

map-stream

remote-events

buffer-stream

event-stream

auth-stream


メタストリーム

mux-demux

stream-router

multi-channel-mdm


状態ストリーム

crdt

delta-stream

scuttlebutt

scuttlebuttはP2Pによる状態の同期に利用できます。このメッシュ状のトポロジーにおいては、ノードは仲介者を通じて連結されているだけで、全てのデータのバージョンを管理するような特権的なノードは存在しません。

scuttlebuttが提供するような分散P2Pのネットワークは、異なるネットワーク上にあるノード同士が、同一の状態を共有・更新しなければならないときに特に効果を発揮します。この手のネットワークの1つの例が、クライアントがhttpサーバを経由して、直接には接続できないバックエンドのプロセスにメッセージを送る場合です。他のユースケースとしては、IPv4の不足を解消するために、内部ネットワーク同士を橋渡しするようなシステムなどがありそうです。

scuttlebutt/modelインターフェースを利用してノードを作成し、それらをパイプでつなぎあわせることで、どんなネットワークでも思いのままに作ることができます。

var Model = require('scuttlebutt/model');
var am = new Model;
var as = am.createStream();

var bm = new Model;
var bs = bm.createStream();

var cm = new Model;
var cs = cm.createStream();

var dm = new Model;
var ds = dm.createStream();

var em = new Model;
var es = em.createStream();

as.pipe(bs).pipe(as);
bs.pipe(cs).pipe(bs);
bs.pipe(ds).pipe(bs);
ds.pipe(es).pipe(ds);

em.on('update', function (key, value, source) {
    console.log(key + ' => ' + value + ' from ' + source);
});

am.set('x', 555);

ここで作成したネットワークは以下のような無向グラフです。

a <-> b <-> c
      ^
      |
      v
      d <-> e

ノードaeは直接には連結していせん。しかしこのスクリプトを実行すると……

$ node model.js
x => 555 from 1347857300518

ノードaにセットされた値はノードbdを経由してeへの経路を見つけます。この例では全てのノードが同じプロセス上にありますが、scuttlebuttはシンプルなストリームのインターフェースを利用しているので、ノードはどのプロセスに、どのサーバに置かれていても、また文字列を扱うことができさえすれば、どんなストリームのトランスポートを使っていても大丈夫です。

次に、ネットワーク越しにカウンタ変数をインクリメントするような、より現実的な例を作ってみます。

この例では、サーバはcountの初期値を0にセットし、320ミリ秒ごとにcount ++を実行、countの更新があるたびに出力します。

var Model = require('scuttlebutt/model');
var net = require('net');

var m = new Model;
m.set('count', '0');
m.on('update', function (key, value) {
    console.log(key + ' = ' + m.get('count'));
});

var server = net.createServer(function (stream) {
    stream.pipe(m.createStream()).pipe(stream);
});
server.listen(8888);

setInterval(function () {
    m.set('count', Number(m.get('count')) + 1);
}, 320);

さて、このサーバに接続して、countの値を一定時間ごとに更新、受け取った更新を出力するクライアントを作ることができます。

var Model = require('scuttlebutt/model');
var net = require('net');

var m = new Model;
var s = m.createStream();

s.pipe(net.connect(8888, 'localhost')).pipe(s);

m.on('update', function cb (key) {
    // wait until we've gotten at least one count value from the network
    if (key !== 'count') return;
    m.removeListener('update', cb);
    
    setInterval(function () {
        m.set('count', Number(m.get('count')) + 1);
    }, 100);
});

m.on('update', function (key, value) {
    console.log(key + ' = ' + value);
});

クライアントは少しだけ奇妙な書き方をしています。誰かから更新を受けとるまで、自身によるカウンタの更新を待たなければならないからです。そうしないとカウンタが0になってしまう可能性があります。

サーバの立ち上げに成功し、いくつかのクライアントが実行されると、以下のような出力列が見られます。

count = 183
count = 184
count = 185
count = 186
count = 187
count = 188
count = 189

時折、一部のノードでは出力列に同じ値の繰り返しが見られる場合があります。

count = 147
count = 148
count = 149
count = 149
count = 150
count = 151

このような値をとるのは、scuttlebutt'sの履歴ベースの衝突解消アルゴリズムが、全てのノードの状態を結果的に整合させるよう動作しているからです。

ここで取り上げた例において、サーバとは、接続してくるクライアントと同等の権限しか持たないノードの1つに過ぎません。「クライアント」「サーバ」という用法は、単に誰がコネクションの口火を切るかに関連しているだけで、状態の同期がどのような手順に従うかとは無関係です。このような性質を備えたプロトコルはしばしばシンメトリック・プロトコルと呼ばれています。シンメトリック・プロトコルに関する別の例としてdnodeも見てください。


httpストリーム

request

filed

oppressor

response-stream


IOストリーム

reconnect

kv

discovery-network


パーサストリーム

tar

trumpet

JSONStream

json形式のデータをパースしたり、文字列に変換するのに用います。

遅いコネクション越しにサイズの大きいjsonを渡す必要があるときや、少しずつしかやってこないjsonオブジェクトを扱うときにこのモジュールを使えば、データの到着と同時に少しずつパースすることができます。

json-scrape

stream-serializer


ブラウザストリーム

shoe

domnode

sorta

graph-stream

arrow-keys

attribute

data-bind


オーディオストリーム

baudio

rpcストリーム

dnode

dnodeを使えば、どんなストリームを介しても、リモートの関数を呼び出せるようになります。

以下は基本的なdnoneサーバの例です。

var dnode = require('dnode');
var net = require('net');

var server = net.createServer(function (c) {
    var d = dnode({
        transform : function (s, cb) {
            cb(s.replace(/[aeiou]{2,}/, 'oo').toUpperCase())
        }
    });
    c.pipe(d).pipe(c);
});

server.listen(5004);

さて、サーバの.transform()を呼び出すシンプルなクライアントに取りかかりましょう。

var dnode = require('dnode');
var net = require('net');

var d = dnode();
d.on('remote', function (remote) {
    remote.transform('beep', function (s) {
        console.log('beep => ' + s);
        d.end();
    });
});

var c = net.connect(5004);
c.pipe(d).pipe(c);

サーバを立ち上げて、クライアントを実行すると、以下のように表示されるはずです。

$ node client.js
beep => BOOP

クライアントがサーバのtransform()関数に'beep'を送ると、サーバはその結果と共にクライアントのコールバックを実行します。すごい!

ここでdnodeが提供するストリームのインターフェースがduplexストリームです。というのも、クライアントとサーバの双方がパイプでつながれていて(c.pipe(d).pipe(c))、リクエストとレスポンスが両サイドからやってくるからです。

dnodeのやばいところは、スタブとして利用するコールバックの引数に関数を渡すことができる点です。以下が先ほどのサーバを書き換えたもので、多段式のコールバックがあっちこっちで受け渡されるバージョンです。

var dnode = require('dnode');
var net = require('net');

var server = net.createServer(function (c) {
    var d = dnode({
        transform : function (s, cb) {
            cb(function (n, fn) {
                var oo = Array(n+1).join('o');
                fn(s.replace(/[aeiou]{2,}/, oo).toUpperCase());
            });
        }
    });
    c.pipe(d).pipe(c);
});

server.listen(5004);

クライアント側も書き換えたのが以下です。

var dnode = require('dnode');
var net = require('net');

var d = dnode();
d.on('remote', function (remote) {
    remote.transform('beep', function (cb) {
        cb(10, function (s) {
            console.log('beep:10 => ' + s);
            d.end();
        });
    });
});

var c = net.connect(5004);
c.pipe(d).pipe(c);

サーバを立ち上げて、クライアントを実行すると、以下のように表示されます。

$ node client.js
beep:10 => BOOOOOOOOOOP

ちゃんと動く!™

オブジェクトにしまっておいた関数はストリームの反対側から呼び出された後、スタブとして利用され、ぐるっと回って元々の関数があった最初の場所に戻ってきます。これが基本的な考えです。すばらしいのは、スタブ関数の引数として渡された関数が、__反対側__でスタブとして利用される点です。

このように関数の引数を再帰的にスタブとして利用するアプローチを、今後は「どこまで行っても亀がいる」戦略と呼びたいと思います。関数の返り値は全て無視され、オブジェクトの列挙可能なプロパティのみがjson形式で送られます。

どこまで行っても、亀!

どこまで行っても、亀!

dnodeはどんなストリームでも、Node側でもブラウザ側でも動作するので、関数がどこで定義されていようと呼び出せます。とりわけmux-demuxとあわせてrpcストリームを多重化し、巨大なデータストリームを並列でコントロールする際には特に役に立ちます。

rpc-stream


ストリームのテスト

tap

stream-spec


強力なコンボ

socket.ioをくるむ

先に述べたライブラリを活用することで、ストリーム上にsocket.io風のEventEmitter型APIを構築することができます。

まず、サーバサイドのwebsocketハンドラとしてはshoeが使えます。EventEmiterを、オブジェクトをemit可能なストリームに変換するにはemit-streamが利用できます。オブジェクトのストリームはJSONStreamによってシリアライズされ、シリアライズされたストリームはリモートのブラウザにパイプできます。

var EventEmitter = require('events').EventEmitter;
var shoe = require('shoe');
var emitStream = require('emit-stream');
var JSONStream = require('JSONStream');

var sock = shoe(function (stream) {
    var ev = new EventEmitter;
    emitStream(ev)
        .pipe(JSONStream.stringify())
        .pipe(stream)
    ;
    ...
});

shoeのコールバックの中で、evに登録された関数にイベントを発行することができます。以下では一定時間ごとに異なるイベントを発行しています。

var intervals = [];

intervals.push(setInterval(function () {
    ev.emit('upper', 'abc');
}, 500));

intervals.push(setInterval(function () {
    ev.emit('lower', 'def');
}, 300));

stream.on('end', function () {
    intervals.forEach(clearInterval);
});

最後に、shoeのインスタンスをhttpサーバにバインドしてやる必要があります。

var http = require('http');
var server = http.createServer(require('ecstatic')(__dirname));
server.listen(8080);

sock.install(server, '/sock');

ブラウザ側では、json形式のshoeストリームをパースして、結果のオブジェクトストリームをeventStram()に渡します。eventStream()はサーバサイドのイベントをemitするEventEmitterを返します。

var shoe = require('shoe');
var emitStream = require('emit-stream');
var JSONStream = require('JSONStream');

var parser = JSONStream.parse([true]);
var stream = parser.pipe(shoe('/sock')).pipe(parser);
var ev = emitStream(stream);

ev.on('lower', function (msg) {
    var div = document.createElement('div');
    div.textContent = msg.toLowerCase();
    document.body.appendChild(div);
});

ev.on('upper', function (msg) {
    var div = document.createElement('div');
    div.textContent = msg.toUpperCase();
    document.body.appendChild(div);
});

ブラウザ側のソースコードをビルドするのにbrowserifyを使えば、これらの便利なモジュールをブラウザ側でrequire()することができます。

$ browserify main.js -o bundle.js

あとは適当なhtmlに<script src="/bundle.js"></script>と貼りつけてブラウザで開くと、サーバサイドのイベントがブラウザにストリームされてくるのがわかります。

このようなストリーム的なアプローチを活用することで、ストリームと対話する方法のみを知っているような、コンパクトで再利用可能な部品に頼れるようになります。メッセージのルーティングをsocket.io型のグローバルなイベント機構を使って行う代わりに、アプリケーションを、たった1つのことをうまくやる小さな機能のかたまりに分けることのみに集中すればよくなります。

ちょっとした例を挙げましょう。様々な条件を考慮して別の方法でシリアライズをしようと考えた場合には、上のサンプルで用いたJSONStreamをstream-serializerに交換できます。ストリームのシンプルなインターフェースを利用することで、reconnectionsやheartbeatsを管理するレイヤーをshoeに取りつけることもできます。コアモジュールのEventEmitterの代わりに、eventemitter2による名前空間つきイベントを利用する場合でさえも、ストリームを組み込むことができます。

もし異なるふるまいを見せる別々のストリームを扱いたいのであれば、この例で挙げたshoeのストリームをmux-demuxの上で流れるようにすればよさそうです。こうすれば、必要なストリームの種類に応じて別々のチャネルを作り出すことができます。これも同じようにごくシンプルにできます。

システムの要件が複雑になるにつれて、硬直的なフレームワークによるアプローチには付き物の一か八かのリスクを冒すことなしに、ストリームのピースを必要に応じて取り換えることができます。