Skip to content

Commit

Permalink
manage backpressure from source stream
Browse files Browse the repository at this point in the history
  • Loading branch information
joeferner committed Aug 13, 2012
1 parent 7d5d404 commit 963de72
Showing 1 changed file with 14 additions and 1 deletion.
15 changes: 14 additions & 1 deletion pullstream.js
Expand Up @@ -7,9 +7,11 @@ var Stream = require('stream').Stream;
var over = require('over');
var streamBuffers = require("stream-buffers");

function PullStream() {
function PullStream(opts) {
var self = this;
Stream.apply(this);
this.opts = opts || {};
this.opts.maxBufferSize = this.opts.maxBufferSize | (10 * 1024 * 1024);
this.readable = false;
this.writable = true;
this._buffer = new streamBuffers.WritableStreamBuffer();
Expand All @@ -31,6 +33,9 @@ PullStream.prototype._sendPauseBuffer = function () {

PullStream.prototype.write = function (data) {
this._buffer.write(data);
if (this._buffer.maxSize() > this.opts.maxBufferSize && this._srcStream) {
this._srcStream.pause();
}
this.process();
return true;
};
Expand Down Expand Up @@ -94,6 +99,10 @@ PullStream.prototype.pull = over([
} else if (self._recvEnd && self._buffer.size() === 0) {
callback(new Error('End of Stream'));
self._finish();
} else {
if (self._srcStream) {
self._srcStream.resume();
}
}
}
}]
Expand Down Expand Up @@ -136,6 +145,10 @@ PullStream.prototype.pipe = over([
destStream.end();
destStream = null;
}
} else {
if (self._srcStream) {
self._srcStream.resume();
}
}

if (self._recvEnd && self._buffer.size() === 0) {
Expand Down

0 comments on commit 963de72

Please sign in to comment.