Skip to content

Commit

Permalink
Accepting fucntion returning streams
Browse files Browse the repository at this point in the history
  • Loading branch information
nfroidure committed Feb 15, 2014
1 parent a355962 commit 46b3e5f
Show file tree
Hide file tree
Showing 3 changed files with 213 additions and 47 deletions.
11 changes: 11 additions & 0 deletions README.md
Expand Up @@ -19,6 +19,17 @@ var queue = streamqueue(
Fs.createReadStream('input3.txt')
).pipe(process.stdout);
```
StreamQueue also accept functions returning streams, the above can be written
like this, doing system calls only when piping:
```js
var streamqueue = require('streamqueue');

var queue = streamqueue(
Fs.createReadStream.bind(null, 'input.txt'),
Fs.createReadStream.bind(null, 'input2.txt'),
Fs.createReadStream.bind(null, 'input3.txt')
).pipe(process.stdout);
```

Object-oriented traditionnal API offers more flexibility:
```js
Expand Down
43 changes: 30 additions & 13 deletions src/index.js
Expand Up @@ -18,7 +18,7 @@ function StreamQueue(options) {
// Options
this._pauseFlowingStream = true;
this._resumeFlowingStream = true;
if(!(options instanceof Stream)) {
if(!(options instanceof Stream || 'function' === typeof options)) {
if('boolean' == typeof options.pauseFlowingStream) {
this._pauseFlowingStream = options.pauseFlowingStream;
delete options.pauseFlowingStream;
Expand All @@ -30,7 +30,11 @@ function StreamQueue(options) {
}

// Parent constructor
Stream.PassThrough.call(this, options instanceof Stream ? undefined : options);
Stream.PassThrough.call(this,
options instanceof Stream || 'function' === typeof options
? undefined
: options
);

// Prepare streams queue
this._streams = [];
Expand All @@ -39,9 +43,11 @@ function StreamQueue(options) {
this._objectMode = options.objectMode || false;

// Queue given streams and ends
if(arguments.length > 1 || options instanceof Stream) {
if(arguments.length > 1 || options instanceof Stream
|| 'function' === typeof options) {
this.done.apply(this,
[].slice.call(arguments, options instanceof Stream ? 0 : 1));
[].slice.call(arguments,
options instanceof Stream || 'function' === typeof options ? 0 : 1));
}

}
Expand All @@ -56,17 +62,25 @@ StreamQueue.prototype.queue = function() {
}

streams = streams.map(function(stream) {
stream.on('error', function(err) {
_self.emit('error', err);
});
if('undefined' == typeof stream._readableState) {
stream = (new Stream.Readable({objectMode: _self._objectMode}))
.wrap(stream);
function wrapper(stream) {
stream.on('error', function(err) {
_self.emit('error', err);
});
if('undefined' == typeof stream._readableState) {
stream = (new Stream.Readable({objectMode: _self._objectMode}))
.wrap(stream);
}
if(this._pauseFlowingStream&&stream._readableState.flowing) {
stream.pause();
}
return stream;
}
if(this._pauseFlowingStream&&stream._readableState.flowing) {
stream.pause();
if('function' === typeof stream) {
return function() {
return wrapper(stream());
};
}
return stream;
return wrapper(stream);
});

this._streams = this._streams.length ? this._streams.concat(streams) : streams;
Expand All @@ -91,6 +105,9 @@ StreamQueue.prototype._pipeNextStream = function() {
return;
}
var stream = this._streams.shift();
if('function' === typeof stream) {
stream = stream();
}
if(this._resumeFlowingStream&&stream._readableState.flowing) {
stream.resume();
}
Expand Down
206 changes: 172 additions & 34 deletions tests/index.mocha.js
Expand Up @@ -40,6 +40,14 @@ function readableStream(chunks) {
stream.resume();
return stream;
}
function erroredStream(msg) {
var erroredStream = new Stream.PassThrough();
setImmediate(function() {
erroredStream.emit('error', new Error(msg));
erroredStream.end();
});
return erroredStream;
}

// Tests
describe('StreamQueue', function() {
Expand All @@ -61,14 +69,10 @@ describe('StreamQueue', function() {
});

it('should work with functionnal API and options', function(done) {
var stream1 = new Stream.PassThrough()
, stream2 = new Stream.PassThrough()
, stream3 = new Stream.PassThrough()
;
StreamQueue({pause: true},
writeToStream(stream1, ['wa','dup']),
writeToStream(stream2, ['pl','op']),
writeToStream(stream3, ['ki','koo','lol'])
writeToStream(new Stream.PassThrough(), ['wa','dup']),
writeToStream(new Stream.PassThrough(), ['pl','op']),
writeToStream(new Stream.PassThrough(), ['ki','koo','lol'])
).pipe(es.wait(function(err, data) {
assert.equal(err, null);
assert.equal(data, 'wadupplopkikoolol');
Expand Down Expand Up @@ -111,14 +115,10 @@ describe('StreamQueue', function() {
var queue = new StreamQueue({
pauseFlowingStream: true,
resumeFlowingStream: true
})
, stream1 = new Stream.PassThrough()
, stream2 = new Stream.PassThrough()
, stream3 = new Stream.PassThrough()
;
queue.queue(writeToStream(stream1, ['wa','dup']));
queue.queue(writeToStream(stream2, ['pl','op']));
queue.queue(writeToStream(stream3, ['ki','koo','lol']));
});
queue.queue(writeToStream(new Stream.PassThrough(), ['wa','dup']));
queue.queue(writeToStream(new Stream.PassThrough(), ['pl','op']));
queue.queue(writeToStream(new Stream.PassThrough(), ['ki','koo','lol']));
assert.equal(queue.length, 3);
queue.pipe(es.wait(function(err, data) {
assert.equal(err, null);
Expand All @@ -129,14 +129,10 @@ describe('StreamQueue', function() {
});

it('should work with POO API and a late done call', function(done) {
var queue = new StreamQueue()
, stream1 = new Stream.PassThrough()
, stream2 = new Stream.PassThrough()
, stream3 = new Stream.PassThrough()
;
queue.queue(writeToStream(stream1, ['wa','dup']));
queue.queue(writeToStream(stream2, ['pl','op']));
queue.queue(writeToStream(stream3, ['ki','koo','lol']));
var queue = new StreamQueue();
queue.queue(writeToStream(new Stream.PassThrough(), ['wa','dup']));
queue.queue(writeToStream(new Stream.PassThrough(), ['pl','op']));
queue.queue(writeToStream(new Stream.PassThrough(), ['ki','koo','lol']));
assert.equal(queue.length, 3);
queue.pipe(es.wait(function(err, data) {
assert.equal(err, null);
Expand All @@ -149,10 +145,9 @@ describe('StreamQueue', function() {
});

it('should reemit errors', function(done) {
var erroredStream = new Stream.PassThrough();
var gotError = false;
var queue = new StreamQueue();
queue.queue(erroredStream);
queue.queue(erroredStream('Aouch!'));
queue.queue(writeToStream(new Stream.PassThrough(), ['wa','dup']));
queue.queue(writeToStream(new Stream.PassThrough(), ['pl','op']));
queue.queue(writeToStream(new Stream.PassThrough(), ['ki','koo','lol']));
Expand All @@ -167,10 +162,6 @@ describe('StreamQueue', function() {
done();
}));
queue.done();
process.nextTick(function() {
erroredStream.emit('error', new Error('Aouch !'));
erroredStream.end();
});
});

});
Expand Down Expand Up @@ -238,10 +229,9 @@ describe('StreamQueue', function() {
});

it('should reemit errors', function(done) {
var erroredStream = new Stream.PassThrough();
var gotError = false;
var queue = new StreamQueue();
queue.queue(erroredStream);
queue.queue(erroredStream('Aouch!'));
queue.queue(writeToStreamSync(new Stream.PassThrough(), ['wa','dup']));
queue.queue(writeToStreamSync(new Stream.PassThrough(), ['pl','op']));
queue.queue(writeToStreamSync(new Stream.PassThrough(), ['ki','koo','lol']));
Expand All @@ -257,10 +247,158 @@ describe('StreamQueue', function() {
done();
}));
queue.done();
setImmediate(function() {
erroredStream.emit('error', new Error('Aouch!'));
erroredStream.end();
});

});

describe('and with functions returning streams', function() {

it('should work with functionnal API', function(done) {
StreamQueue(function() {
return writeToStream(new Stream.PassThrough(), ['wa','dup']);
}, function() {
return writeToStream(new Stream.PassThrough(), ['pl','op']);
}, function() {
return writeToStream(new Stream.PassThrough(), ['ki','koo','lol']);
}).pipe(es.wait(function(err, data) {
assert.equal(err, null);
assert.equal(data, 'wadupplopkikoolol');
done();
}));
});

it('should work with functionnal API and options', function(done) {
var stream1 = new Stream.PassThrough()
, stream2 = new Stream.PassThrough()
, stream3 = new Stream.PassThrough()
;
StreamQueue({pause: true},
function() {
return writeToStream(new Stream.PassThrough(), ['wa','dup']);
}, function() {
return writeToStream(new Stream.PassThrough(), ['pl','op']);
}, function() {
return writeToStream(new Stream.PassThrough(), ['ki','koo','lol']);
})
.pipe(es.wait(function(err, data) {
assert.equal(err, null);
assert.equal(data, 'wadupplopkikoolol');
done();
}));
});

it('should work with POO API', function(done) {
var queue = new StreamQueue();
queue.queue(function() {
return writeToStream(new Stream.PassThrough(), ['wa','dup']);
});
queue.queue(function() {
return writeToStream(new Stream.PassThrough(), ['pl','op']);
});
queue.queue(function() {
return writeToStream(new Stream.PassThrough(), ['ki','koo','lol']);
});
assert.equal(queue.length, 3);
queue.pipe(es.wait(function(err, data) {
assert.equal(err, null);
assert.equal(data, 'wadupplopkikoolol');
done();
}));
queue.done();
});

it('should pause streams in flowing mode', function(done) {
var queue = new StreamQueue({
pauseFlowingStream: true,
resumeFlowingStream: true
});
queue.queue(function() {
return readableStream(['wa','dup']);
});
queue.queue(function() {
return writeToStream(new Stream.PassThrough(), ['pl','op']);
});
queue.queue(function() {
return writeToStream(new Stream.PassThrough(), ['ki','koo','lol']);
});
assert.equal(queue.length, 3);
queue.pipe(es.wait(function(err, data) {
assert.equal(err, null);
assert.equal(data, 'wadupplopkikoolol');
done();
}));
queue.done();
});

it('should work with POO API and options', function(done) {
var queue = new StreamQueue({
pauseFlowingStream: true,
resumeFlowingStream: true
});
queue.queue(function() {
return writeToStream(new Stream.PassThrough(), ['wa','dup']);
});
queue.queue(function() {
return writeToStream(new Stream.PassThrough(), ['pl','op'])
});
queue.queue(function() {
return writeToStream(new Stream.PassThrough(), ['ki','koo','lol'])
});
assert.equal(queue.length, 3);
queue.pipe(es.wait(function(err, data) {
assert.equal(err, null);
assert.equal(data, 'wadupplopkikoolol');
done();
}));
queue.done();
});

it('should work with POO API and a late done call', function(done) {
var queue = new StreamQueue();
queue.queue(function() {
return writeToStream(new Stream.PassThrough(), ['wa','dup']);
});
queue.queue(function() {
return writeToStream(new Stream.PassThrough(), ['pl','op'])
});
queue.queue(function() {
return writeToStream(new Stream.PassThrough(), ['ki','koo','lol'])
});
assert.equal(queue.length, 3);
queue.pipe(es.wait(function(err, data) {
assert.equal(err, null);
assert.equal(data, 'wadupplopkikoolol');
done();
}));
setTimeout(function() {
queue.done();
}, 100);
});

it('should reemit errors', function(done) {
var gotError = false;
var queue = new StreamQueue();
queue.queue(erroredStream('Aouch!'));
queue.queue(function() {
return writeToStream(new Stream.PassThrough(), ['wa','dup']);
});
queue.queue(function() {
return writeToStream(new Stream.PassThrough(), ['pl','op'])
});
queue.queue(function() {
return writeToStream(new Stream.PassThrough(), ['ki','koo','lol'])
});
assert.equal(queue.length, 4);
queue.on('error', function(err) {
gotError = true;
});
queue.pipe(es.wait(function(err, data) {
assert(gotError);
assert.equal(err, null);
assert.equal(data, 'wadupplopkikoolol');
done();
}));
queue.done();
});

});
Expand Down

0 comments on commit 46b3e5f

Please sign in to comment.