diff --git a/pullstream.js b/pullstream.js index 39ec671..43d8195 100644 --- a/pullstream.js +++ b/pullstream.js @@ -11,6 +11,7 @@ function PullStream(opts) { var self = this; Stream.apply(this); this.opts = opts || {}; + this.opts.minBufferSize = this.opts.minBufferSize | (1 * 1024 * 1024); this.opts.maxBufferSize = this.opts.maxBufferSize | (10 * 1024 * 1024); this.readable = false; this.writable = true; @@ -86,6 +87,8 @@ PullStream.prototype.pull = over([ return; } + self._resumeSrcStream(); + if ((len !== null && self._buffer.size() >= len) || (len === null && self._recvEnd)) { self._serviceRequests = null; var results = self._buffer.getContents(len); @@ -99,10 +102,6 @@ PullStream.prototype.pull = over([ } else if (self._recvEnd && self._buffer.size() === 0) { callback(new Error('End of Stream')); self._finish(); - } else { - if (self._srcStream) { - self._srcStream.resume(); - } } } }] @@ -124,6 +123,8 @@ PullStream.prototype.pipe = over([ return; } + self._resumeSrcStream(); + var lenToRemove; if (lenLeft === null) { lenToRemove = self._buffer.size(); @@ -145,10 +146,6 @@ PullStream.prototype.pipe = over([ destStream.end(); destStream = null; } - } else { - if (self._srcStream) { - self._srcStream.resume(); - } } if (self._recvEnd && self._buffer.size() === 0) { @@ -164,6 +161,12 @@ PullStream.prototype.pipe = over([ }] ]); +PullStream.prototype._resumeSrcStream = function () { + if (this._srcStream && this._buffer.size() < this.opts.minBufferSize) { + this._srcStream.resume(); + } +}; + PullStream.prototype.pause = function () { this.paused = true; if (this._srcStream && this._srcStream.pause) {