Skip to content
This repository has been archived by the owner on Apr 22, 2023. It is now read-only.

Commit

Permalink
stream: Use push() for Transform._output()
Browse files Browse the repository at this point in the history
This also slightly changes the semantics, in that a 'readable'
event may be triggered by the first write() call, even if a
user has not yet called read().

This happens because the Transform _write() handler is calling
read(0) to start the flow of data.  Technically, the new behavior
is more 'correct', since it is more in line with the semantics
of the 'readable' event in other streams.
  • Loading branch information
isaacs committed Jan 10, 2013
1 parent 530585b commit b43e544
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 51 deletions.
44 changes: 2 additions & 42 deletions lib/_stream_transform.js
Expand Up @@ -75,7 +75,7 @@ function TransformState(stream) {
this.transforming = false;
this.pendingReadCb = null;
this.output = function(chunk) {
stream._output(chunk);
stream.push(chunk);
};
}

Expand Down Expand Up @@ -148,9 +148,6 @@ Transform.prototype._read = function(n, readcb) {
var rs = this._readableState;
var ts = this._transformState;

if (ts.pendingReadCb)
throw new Error('_read while _read already in progress');

ts.pendingReadCb = readcb;

// if there's nothing pending, then we just wait.
Expand All @@ -173,31 +170,6 @@ Transform.prototype._read = function(n, readcb) {
});
};

Transform.prototype._output = function(chunk) {
if (!chunk || !chunk.length)
return;

// if we've got a pending readcb, then just call that,
// and let Readable take care of it. If not, then we fill
// the readable buffer ourselves, and emit whatever's needed.
var ts = this._transformState;
var readcb = ts.pendingReadCb;
if (readcb) {
ts.pendingReadCb = null;
readcb(null, chunk);
return;
}

// otherwise, it's up to us to fill the rs buffer.
var rs = this._readableState;
var len = rs.length;
rs.buffer.push(chunk);
rs.length += chunk.length;
if (rs.needReadable) {
rs.needReadable = false;
this.emit('readable');
}
};

function done(stream, er) {
if (er)
Expand All @@ -215,17 +187,5 @@ function done(stream, er) {
if (ts.transforming)
throw new Error('calling transform done when still transforming');

// if we were waiting on a read, let them know that it isn't coming.
var readcb = ts.pendingReadCb;
if (readcb)
return readcb();

rs.ended = true;
// we may have gotten a 'null' read before, and since there is
// no more data coming from the writable side, we need to emit
// now so that the consumer knows to pick up the tail bits.
if (rs.length && rs.needReadable)
stream.emit('readable');
else if (rs.length === 0)
stream.emit('end');
return stream.push(null);
}
26 changes: 17 additions & 9 deletions test/simple/test-stream2-transform.js
Expand Up @@ -215,35 +215,40 @@ test('passthrough event emission', function(t) {
var i = 0;

pt.write(new Buffer('foog'));

console.error('need emit 0');
pt.write(new Buffer('bark'));

console.error('should have emitted readable now 1 === %d', emits);
t.equal(emits, 1);

t.equal(pt.read(5).toString(), 'foogb');
t.equal(pt.read(5) + '', 'null');

console.error('need emit 0');
console.error('need emit 1');

pt.write(new Buffer('bazy'));
console.error('should have emitted, but not again');
pt.write(new Buffer('kuel'));

console.error('should have emitted readable now 1 === %d', emits);
t.equal(emits, 1);
console.error('should have emitted readable now 2 === %d', emits);
t.equal(emits, 2);

t.equal(pt.read(5).toString(), 'arkba');
t.equal(pt.read(5).toString(), 'zykue');
t.equal(pt.read(5), null);

console.error('need emit 1');
console.error('need emit 2');

pt.end();

t.equal(emits, 2);
t.equal(emits, 3);

t.equal(pt.read(5).toString(), 'l');
t.equal(pt.read(5), null);

console.error('should not have emitted again');
t.equal(emits, 2);
t.equal(emits, 3);
t.end();
});

Expand All @@ -256,25 +261,28 @@ test('passthrough event emission reordered', function(t) {
});

pt.write(new Buffer('foog'));
console.error('need emit 0');
pt.write(new Buffer('bark'));
console.error('should have emitted readable now 1 === %d', emits);
t.equal(emits, 1);

t.equal(pt.read(5).toString(), 'foogb');
t.equal(pt.read(5), null);

console.error('need emit 0');
console.error('need emit 1');
pt.once('readable', function() {
t.equal(pt.read(5).toString(), 'arkba');

t.equal(pt.read(5), null);

console.error('need emit 1');
console.error('need emit 2');
pt.once('readable', function() {
t.equal(pt.read(5).toString(), 'zykue');
t.equal(pt.read(5), null);
pt.once('readable', function() {
t.equal(pt.read(5).toString(), 'l');
t.equal(pt.read(5), null);
t.equal(emits, 3);
t.equal(emits, 4);
t.end();
});
pt.end();
Expand Down

0 comments on commit b43e544

Please sign in to comment.