Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Better stream.pipe() tracking.

This commit does three things:

1. Uses an exposed counter rather than a hidden array for tracking dest
streams that may have multiple inputs.  This allows for significantly
faster lookups, since the counter can be checked in constant time rather
than searching an array for the dest object.  (A proper O(1) WeakMap
would be better, but that may have to wait for Harmony.)

2. Calls the 'end' event logic when there is an 'error' event on the
source object (and then throws if there are no other error listeners.)
This is important, because otherwise 'error' events would lead to
memory leaks.

3. Clean up the style a bit.  Function Declarations are not allowed
within blocks in ES strict.  Prefer Function Declarations to Function
Expressions, because hoisting allows for more expressive ordering of
logic.

Downside: It adds "_pipeCount" as part of the Stream API.  It'll work
fine if the member is missing, but if anyone tries to use it for some
other purpose, it can mess things up.
  • Loading branch information...
commit f4fcd6f176c7307ca13dad74b9086bac3ebbea3c 1 parent bbffd9e
@isaacs authored
Showing with 43 additions and 25 deletions.
  1. +43 −25 lib/stream.js
View
68 lib/stream.js
@@ -28,13 +28,9 @@ function Stream() {
util.inherits(Stream, events.EventEmitter);
exports.Stream = Stream;
-var pipes = [];
-
Stream.prototype.pipe = function(dest, options) {
var source = this;
- pipes.push(dest);
-
function ondata(chunk) {
if (dest.writable) {
if (false === dest.write(chunk)) source.pause();
@@ -49,31 +45,48 @@ Stream.prototype.pipe = function(dest, options) {
dest.on('drain', ondrain);
- /*
- * If the 'end' option is not supplied, dest.end() will be called when
- * source gets the 'end' event.
- */
-
+ // If the 'end' option is not supplied, dest.end() will be called when
+ // source gets the 'end' or 'close' events. Only dest.end() once, and
+ // only when all sources have ended.
if (!options || options.end !== false) {
- function onend() {
- var index = pipes.indexOf(dest);
- pipes.splice(index, 1);
+ dest._pipeCount = dest._pipeCount || 0;
+ dest._pipeCount++;
+
+ source.on('end', onend);
+ source.on('close', onend);
+ }
+
+ var didOnEnd = false;
+ function onend() {
+ if (didOnEnd) return;
+ didOnEnd = true;
+
+ dest._pipeCount--;
- if (pipes.indexOf(dest) > -1) {
- return;
- }
+ // remove the listeners
+ cleanup();
- dest.end();
+ if (dest._pipeCount > 0) {
+ // waiting for other incoming streams to end.
+ return;
}
- source.on('end', onend);
- source.on('close', onend);
+ dest.end();
+ }
+
+ // don't leave dangling pipes when there are errors.
+ function onerror(er) {
+ cleanup();
+ if (this.listeners('error').length === 1) {
+ throw er; // Unhandled stream error in pipe.
+ }
}
- /*
- * Questionable:
- */
+ source.on('error', onerror);
+ dest.on('error', onerror);
+ // guarantee that source streams can be paused and resumed, even
+ // if the only effect is to proxy the event back up the pipe chain.
if (!source.pause) {
source.pause = function() {
source.emit('pause');
@@ -86,27 +99,32 @@ Stream.prototype.pipe = function(dest, options) {
};
}
- var onpause = function() {
+ function onpause() {
source.pause();
}
dest.on('pause', onpause);
- var onresume = function() {
+ function onresume() {
if (source.readable) source.resume();
- };
+ }
dest.on('resume', onresume);
- var cleanup = function () {
+ // remove all the event listeners that were added.
+ function cleanup() {
source.removeListener('data', ondata);
dest.removeListener('drain', ondrain);
+
source.removeListener('end', onend);
source.removeListener('close', onend);
dest.removeListener('pause', onpause);
dest.removeListener('resume', onresume);
+ source.removeListener('error', onerror);
+ dest.removeListener('error', onerror);
+
source.removeListener('end', cleanup);
source.removeListener('close', cleanup);
Please sign in to comment.
Something went wrong with that request. Please try again.