Skip to content

Commit

Permalink
Fix: Multiple pipes to the same stream were broken
Browse files Browse the repository at this point in the history
When creating multiple .pipe()s to the same destination stream, the
first source to end would close the destination, breaking all remaining
pipes. This patch fixes the problem by keeping track of all open
pipes, so that we only call end on destinations that have no more
sources piping to them.

closes nodejs#929
  • Loading branch information
felixge authored and ry committed Apr 14, 2011
1 parent 8417870 commit 6c5b31b
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 9 deletions.
29 changes: 21 additions & 8 deletions lib/stream.js
Expand Up @@ -28,9 +28,13 @@ function Stream() {
util.inherits(Stream, events.EventEmitter);
exports.Stream = Stream;

var pipes = [];

Stream.prototype.pipe = function(dest, options) {
var source = this;

pipes.push(dest);

function ondata(chunk) {
if (dest.writable) {
if (false === dest.write(chunk)) source.pause();
Expand All @@ -52,10 +56,18 @@ Stream.prototype.pipe = function(dest, options) {

if (!options || options.end !== false) {
function onend() {
var index = pipes.indexOf(dest);
pipes.splice(index, 1);

if (pipes.indexOf(dest) > -1) {
return;
}

dest.end();
}

source.on('end', onend);
source.on('close', onend);
}

/*
Expand All @@ -73,34 +85,35 @@ Stream.prototype.pipe = function(dest, options) {
source.emit('resume');
};
}

var onpause = function() {
source.pause();
}

dest.on('pause', onpause);

var onresume = function() {
if (source.readable) source.resume();
};

dest.on('resume', onresume);

var cleanup = function () {
source.removeListener('data', ondata);
dest.removeListener('drain', ondrain);
source.removeListener('end', onend);

source.removeListener('close', onend);

dest.removeListener('pause', onpause);
dest.removeListener('resume', onresume);

source.removeListener('end', cleanup);
source.removeListener('close', cleanup);

dest.removeListener('end', cleanup);
dest.removeListener('close', cleanup);
}

source.on('end', cleanup);
source.on('close', cleanup);

Expand Down
21 changes: 20 additions & 1 deletion test/simple/test-stream-pipe-cleanup.js
Expand Up @@ -28,10 +28,13 @@ var util = require('util');

function Writable () {
this.writable = true;
this.endCalls = 0;
stream.Stream.call(this);
}
util.inherits(Writable, stream.Stream);
Writable.prototype.end = function () {}
Writable.prototype.end = function () {
this.endCalls++;
}

function Readable () {
this.readable = true;
Expand All @@ -56,13 +59,29 @@ for (i = 0; i < limit; i++) {
r.emit('end')
}
assert.equal(0, r.listeners('end').length);
assert.equal(limit, w.endCalls);

w.endCalls = 0;

for (i = 0; i < limit; i++) {
r = new Readable()
r.pipe(w)
r.emit('close')
}
assert.equal(0, r.listeners('close').length);
assert.equal(limit, w.endCalls);

w.endCalls = 0;

var r2;
r = new Readable()
r2 = new Readable();

r.pipe(w)
r2.pipe(w)
r.emit('close')
r2.emit('close')
assert.equal(1, w.endCalls);

r = new Readable();

Expand Down

0 comments on commit 6c5b31b

Please sign in to comment.