Skip to content

Commit

Permalink
first commit
Browse files Browse the repository at this point in the history
  • Loading branch information
mafintosh committed Aug 14, 2013
0 parents commit 1eb1680
Show file tree
Hide file tree
Showing 5 changed files with 171 additions and 0 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -0,0 +1 @@
node_modules
46 changes: 46 additions & 0 deletions README.md
@@ -0,0 +1,46 @@
# pump

pump is a small node module that pipes streams together and destroy them if one of them closes.

npm install pump

When using standard `source.pipe(dest)` source will _not_ be destroyed if dest emits close or an error.
You are also not able to provide a callback to tell when then pipe has finished.

pump does these two things for you

## Usage

Simply pass the streams you want to pipe together to pump and add an optional callback

``` js
var pump = require('pump');
var fs = require('fs');

var source = fs.createReadStream('/dev/random');
var dest = fs.createWriteStream('/dev/null');

pump(source, dest, function(err) {
console.log('pipe finished', err);
});

setTimeout(function() {
dest.destroy();
}, 1000);
```

You can use pump to pipe more than two streams together as well

``` js
var transform = someTransformStream();

pump(source, transform, anotherTransform, dest, function(err) {
console.log('pipe finished', err);
});
```

If `source`, `transform`, `anotherTransform` or `dest` closes all of them will be destroyed.

## License

MIT
62 changes: 62 additions & 0 deletions index.js
@@ -0,0 +1,62 @@
var once = require('once');
var noop = function() {};

var destroyer = function(stream, callback) {
var ended = false;
var closed = false;
var destroyed = false;

callback = once(callback);

var onend = function() {
ended = true;
callback();
};

var onclose = function() {
closed = true;
if (ended || (stream._readableState && stream._readableState.ended)) return;
callback(new Error('stream closed'));
};

stream.on('error', callback);
stream.on('close', onclose);
stream.on('finish', onend);
stream.on('end', onend);

var destroy = function() {
if (ended || closed || destroyed || !stream.destroy) return;
destroyed = true;
stream.destroy();
};

return destroy;
};

var call = function(fn) {
fn();
};

var pipe = function(from, to) {
return from.pipe(to);
};

var pump = function() {
var streams = Array.prototype.slice.call(arguments);
var callback = typeof streams[streams.length-1] === 'function' ? streams.pop() : noop;

if (Array.isArray(streams[0])) streams = streams[0];

var destroys = streams.map(function(stream, i) {
return destroyer(stream, function(err) {
if (err) destroys.forEach(call);
if (i < streams.length-1) return;
destroys.forEach(call);
callback(err);
});
});

return streams.reduce(pipe);
};

module.exports = pump;
15 changes: 15 additions & 0 deletions package.json
@@ -0,0 +1,15 @@
{
"name": "pump",
"version": "0.1.0",
"repository": "git://github.com/mafintosh/pump.git",
"license": "MIT",
"description": "pipe streams together and close all of them if one of them closes",
"keywords": ["streams", "pipe", "destroy", "callback"],
"author": "Mathias Buus Madsen <mathiasbuus@gmail.com>",
"dependencies": {
"once": "~1.2.0"
},
"scripts": {
"test": "node test.js"
}
}
47 changes: 47 additions & 0 deletions test.js
@@ -0,0 +1,47 @@
var assert = require('assert');
var pump = require('./index');

var rs = require('fs').createReadStream('/dev/random');
var ws = require('fs').createWriteStream('/dev/null');

var toHex = function() {
var reverse = new (require('stream').Transform)();

reverse._transform = function(chunk, enc, callback) {
reverse.push(chunk.toString('hex'));
callback();
};

return reverse;
};

var wsClosed = false;
var rsClosed = false;
var callbackCalled = false;

var check = function() {
if (wsClosed && rsClosed && callbackCalled) process.exit(0);
};

ws.on('close', function() {
wsClosed = true;
check();
});

rs.on('close', function() {
rsClosed = true;
check();
});

pump(rs, toHex(), toHex(), toHex(), ws, function(err) {
callbackCalled = true;
check();
});

setTimeout(function() {
rs.destroy();
}, 1000);

setTimeout(function() {
throw new Error('timeout');
}, 5000);

0 comments on commit 1eb1680

Please sign in to comment.