Permalink
Browse files

throwaway

  • Loading branch information...
1 parent 2b3b0c0 commit 82ae1536d350d2d3936b7f1ca2a8e4c1b8f2c217 @isaacs isaacs committed Jul 27, 2012
Showing with 41 additions and 21 deletions.
  1. +41 −21 readable.js
View
62 readable.js
@@ -13,8 +13,8 @@ function Readable(stream) {
}
// override this method.
-Readable.prototype.read = function(n) {
- return null;
+Readable.prototype.read = function(n, cb) {
+ if (cb) process.nextTick(cb.bind(null, null, null));
};
Readable.prototype.pipe = function(dest, opt) {
@@ -28,15 +28,18 @@ Readable.prototype.pipe = function(dest, opt) {
flow.call(this);
function flow() {
- var chunk;
- while (chunk = this.read()) {
+ this.read(function onread(er, chunk) {
+ if (er) return this.emit('error', er);
+
var written = dest.write(chunk);
- if (!written) {
- dest.once('drain', flow.bind(this));
+ if (false === written) {
+ // this backpressure is tricky...
+ // now instead of one dip to the event loop, we have two.
+ dest.once('drain', this.read(onread));
return;
}
- }
- this.once('readable', flow);
+ this.read(onread);
+ })
}
};
@@ -51,30 +54,37 @@ Readable.prototype.addListener = Readable.prototype.on;
function emitDataEvents(stream) {
var paused = false;
- var readable = false;
+ var saved = null;
// convert to an old-style stream.
stream.readable = true;
stream.pipe = Stream.prototype.pipe;
stream.on = stream.addEventListener = Stream.prototype.on;
- stream.on('readable', function() {
- readable = true;
- var c;
- while (!paused && (c = stream.read())) {
- stream.emit('data', c);
- }
- if (c === null) readable = false;
- });
-
stream.pause = function() {
paused = true;
};
stream.resume = function() {
paused = false;
- if (readable) stream.emit('readable');
+ if (saved) {
+ stream.emit('data', saved);
+ saved = null;
+ }
+ stream.read(onread);
};
+
+ stream.read(onread);
+
+ function onread(er, chunk) {
+ if (er) return stream.emit('error', er);
+ if (paused) {
+ saved = chunk;
+ return;
+ }
+ stream.emit('data', chunk);
+ stream.read(onread);
+ }
}
// wrap an old-style stream
@@ -123,8 +133,18 @@ Readable.prototype.wrap = function(stream) {
// consume some bytes. if not all is consumed, then
// pause the underlying stream.
- this.read = function(n) {
- if (this._bufferLength === 0) return null;
+ this.read = function(n, cb) {
+ if (typeof cb !== 'function') {
+ cb = n;
+ n = 0;
+ }
+
+ if (this._bufferLength === 0) {
+ // nothing to read yet.
+ this.once('readable', function() {
+ cb(null, this._read(n));
+ });
+ }
if (isNaN(n) || n <= 0) n = this._bufferLength;

0 comments on commit 82ae153

Please sign in to comment.