Browse files

Abstract out the endReadable function

Mostly because it was happening more than once, which is bad.
  • Loading branch information...
1 parent 5f3e929 commit 48c8a2e04f16ff8dfe474e641ecf9fb753a5b78d @isaacs isaacs committed Oct 3, 2012
Showing with 27 additions and 6 deletions.
  1. +27 −6 readable.js
View
33 readable.js
@@ -22,6 +22,7 @@ function ReadableState(options, stream) {
this.pipes = [];
this.flowing = false;
this.ended = false;
+ this.endEmitted = false;
this.stream = stream;
this.reading = false;
@@ -40,12 +41,12 @@ Readable.prototype.read = function(n) {
var state = this._readableState;
if (state.length === 0 && state.ended) {
- process.nextTick(this.emit.bind(this, 'end'));
+ endReadable(this);
return null;
}
if (isNaN(n) || n <= 0)
- n = state.length
+ n = state.length;
// XXX: controversial.
// don't have that much. return null, unless we've ended.
@@ -89,13 +90,24 @@ Readable.prototype.read = function(n) {
// 'readable' now to make sure it gets picked up.
if (state.length > 0)
this.emit('readable');
+ else
+ endReadable(this);
+
return;
}
state.length += chunk.length;
state.buffer.push(chunk);
- if (state.length < state.lowWaterMark)
+
+ // if we haven't gotten enough to pass the lowWaterMark,
+ // and we haven't ended, then don't bother telling the user
+ // that it's time to read more data. Otherwise, that'll
+ // probably kick off another stream.read(), which can trigger
+ // another _read(n,cb) before this one returns!
+ if (state.length < state.lowWaterMark) {
this._read(state.bufferSize, onread.bind(this));
+ return;
+ }
// now we have something to call this.read() to get.
if (state.needReadable) {
@@ -124,7 +136,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
state.pipes.push(dest);
if ((!pipeOpts || pipeOpts.end !== false) &&
- dest !== process.stdout &&
+ dest !== process.stdout &&
dest !== process.stderr) {
src.once('end', onend);
dest.on('unpipe', function(readable) {
@@ -289,7 +301,7 @@ Readable.prototype.wrap = function(stream) {
stream.on('end', function() {
state.ended = true;
if (state.length === 0)
- this.emit('end');
+ endReadable(this);
}.bind(this));
stream.on('data', function(chunk) {
@@ -349,7 +361,7 @@ Readable.prototype.wrap = function(stream) {
}
if (state.length === 0 && state.ended)
- process.nextTick(this.emit.bind(this, 'end'));
+ endReadable(this);
return ret;
};
@@ -411,3 +423,12 @@ function fromList(n, list, length) {
return ret;
}
+
+function endReadable(stream) {
+ var state = stream._readableState;
+ if (state.endEmitted)
+ return;
+ state.ended = true;
+ state.endEmitted = true;
+ process.nextTick(stream.emit.bind(stream, 'end'));
+}

0 comments on commit 48c8a2e

Please sign in to comment.