Permalink
Browse files

Async matchers, allow pipeline of length 1

  • Loading branch information...
1 parent 2506533 commit 048c44793d97a587b28f980cce78e0f84de543d9 @mixu committed Feb 20, 2015
Showing with 44 additions and 20 deletions.
  1. +9 −2 index.js
  2. +35 −18 lib/match.js
View
@@ -150,6 +150,9 @@ exports.combine = function(writable, readable) {
}
var stream = exports.duplex.obj(function(chunk, enc, done) {
+ if (!writable.writable) {
+ return done(); // if the stream has already ended, stop writing to it
+ }
// Node 0.8.x writable streams do not accept the third parameter, done
var ok = writable.write(chunk, enc);
if (ok) {
@@ -332,8 +335,12 @@ exports.tail = function() {
};
exports.pipeline = function() {
- var streams = exports.pipe((Array.isArray(arguments[0]) ? arguments[0] : Array.prototype.slice.call(arguments))),
- last = streams[streams.length - 1],
+ var streams = exports.pipe((Array.isArray(arguments[0]) ? arguments[0] : Array.prototype.slice.call(arguments)));
+ if (streams.length === 1) {
+ return streams[0];
+ }
+
+ var last = streams[streams.length - 1],
isDuplex = isStream.isDuplex(last),
head = isDuplex ? exports.combine(streams[0], last) : exports.cap(streams[0]);
View
@@ -12,6 +12,7 @@ function Match(opts) {
this._writables = opts.streams;
this._rest = opts.rest;
this._ok = this._writables.map(function() { return true; });
+ this._counter = 1;
this._writables.forEach(function(stream, i) {
stream.on('drain', function() {
@@ -28,29 +29,45 @@ function Match(opts) {
}
Match.prototype._write = function(chunk, enc, done) {
- var i, stream,
- self = this;
- for (i = 0; i < this._conditions.length; i++) {
- if (this._conditions[i](chunk)) {
- stream = this._writables[i];
- break;
+ var stream,
+ i = -1,
+ self = this,
+ counter = this.counter++;
+
+ // allow async matchers (arity = 3)
+ next(false);
+
+ function next(result) {
+ if (result) {
+ stream = self._writables[i];
+ return last();
+ }
+ i++;
+ if (i < self._conditions.length) {
+ if (self._conditions[i].length === 3) {
+ self._conditions[i](chunk, counter, next);
+ } else {
+ next(self._conditions[i](chunk, counter));
+ }
}
}
- // no match -> call done
- if (!stream) {
- return done();
- }
+ function last() {
+ // no match -> call done
+ if (!stream) {
+ return done();
+ }
- function write() {
- if (!self._ok[i]) {
- stream.once('drain', write);
- } else {
- self._ok[i] = stream.write(chunk);
- done();
+ function write() {
+ if (!self._ok[i]) {
+ stream.once('drain', write);
+ } else {
+ self._ok[i] = stream.write(chunk);
+ done();
+ }
}
- }
- write();
+ write();
+ }
};
module.exports = Match;

0 comments on commit 048c447

Please sign in to comment.