Skip to content

Commit

Permalink
Stream callbacks are now async
Browse files Browse the repository at this point in the history
  • Loading branch information
mallocator committed May 12, 2016
1 parent 767dabc commit f76af6d
Show file tree
Hide file tree
Showing 6 changed files with 11 additions and 11 deletions.
4 changes: 1 addition & 3 deletions CountStream.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@ class CountStream extends stream.Transform {
var buf = chunk instanceof Buffer ? chunk : new Buffer(chunk, encoding);
this._bytes += buf.length;
if (this.push(buf)) {
cb();
} else {
cb('Target stream not ready');
setImmediate(cb);
}
}

Expand Down
6 changes: 4 additions & 2 deletions MultiplexStream.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ class MultiplexStream extends stream.Writable {
*
* @param {Buffer|*} chunk
* @param {string} [encoding]
* @private
* @param {function} cb
* @private
*/
_write(chunk, encoding) {
_write(chunk, encoding, cb) {
for (let stream of this._streams) {
stream.write(chunk, encoding);
}
setImmediate(cb);
}

/**
Expand Down
6 changes: 3 additions & 3 deletions OffsetStream.js
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,10 @@ class OffsetStream extends stream.Transform {
var right = left + length;
this._step += length;
if (this._start >= right) {
return cb();
return setImmediate(cb);
}
if (this._end <= left) {
return cb();
return setImmediate(cb);
}
var posStart = 0, posEnd = length;
if(left < this._start && this._start < right) {
Expand All @@ -115,7 +115,7 @@ class OffsetStream extends stream.Transform {
posEnd = this._end - left;
}
this.push(buf.slice(posStart, posEnd));
cb();
setImmediate(cb);
}
}

Expand Down
2 changes: 1 addition & 1 deletion Sink.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class Sink extends stream.Writable {

_write(chunk, encoding, cb) {
this._buffer && this._buffer.push(chunk instanceof Buffer ? chunk : new Buffer(chunk, encoding));
cb();
setImmediate(cb);
}

/**
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "more-streams",
"version": "0.5.2",
"version": "0.5.3",
"description": "A collection of useful stream implementations",
"license": "Apache-2.0",
"homepage": "http://github.com/mallocator/more-streams",
Expand Down
2 changes: 1 addition & 1 deletion test/SequenceStream.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ describe('SequenceStream', () => {
message += 'msg' + i;
ss.chain(new Writer().write('msg' + i));
}
ss.on('end', () => {
out.on('finish', () => {
expect(out.buffer.length).to.equal(streamNr);
expect(out.message()).to.equal(message);
done();
Expand Down

0 comments on commit f76af6d

Please sign in to comment.