Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

stream: Correct Transform class backpressure

The refactor in b43e544 to use
stream.push() in Transform inadvertently caused it to immediately
consume all the written data, regardless of whether or not the readable
side was being consumed.

Only pull data through the _transform() process when the readable side
is being consumed.

Fix #4667
  • Loading branch information...
commit e0f2a3e834c376a0ca949cfec01dc26825c10697 1 parent acd0df4
@isaacs authored
Showing with 86 additions and 50 deletions.
  1. +59 −50 lib/_stream_transform.js
  2. +27 −0 test/simple/test-stream2-transform.js
View
109 lib/_stream_transform.js
@@ -70,28 +70,63 @@ var Duplex = require('_stream_duplex');
var util = require('util');
util.inherits(Transform, Duplex);
-function TransformState(stream) {
- this.buffer = [];
- this.transforming = false;
- this.pendingReadCb = null;
+
+function TransformState(options, stream) {
+ var ts = this;
this.output = function(chunk) {
+ ts.needTransform = false;
stream.push(chunk);
};
+
+ this.afterTransform = function(er, data) {
+ return afterTransform(stream, er, data);
+ };
+
+ this.needTransform = false;
+ this.transforming = false;
+ this.writecb = null;
+ this.writechunk = null;
+}
+
+function afterTransform(stream, er, data) {
+ var ts = stream._transformState;
+ ts.transforming = false;
+
+ var cb = ts.writecb;
+
+ if (!cb)
+ return this.emit('error', new Error('no writecb in Transform class'));
+
+ ts.writechunk = null;
+ ts.writecb = null;
+
+ if (data !== null && data !== undefined)
+ ts.output(data);
+
+ if (cb)
+ cb(er);
+
+ var rs = stream._readableState;
+ if (rs.needReadable || rs.length < rs.highWaterMark) {
+ stream._read();
+ }
}
+
function Transform(options) {
if (!(this instanceof Transform))
return new Transform(options);
Duplex.call(this, options);
- // bind output so that it can be passed around as a regular function.
+ var ts = this._transformState = new TransformState(options, this);
+
+ // when the writable side finishes, then flush out anything remaining.
var stream = this;
- // the queue of _write chunks that are pending being transformed
- var ts = this._transformState = new TransformState(stream);
+ // start out asking for a readable event once data is transformed.
+ this._readableState.needReadable = true;
- // when the writable side finishes, then flush out anything remaining.
this.once('finish', function() {
if ('function' === typeof this._flush)
this._flush(ts.output, function(er) {
@@ -118,56 +153,30 @@ Transform.prototype._transform = function(chunk, output, cb) {
Transform.prototype._write = function(chunk, cb) {
var ts = this._transformState;
- var rs = this._readableState;
- ts.buffer.push([chunk, cb]);
-
- // no need for auto-pull if already in the midst of one.
+ ts.writecb = cb;
+ ts.writechunk = chunk;
if (ts.transforming)
return;
-
- // now we have something to transform, if we were waiting for it.
- // kick off a _read to pull it in.
- if (ts.pendingReadCb) {
- var readcb = ts.pendingReadCb;
- ts.pendingReadCb = null;
- this._read(0, readcb);
- }
-
- // if we weren't waiting for it, but nothing is queued up, then
- // still kick off a transform, just so it's there when the user asks.
- var doRead = rs.needReadable || rs.length <= rs.highWaterMark;
- if (doRead && !rs.reading) {
- var ret = this.read(0);
- if (ret !== null)
- return cb(new Error('invalid stream transform state'));
- }
+ var rs = this._readableState;
+ if (ts.needTransform || rs.needReadable || rs.length < rs.highWaterMark)
+ this._read();
};
-Transform.prototype._read = function(n, readcb) {
- var ws = this._writableState;
- var rs = this._readableState;
+// Doesn't matter what the args are here.
+// the output and callback functions passed to _transform do all the work.
+// That we got here means that the readable side wants more data.
+Transform.prototype._read = function(n, cb) {
var ts = this._transformState;
- ts.pendingReadCb = readcb;
-
- // if there's nothing pending, then we just wait.
- // if we're already transforming, then also just hold on a sec.
- // we've already stashed the readcb, so we can come back later
- // when we have something to transform
- if (ts.buffer.length === 0 || ts.transforming)
+ if (ts.writechunk && ts.writecb && !ts.transforming) {
+ ts.transforming = true;
+ this._transform(ts.writechunk, ts.output, ts.afterTransform);
return;
+ }
- // go ahead and transform that thing, now that someone wants it
- var req = ts.buffer.shift();
- var chunk = req[0];
- var writecb = req[1];
- ts.transforming = true;
- this._transform(chunk, ts.output, function(er, data) {
- ts.transforming = false;
- if (data)
- ts.output(data);
- writecb(er);
- });
+ // mark that we need a transform, so that any data that comes in
+ // will get processed, now that we've asked for it.
+ ts.needTransform = true;
};
View
27 test/simple/test-stream2-transform.js
@@ -61,6 +61,33 @@ process.nextTick(run);
/////
+test('writable side consumption', function(t) {
+ var tx = new Transform({
+ highWaterMark: 10
+ });
+
+ var transformed = 0;
+ tx._transform = function(chunk, output, cb) {
+ transformed += chunk.length;
+ output(chunk);
+ cb();
+ };
+
+ for (var i = 1; i <= 10; i++) {
+ tx.write(new Buffer(i));
+ }
+ tx.end();
+
+ t.equal(tx._readableState.length, 10);
+ t.equal(transformed, 10);
+ t.equal(tx._transformState.writechunk.length, 5);
+ t.same(tx._writableState.buffer.map(function(c) {
+ return c[0].length;
+ }), [6, 7, 8, 9, 10]);
+
+ t.end();
+});
+
test('passthrough', function(t) {
var pt = new PassThrough();
Please sign in to comment.
Something went wrong with that request. Please try again.