Permalink
Browse files

updates from node core streams2 branch

  • Loading branch information...
1 parent ffa2bed commit e8efa38f2dd2db8df2d654809b8ab58f5764047e @isaacs isaacs committed Oct 9, 2012
Showing with 184 additions and 60 deletions.
  1. +24 −0 duplex.js
  2. +25 −4 passthrough.js
  3. +98 −46 readable.js
  4. +20 −2 transform.js
  5. +17 −8 writable.js
View
@@ -1,3 +1,24 @@
+// Copyright Joyent, Inc. and other Node contributors.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the
+// "Software"), to deal in the Software without restriction, including
+// without limitation the rights to use, copy, modify, merge, publish,
+// distribute, sublicense, and/or sell copies of the Software, and to permit
+// persons to whom the Software is furnished to do so, subject to the
+// following conditions:
+//
+// The above copyright notice and this permission notice shall be included
+// in all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
+// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
+// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
+// USE OR OTHER DEALINGS IN THE SOFTWARE.
+
// a duplex stream is just a stream that is both readable and writable.
// Since JS doesn't have multiple prototypal inheritance, this class
// prototypally inherits from Readable, and then parasitically from
@@ -16,6 +37,9 @@ Object.keys(Writable.prototype).forEach(function(method) {
});
function Duplex(options) {
+ if (!(this instanceof Duplex))
+ return new Duplex(options);
+
Readable.call(this, options);
Writable.call(this, options);
View
@@ -1,20 +1,41 @@
-'use strict';
+// Copyright Joyent, Inc. and other Node contributors.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the
+// "Software"), to deal in the Software without restriction, including
+// without limitation the rights to use, copy, modify, merge, publish,
+// distribute, sublicense, and/or sell copies of the Software, and to permit
+// persons to whom the Software is furnished to do so, subject to the
+// following conditions:
+//
+// The above copyright notice and this permission notice shall be included
+// in all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
+// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
+// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
+// USE OR OTHER DEALINGS IN THE SOFTWARE.
+
// a passthrough stream.
// basically just the most minimal sort of Transform stream.
// Every written chunk gets output as-is.
module.exports = PassThrough;
var Transform = require('./transform.js');
-
var util = require('util');
util.inherits(PassThrough, Transform);
function PassThrough(options) {
+ if (!(this instanceof PassThrough))
+ return new PassThrough(options);
+
Transform.call(this, options);
}
PassThrough.prototype._transform = function(chunk, output, cb) {
- output(chunk);
- cb();
+ cb(null, chunk);
};
View
@@ -28,22 +28,22 @@ var StringDecoder;
util.inherits(Readable, Stream);
-function ReadableState(options, stream) {
+function ReadableState(options) {
options = options || {};
this.bufferSize = options.bufferSize || 16 * 1024;
assert(typeof this.bufferSize === 'number');
// cast to an int
this.bufferSize = ~~this.bufferSize;
- this.lowWaterMark = options.lowWaterMark || 1024;
+ this.lowWaterMark = options.hasOwnProperty('lowWaterMark') ?
+ options.lowWaterMark : 1024;
this.buffer = [];
this.length = 0;
this.pipes = [];
this.flowing = false;
this.ended = false;
this.endEmitted = false;
- this.stream = stream;
this.reading = false;
// whenever we return null, then we set a flag to say
@@ -59,6 +59,9 @@ function ReadableState(options, stream) {
}
function Readable(options) {
+ if (!(this instanceof Readable))
+ return new Readable(options);
+
this._readableState = new ReadableState(options, this);
Stream.apply(this);
}
@@ -70,52 +73,76 @@ Readable.prototype.setEncoding = function(enc) {
this._readableState.decoder = new StringDecoder(enc);
};
-// you can override either this method, or _read(n, cb) below.
-Readable.prototype.read = function(n) {
- var state = this._readableState;
- if (state.length === 0 && state.ended) {
- endReadable(this);
- return null;
- }
+function howMuchToRead(n, state) {
+ if (state.length === 0 && state.ended)
+ return 0;
+
+ if (isNaN(n))
+ return state.length;
- if (isNaN(n) || n <= 0)
- n = state.length;
+ if (n <= 0)
+ return 0;
- // XXX: controversial.
// don't have that much. return null, unless we've ended.
- // However, if the low water mark is lower than the number of bytes,
- // then we still need to return what we have, or else it won't kick
- // off another _read() call. For example,
- // lwm=5
- // len=9
- // read(10)
- // We don't have that many bytes, so it'd be tempting to return null,
- // but then it won't ever cause _read to be called, so in that case,
- // we just return what we have, and let the programmer deal with it.
if (n > state.length) {
- if (!state.ended && state.length < state.lowWaterMark) {
+ if (!state.ended) {
state.needReadable = true;
- n = 0;
+ return 0;
} else
- n = state.length;
+ return state.length;
}
+ return n;
+}
- var ret;
- if (n > 0)
- ret = fromList(n, state.buffer, state.length, !!state.decoder);
- else
- ret = null;
+// you can override either this method, or _read(n, cb) below.
+Readable.prototype.read = function(n) {
+ var state = this._readableState;
+ var nOrig = n;
- if (ret === null || ret.length === 0)
- state.needReadable = true;
+ n = howMuchToRead(n, state);
- state.length -= n;
+ // if we've ended, and we're now clear, then finish it up.
+ if (n === 0 && state.ended) {
+ endReadable(this);
+ return null;
+ }
- if (!state.ended &&
- state.length < state.lowWaterMark &&
- !state.reading) {
+ // All the actual chunk generation logic needs to be
+ // *below* the call to _read. The reason is that in certain
+ // synthetic stream cases, such as passthrough streams, _read
+ // may be a completely synchronous operation which may change
+ // the state of the read buffer, providing enough data when
+ // before there was *not* enough.
+ //
+ // So, the steps are:
+ // 1. Figure out what the state of things will be after we do
+ // a read from the buffer.
+ //
+ // 2. If that resulting state will trigger a _read, then call _read.
+ // Note that this may be asynchronous, or synchronous. Yes, it is
+ // deeply ugly to write APIs this way, but that still doesn't mean
+ // that the Readable class should behave improperly, as streams are
+ // designed to be sync/async agnostic.
+ // Take note if the _read call is sync or async (ie, if the read call
+ // has returned yet), so that we know whether or not it's safe to emit
+ // 'readable' etc.
+ //
+ // 3. Actually pull the requested chunks out of the buffer and return.
+
+ // if we need a readable event, then we need to do some reading.
+ var doRead = state.needReadable;
+ // if we currently have less than the lowWaterMark, then also read some
+ if (state.length - n <= state.lowWaterMark)
+ doRead = true;
+ // however, if we've ended, then there's no point, and if we're already
+ // reading, then it's unnecessary.
+ if (state.ended || state.reading)
+ doRead = false;
+
+ if (doRead) {
+ var sync = true;
state.reading = true;
// call internal read method
this._read(state.bufferSize, function onread(er, chunk) {
@@ -124,41 +151,66 @@ Readable.prototype.read = function(n) {
return this.emit('error', er);
if (!chunk || !chunk.length) {
+ // eof
state.ended = true;
// if we've ended and we have some data left, then emit
// 'readable' now to make sure it gets picked up.
- if (state.length > 0)
- this.emit('readable');
- else
- endReadable(this);
+ if (!sync) {
+ if (state.length > 0)
+ this.emit('readable');
+ else
+ endReadable(this);
+ }
return;
}
if (state.decoder)
chunk = state.decoder.write(chunk);
- state.length += chunk.length;
- state.buffer.push(chunk);
+ // update the buffer info.
+ if (chunk) {
+ state.length += chunk.length;
+ state.buffer.push(chunk);
+ }
// if we haven't gotten enough to pass the lowWaterMark,
// and we haven't ended, then don't bother telling the user
// that it's time to read more data. Otherwise, that'll
// probably kick off another stream.read(), which can trigger
// another _read(n,cb) before this one returns!
- if (state.length < state.lowWaterMark) {
+ if (state.length <= state.lowWaterMark) {
state.reading = true;
this._read(state.bufferSize, onread.bind(this));
return;
}
- // now we have something to call this.read() to get.
- if (state.needReadable) {
+ if (state.needReadable && !sync) {
state.needReadable = false;
this.emit('readable');
}
}.bind(this));
+ sync = false;
+ }
+
+ // If _read called its callback synchronously, then `reading`
+ // will be false, and we need to re-evaluate how much data we
+ // can return to the user.
+ if (doRead && !state.reading)
+ n = howMuchToRead(nOrig, state);
+
+ var ret;
+ if (n > 0)
+ ret = fromList(n, state.buffer, state.length, !!state.decoder);
+ else
+ ret = null;
+
+ if (ret === null || ret.length === 0) {
+ state.needReadable = true;
+ n = 0;
}
+ state.length -= n;
+
return ret;
};
@@ -398,7 +450,7 @@ Readable.prototype.wrap = function(stream) {
var ret = fromList(n, state.buffer, state.length, !!state.decoder);
state.length -= n;
- if (state.length < state.lowWaterMark && paused) {
+ if (state.length <= state.lowWaterMark && paused) {
stream.resume();
paused = false;
}
View
@@ -77,6 +77,9 @@ function TransformState() {
}
function Transform(options) {
+ if (!(this instanceof Transform))
+ return new Transform(options);
+
Duplex.call(this, options);
// bind output so that it can be passed around as a regular function.
@@ -110,13 +113,28 @@ Transform.prototype._transform = function(chunk, output, cb) {
Transform.prototype._write = function(chunk, cb) {
var ts = this._transformState;
+ var rs = this._readableState;
ts.buffer.push([chunk, cb]);
+ // no need for auto-pull if already in the midst of one.
+ if (ts.transforming)
+ return;
+
// now we have something to transform, if we were waiting for it.
- if (ts.pendingReadCb && !ts.transforming) {
+ // kick off a _read to pull it in.
+ if (ts.pendingReadCb) {
var readcb = ts.pendingReadCb;
ts.pendingReadCb = null;
- this._read(-1, readcb);
+ this._read(0, readcb);
+ }
+
+ // if we weren't waiting for it, but nothing is queued up, then
+ // still kick off a transform, just so it's there when the user asks.
+ var doRead = rs.needReadable || rs.length <= rs.lowWaterMark;
+ if (doRead && !rs.reading) {
+ var ret = this.read(0);
+ if (ret !== null)
+ return cb(new Error('invalid stream transform state'));
}
};
Oops, something went wrong.

0 comments on commit e8efa38

Please sign in to comment.