
Add merge, forkMerge and matchMerge functions

1 parent 9bdd7e4 commit a222e57d2a44368e9d4b02af7c31ecb8a722b5e1 @mixu committed Nov 8, 2014
Showing with 266 additions and 26 deletions.
  1. +1 −0 .gitignore
  2. +33 −24 index.js
  3. +1 −1 lib/duplex.js
  4. +1 −0 package.json
  5. +56 −1 readme.md
  6. +162 −0 test/control-flow.test.js
  7. +12 −0 test/io-construct.test.js
.gitignore
@@ -1 +1,2 @@
node_modules/
+output/
index.js
@@ -1,6 +1,7 @@
var through = require('through2'),
cloneLib = require('clone'),
- Readable = require('readable-stream').Readable;
+ Readable = require('readable-stream').Readable,
+ xtend = require('xtend');
var isStream = require('./lib/is-stream.js'),
Match = require('./lib/match.js');
@@ -30,6 +31,7 @@ exports.reduce = function(fn, initial) {
var index = 0,
captureFirst = (arguments.length < 2),
acc = (!captureFirst ? initial : null);
+
return through.obj(function(obj, enc, onDone) {
if (captureFirst) {
acc = obj;
@@ -129,7 +131,7 @@ exports.writable = require('./lib/writable.js');
exports.readable = require('./lib/readable.js');
exports.duplex = require('./lib/duplex.js');
-// based on https://github.com/deoxxa/duplexer2/pull/6
+// based on https://github.com/deoxxa/duplexer2/pull/6 (with an additional bugfix)
exports.combine = function(writable, readable) {
if (!isStream.isWritable(writable)) {
throw new Error('The first stream must be writable.');
@@ -147,39 +149,34 @@ exports.combine = function(writable, readable) {
readable = new Readable().wrap(readable);
}
- var shouldRead = false,
- stream = exports.duplex.obj(function(chunk, enc, done) {
+ var stream = exports.duplex.obj(function(chunk, enc, done) {
// Node 0.8.x writable streams do not accept the third parameter, done
var ok = writable.write(chunk, enc);
if (ok) {
done();
} else {
writable.once('drain', done);
}
- }, function() {
- if (shouldRead) { return; }
- shouldRead = true;
- forwardRead();
- });
+ }, forwardRead);
writable.once('finish', function() { stream.end(); });
stream.once('finish', function() { writable.end(); });
- writable.on('error', function(err) { return stream.emit('error', err); });
- readable.on('readable', forwardRead);
- readable.once('end', function() { return stream.push(null); });
- readable.on('error', function(err) { return stream.emit('error', err); });
+ readable.once('end', function() { stream.push(null); });
+
+ writable.on('error', function(err) { stream.emit('error', err); });
+ readable.on('error', function(err) { stream.emit('error', err); });
function forwardRead() {
- if (!shouldRead) { return; }
var data, waitingToRead = true;
while ((data = readable.read()) !== null) {
waitingToRead = false;
stream.push(data);
}
- shouldRead = waitingToRead;
+ if (waitingToRead) {
+ readable.once('readable', forwardRead);
+ }
}
-
return stream;
};
@@ -228,9 +225,8 @@ exports.fork = function() {
function trueFn() { return true; }
-exports.match = function() {
- var args = (Array.isArray(arguments[0]) ? arguments[0] : Array.prototype.slice.call(arguments)),
- conditions = [],
+function parseMatch(args) {
+ var conditions = [],
streams = [],
i = 0;
@@ -246,12 +242,25 @@ exports.match = function() {
conditions.push(trueFn);
streams.push(args[i]);
}
+ return { conditions: conditions, streams: streams };
+}
- return new Match({
- objectMode: true,
- conditions: conditions,
- streams: streams
- });
+exports.match = function() {
+ var args = (Array.isArray(arguments[0]) ? arguments[0] : Array.prototype.slice.call(arguments));
+ return new Match(xtend({ objectMode: true }, parseMatch(args)));
+};
+
+exports.merge = require('merge-stream');
+
+exports.forkMerge = function() {
+ var args = (Array.isArray(arguments[0]) ? arguments[0] : Array.prototype.slice.call(arguments));
+ return exports.combine(exports.fork(args), exports.merge(args));
+};
+
+exports.matchMerge = function() {
+ var args = (Array.isArray(arguments[0]) ? arguments[0] : Array.prototype.slice.call(arguments)),
+ parsed = xtend({ objectMode: true }, parseMatch(args));
+ return exports.combine(new Match(parsed), exports.merge(parsed.streams));
};
// Constructing pipelines from individual elements
lib/duplex.js
@@ -10,7 +10,7 @@ function ctor(options, _write, _read) {
}
if (typeof _write !== 'function') {
- throw new Error('You must implement an _read function');
+ throw new Error('You must implement a _write function');
}
if (typeof _read !== 'function') {
package.json
@@ -24,6 +24,7 @@
"homepage": "https://github.com/mixu/pipe-iterators",
"dependencies": {
"clone": "0.1.18",
+ "merge-stream": "0.1.6",
"readable-stream": "*",
"through2": "*",
"xtend": "4.0.0"
readme.md
@@ -12,6 +12,13 @@ Preamble:
var pi = require('pipe-iterators');
```
+## Changelog
+
+`v1.1.0`:
+
+- added the `merge`, `forkMerge` and `matchMerge` functions.
+- fixed a bug in `pipeline`.
+
## Iteration functions
The iterator functions closely follow the native `Array.*` iteration API (e.g. `forEach`, `map`, `filter`), but the functions return object mode streams instead of operating on arrays.
@@ -86,7 +93,7 @@ pi.mapKey(key, callback, [thisArg])
pi.mapKey(hash, [thisArg])
```
-Returns a duplex stream which produces a new stream of values by mapping a single key (when given `key` and `callback`) or multiple keys (when given `hash`) through a transformation callback. The callback is invoked with two arguments: `value` (the value `element[key]`), and `obj` (the element itself). The return value from the callback is set on the element, and the element itself is written back to the stream.
+Returns a duplex stream which produces a new stream of values by mapping a single key (when given `key` and `callback`) or multiple keys (when given `hash`) through a transformation callback. The callback is invoked with three arguments: `value` (the value `element[key]`), `obj` (the element itself) and `index` (the element index). The return value from the callback is set on the element, and the element itself is written back to the stream.
If `thisArg` is provided, it is available as `this` within the callback.
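+
+A quick sketch of the three-argument callback (the element shape here is illustrative, not from the library's docs):
+
+```js
+pi.fromArray([ { path: 'a.md' }, { path: 'b.md' } ])
+  .pipe(pi.mapKey('path', function(value, obj, index) {
+    // value === obj.path; the return value is written back to obj.path
+    return value.toUpperCase();
+  }))
+  .pipe(pi.toArray(function(result) {
+    // [ { path: 'A.MD' }, { path: 'B.MD' } ]
+  }));
+```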
@@ -304,6 +311,53 @@ pi.fromArray([
));
```
+## merge
+
+```js
+pi.merge(stream1, [stream2], [...])
+pi.merge([ stream1, stream2, ... ])
+```
+
+Merges multiple readable streams into a single stream. Accepts any number of readable streams (or a single array of streams) and returns a duplex stream.
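+
+For example (a minimal sketch, using `fromArray` and `toArray` from this readme):
+
+```js
+pi.merge(pi.fromArray(1, 2), pi.fromArray(3, 4))
+  .pipe(pi.toArray(function(result) {
+    // arrival order across the merged streams is not guaranteed, hence the sort
+    console.log(result.sort()); // [ 1, 2, 3, 4 ]
+  }));
+```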
+
+## forkMerge
+
+```js
+pi.forkMerge(stream1, [stream2], [...])
+pi.forkMerge([ stream1, stream2, ... ])
+```
+
+Fork followed by merge on a set of streams. Accepts any number of duplex streams (or a single array of streams) and returns a duplex stream that:
+
+- `fork`s the input, writing every element into each of the streams,
+- reads back the results from each stream, `merge`s them and writes them out.
+
+Useful if you need to concurrently apply different operations to a single input but want to produce a single merged output.
+
+```
+            / to-html() \
+read .md() -  to-pdf()   - write-to-disk()
+            \ to-rtf() /
+```
+
+For example, imagine converting a set of Markdown files into the HTML, PDF and RTF formats: the same file goes in, each of the processing operations is applied, and at the end there are three objects (binary files in the different formats) that go into the same "write to disk" pipeline.
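+
+A minimal numeric sketch of the same shape (the branch operations here are illustrative placeholders):
+
+```js
+pi.fromArray(1, 2, 3)
+  .pipe(pi.forkMerge(
+    pi.map(function(val) { return val * 2; }),  // one processing branch
+    pi.map(function(val) { return val + 100; }) // another processing branch
+  ))
+  .pipe(pi.toArray(function(result) {
+    // every element passed through both branches;
+    // sorted result: [ 2, 4, 6, 101, 102, 103 ]
+  }));
+```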
+
+## matchMerge
+
+```js
+pi.matchMerge(condition1, stream1, [condition2], [stream2], [...], [rest])
+pi.matchMerge([ condition1, stream1, condition2, stream2, ..., rest ])
+```
+
+Match followed by merge on a set of streams. Accepts any number of condition-stream pairs, optionally followed by a rest stream (as in `match`); returns a duplex stream that:
+
+- `match`es each element against the conditions and writes it to the stream paired with the first matching condition,
+- reads back the results from each stream, `merge`s them and writes them out.
+
+Useful if you want to conditionally process some elements differently, while sharing the same downstream pipeline.
+
+For example, if you want to first check a cache and skip some processing for items that hit in the cache, you could do something like `pi.matchMerge(checkCache, getResultFromCache, performFullProcessing)` (where `checkCache` is a function and the other two are through streams).
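+
+A minimal runnable sketch of that shape, with number predicates standing in for the cache check:
+
+```js
+pi.fromArray([ 1, 2, 3, 4 ])
+  .pipe(pi.matchMerge(
+    function(obj) { return obj % 2 === 0; },    // the "cache hit" condition
+    pi.map(function(val) { return val + 10; }), // cheap path for hits
+    pi.map(function(val) { return val + 100; }) // rest stream: full processing
+  ))
+  .pipe(pi.toArray(function(result) {
+    // 2 and 4 took the cheap path; 1 and 3 fell through to the rest stream
+    // sorted result: [ 12, 14, 101, 103 ]
+  }));
+```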
+
## Constructing pipelines from individual elements
These functions apply `pipe` in various ways to make it easier to go from an array of streams to a pipeline.
@@ -430,3 +484,4 @@ Best handled by something that can do that in an efficient manner, such as [bina
- [KylePDavis/node-stream-utils](https://github.com/KylePDavis/node-stream-utils): duplex() for old (0.8.x) style streams
- [sterpe/composite-pipes](https://github.com/sterpe/composite-pipes)
- [rvagg/isstream](https://github.com/rvagg/isstream)
+- [grncdr/merge-stream](https://github.com/grncdr/merge-stream)
test/control-flow.test.js
@@ -154,3 +154,165 @@ describe('fork', function() {
});
});
});
+
+describe('merge', function() {
+
+ it('returns a duplex stream', function() {
+ assert.ok(isDuplex(pi.merge()));
+ });
+
+ it('merges multiple streams', function(done) {
+ pi.merge(pi.fromArray(1, 2), pi.fromArray(3, 4), pi.fromArray(5, 6))
+ .pipe(pi.toArray(function(result) {
+ assert.deepEqual(result.sort(), [ 1, 2, 3, 4, 5, 6 ]);
+ done();
+ }));
+ });
+
+ it('merges multiple streams, arg is array', function(done) {
+ pi.merge([pi.fromArray(1, 2), pi.fromArray(3, 4), pi.fromArray(5, 6)])
+ .pipe(pi.toArray(function(result) {
+ assert.deepEqual(result.sort(), [ 1, 2, 3, 4, 5, 6 ]);
+ done();
+ }));
+ });
+
+
+ it('works with just one stream', function(done) {
+ pi.merge(pi.fromArray(1))
+ .pipe(pi.toArray(function(result) {
+ assert.deepEqual(result.sort(), [ 1 ]);
+ done();
+ }));
+ });
+
+ it('works with one empty stream', function(done) {
+ pi.merge(pi.fromArray(1), pi.fromArray(), pi.fromArray(2))
+ .pipe(pi.toArray(function(result) {
+ assert.deepEqual(result.sort(), [ 1, 2 ]);
+ done();
+ }));
+ });
+
+ it('works with just empty streams', function(done) {
+ pi.merge(pi.fromArray(), pi.fromArray())
+ .pipe(pi.toArray(function(result) {
+ assert.deepEqual(result.sort(), []);
+ done();
+ }));
+ });
+
+ it('works in flowing mode', function(done) {
+ var result = [];
+ pi.merge(pi.fromArray(1, 2), pi.fromArray(3, 4), pi.fromArray(5, 6))
+ .on('data', function(data) { result.push(data); })
+ .once('end', function() {
+ assert.deepEqual(result.sort(), [ 1, 2, 3, 4, 5, 6]);
+ done();
+ });
+ });
+
+});
+
+function logEvts(id, stream) {
+ // readable (non-flowing) stream
+ return stream.on('readable', function() {
+ console.log('[' + id +'] "readable"');
+ })
+ .on('end', function() {
+ console.log('[' + id +'] "end"');
+ })
+ .on('close', function() {
+ console.log('[' + id +'] "close"');
+ })
+ .on('error', function(err) {
+ console.log('[' + id +'] "error"', err);
+ })
+ // writable (non-flowing) stream
+ .on('drain', function() {
+ console.log('[' + id +'] "drain"');
+ })
+ .on('finish', function() {
+ console.log('[' + id +'] "finish"');
+ })
+ .on('pipe', function() {
+ console.log('[' + id +'] "pipe"');
+ })
+ .on('unpipe', function() {
+ console.log('[' + id +'] "unpipe"');
+ });
+}
+
+function logStream(id) {
+ return logEvts(id, pi.thru.obj(function(data, enc, done) {
+ console.log('[' + id + '] _transform ' + data);
+ this.push(data);
+ done();
+ }, function(done) {
+ console.log('[' + id +'] _flush');
+ done();
+ }));
+}
+
+
+describe('forkMerge', function() {
+
+ function doubler(val) { return val * 2; }
+ function add100(val) { return val + 100; }
+
+ it('combines a fork stream and a merge stream', function(done) {
+
+ pi.fromArray(1, 2, 3)
+ .pipe(
+ pi.forkMerge(
+ pi.pipeline(pi.map(doubler), pi.map(doubler)),
+ pi.pipeline(pi.map(add100), pi.map(add100))
+ )
+ ).pipe(pi.toArray(function(result) {
+ assert.deepEqual(
+ result.sort(function(a, b){ return a-b; }),
+ [ 4, 8, 12, 201, 202, 203 ]
+ );
+ done();
+ }))
+
+ });
+
+});
+
+describe('matchMerge', function() {
+
+ function add10(val) { return val + 10; }
+ function add100(val) { return val + 100; }
+
+ it('combines a match stream and a merge stream', function(done) {
+
+ pi.fromArray([ 1, 2, 3, 4, 5, 6 ])
+ .pipe(pi.matchMerge(
+ function(obj) { return obj % 2 == 0; },
+ pi.map(add10),
+ function(obj) { return obj % 3 == 0; },
+ pi.map(add100),
+ pi.thru.obj()
+ ))
+ .pipe(pi.toArray(function(result) {
+
+
+ assert.deepEqual(
+ result.sort(function(a, b){ return a-b; }),
+ [
+ 1, // 1 -> 1
+ 5, // 5 -> 5
+
+ 12, // 2 -> + 10 -> 12
+ 14, // 4 -> + 10 -> 14
+ 16, // 6 -> + 10 -> 16
+
+ 103 // 3 -> + 100 -> 103
+ ]
+ );
+ done();
+ }));
+ });
+
+});