Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Multi-pipe support

  • Loading branch information...
commit e84c55b7cf018928cb2b58cd77a0055680329973 1 parent 8f20e9f
@isaacs isaacs authored
Showing with 131 additions and 31 deletions.
  1. +52 −29 readable.js
  2. +79 −2 test/basic.js
View
81 readable.js
@@ -14,7 +14,8 @@ function Readable(options) {
this.lowWaterMark = options.lowWaterMark || 1024;
this.buffer = [];
this.length = 0;
- this._dest = null;
+ this._pipes = [];
+ this._flowing = false;
Stream.apply(this);
}
@@ -53,52 +54,74 @@ Readable.prototype.read = function(n) {
return ret;
};
+// abstract method. to be overridden in specific implementation classes.
Readable.prototype._read = function(n, cb) {
process.nextTick(cb.bind(this, new Error('not implemented')));
};
Readable.prototype.pipe = function(dest, opt) {
- if (this._dest) this.unpipe();
-
- this._dest = dest;
- if (opt) this._pipeOpt = opt;
-
- if (!this._pipeEndAdded) {
- this._pipeEndAdded = true;
- this.on('end', function() {
- var dest = this._dest;
- if (dest &&
- (!this._pipeOpt || this._pipeOpt.end !== false) &&
- dest !== process.stdout &&
- dest !== process.stderr) {
- dest.end();
+ var src = this;
+ src._pipes.push(dest);
+ if ((!opt || opt.end !== false) &&
+ dest !== process.stdout &&
+ dest !== process.stderr) {
+ src.once('end', onend);
+ dest.on('unpipe', function(readable) {
+ if (readable === src) {
+ src.removeListener('end', onend);
}
});
}
- this._dest.emit('pipe', this);
- flow.call(this);
+ dest.emit('pipe', src);
+ if (!src._flowing) process.nextTick(flow.bind(src));
return dest;
+
+ function onend() {
+ dest.end();
+ }
};
-function flow() {
+function flow(src) {
+ if (!src) src = this;
var chunk;
var dest;
- while ((dest = this._dest) && (chunk = this.read())) {
- var written = dest.write(chunk);
- if (false === written && this._dest) {
- this._dest.once('drain', flow.bind(this));
- return;
+ var needDrain = 0;
+ while (chunk = src.read()) {
+ src._pipes.forEach(function(dest, i, list) {
+ var written = dest.write(chunk);
+ if (false === written) {
+ needDrain++;
+ dest.once('drain', ondrain);
@Raynos
Raynos added a note

What if dest calls src.unpipe(dest) and never emits drain. Is that an unhandled edgecase or a buggy stream?

@isaacs Admin
isaacs added a note

If dest.write() returns false, then dest must emit drain at some point in the future, or it is a buggy stream, yes.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
+ }
+ });
+ if (needDrain > 0) return;
+ }
+
+ src.once('readable', flow);
+
+ function ondrain() {
+ needDrain--;
+ if (needDrain === 0) {
+ flow(src);
}
}
- this.once('readable', flow);
}
-Readable.prototype.unpipe = function() {
- if (!this._dest) return this;
- var dest = this._dest;
- this._dest = null;
- dest.emit('unpipe', this);
+Readable.prototype.unpipe = function(dest) {
+ if (!dest) {
+ // remove all of them.
+ this._pipes.forEach(function(dest, i, list) {
+ dest.emit('unpipe', this);
+ }, this);
+ this._pipes.length = 0;
+ } else {
+ var i = this._pipes.indexOf(dest);
+ if (i !== -1) {
+ dest.emit('unpipe', this);
+ this._pipes.splice(i, 1);
+ }
+ }
return this;
};
View
81 test/basic.js
@@ -127,7 +127,6 @@ tap.test('pipe', function(t) {
'xxxxx',
'xxxxx' ]
- var EE = require('events').EventEmitter;
var w = new TestWriter;
var flush = true;
w.on('end', function(received) {
@@ -157,7 +156,6 @@ tap.test('pipe', function(t) {
'xxxxx' ];
expect = [ expect.slice(0, SPLIT), expect.slice(SPLIT) ];
- var EE = require('events').EventEmitter;
var w = [ new TestWriter(), new TestWriter() ];
var writes = SPLIT;
@@ -186,3 +184,82 @@ tap.test('pipe', function(t) {
r.pipe(w[0]);
});
});
+
+
+// both writers should get the same exact data.
+tap.test('multipipe', function(t) {
+ var r = new TestReader(5);
+ var w = [ new TestWriter, new TestWriter ];
+
+ var expect = [ 'xxxxx',
+ 'xxxxx',
+ 'xxxxx',
+ 'xxxxx',
+ 'xxxxx',
+ 'xxxxx',
+ 'xxxxx',
+ 'xxxxx',
+ 'xxxxx',
+ 'xxxxx' ];
+
+ var c = 2;
+ w[0].on('end', function(received) {
+ t.same(received, expect, 'first');
+ if (--c === 0) t.end();
+ });
+ w[1].on('end', function(received) {
+ t.same(received, expect, 'second');
+ if (--c === 0) t.end();
+ });
+
+ r.pipe(w[0]);
+ r.pipe(w[1]);
+});
+
+
+[1,2,3,4,5,6,7,8,9].forEach(function(SPLIT) {
+ tap.test('multi-unpipe', function(t) {
+ var r = new TestReader(5);
+
+ // unpipe after 3 writes, then write to another stream instead.
+ var expect = [ 'xxxxx',
+ 'xxxxx',
+ 'xxxxx',
+ 'xxxxx',
+ 'xxxxx',
+ 'xxxxx',
+ 'xxxxx',
+ 'xxxxx',
+ 'xxxxx',
+ 'xxxxx' ];
+ expect = [ expect.slice(0, SPLIT), expect.slice(SPLIT) ];
+
+ var w = [ new TestWriter(), new TestWriter(), new TestWriter() ];
+
+ var writes = SPLIT;
+ w[0].on('write', function() {
+ if (--writes === 0) {
+ r.unpipe();
+ w[0].end();
+ r.pipe(w[1]);
+ }
+ });
+
+ var ended = 0;
+
+ w[0].on('end', function(results) {
+ ended++;
+ t.same(results, expect[0]);
+ });
+
+ w[1].on('end', function(results) {
+ ended++;
+ t.equal(ended, 2);
+ t.same(results, expect[1]);
+ t.end();
+ });
+
+ r.pipe(w[0]);
+ r.pipe(w[2]);
+ });
+});
@Raynos

What if dest calls src.unpipe(dest) and never emits drain. Is that an unhandled edgecase or a buggy stream?

@isaacs

If dest.write() returns false, then dest must emit drain at some point in the future, or it is a buggy stream, yes.

Please sign in to comment.
Something went wrong with that request. Please try again.