@@ -43,20 +43,6 @@ function Duplex(options) {
Readable.call(this, options);
Writable.call(this, options);

// Duplex streams often represent one side of a "conversation".
// So, you write "hello", and wait for "hello"
// then you write "what's your name?" and wait for $name
// then you write "Hello, $name, how are you?"
// etc.
//
// The low/high watermark defaults make this break, because
// the reply may not be enough to trigger a 'readable' event,
// so the two parties end up waiting for one another forever.
if (!options || !options.hasOwnProperty('lowWaterMark')) {
this._readableState.lowWaterMark = 0;
this._writableState.lowWaterMark = 0;
}

this.allowHalfOpen = true;
if (options && options.allowHalfOpen === false)
this.allowHalfOpen = false;
@@ -32,13 +32,34 @@ util.inherits(Readable, Stream);
function ReadableState(options, stream) {
options = options || {};

this.bufferSize = options.bufferSize || 16 * 1024;
assert(typeof this.bufferSize === 'number');
// cast to an int
this.bufferSize = ~~this.bufferSize;

// the argument passed to this._read(n,cb)
this.bufferSize = options.hasOwnProperty('bufferSize') ?
options.bufferSize : 16 * 1024;

// the point at which it stops calling _read() to fill the buffer
this.highWaterMark = options.hasOwnProperty('highWaterMark') ?
options.highWaterMark : 16 * 1024;

// the minimum number of bytes to buffer before emitting 'readable'
// default to pushing everything out as fast as possible.
this.lowWaterMark = options.hasOwnProperty('lowWaterMark') ?
options.lowWaterMark : 1024;
options.lowWaterMark : 0;

// cast to ints.
assert(typeof this.bufferSize === 'number');
assert(typeof this.lowWaterMark === 'number');
assert(typeof this.highWaterMark === 'number');
this.bufferSize = ~~this.bufferSize;
this.lowWaterMark = ~~this.lowWaterMark;
this.highWaterMark = ~~this.highWaterMark;
assert(this.bufferSize >= 0);
assert(this.lowWaterMark >= 0);
assert(this.highWaterMark >= this.lowWaterMark,
this.highWaterMark + '>=' + this.lowWaterMark);

this.buffer = [];
this.length = 0;
this.pipes = [];
@@ -52,6 +73,7 @@ function ReadableState(options, stream) {
// whenever we return null, then we set a flag to say
// that we're awaiting a 'readable' event emission.
this.needReadable = false;
this.emittedReadable = false;

this.decoder = null;
if (options.encoding) {
@@ -104,6 +126,9 @@ Readable.prototype.read = function(n) {
var state = this._readableState;
var nOrig = n;

if (typeof n !== 'number' || n > 0)
state.emittedReadable = false;

n = howMuchToRead(n, state);

// if we've ended, and we're now clear, then finish it up.
@@ -136,20 +161,16 @@ Readable.prototype.read = function(n) {

// if we need a readable event, then we need to do some reading.
var doRead = state.needReadable;
// if we currently have less than the lowWaterMark, then also read some
if (state.length - n <= state.lowWaterMark)

// if we currently have less than the highWaterMark, then also read some
if (state.length - n <= state.highWaterMark)
doRead = true;

// if we currently have *nothing*, then always try to get *something*
// no matter what the high water mark says.
if (state.length === 0)
doRead = true;

// if the lwm is 0, then just keep reading whatever we can whenever
// XXX: Need a 'highWaterMark' to specify the point at which we stop
// reading. The lowWaterMark should not be the guideline for this.
if (state.lowWaterMark === 0)
doRead = true;

// however, if we've ended, then there's no point, and if we're already
// reading, then it's unnecessary.
if (state.ended || state.reading)
@@ -207,9 +228,13 @@ function onread(er, chunk) {
// if we've ended and we have some data left, then emit
// 'readable' now to make sure it gets picked up.
if (!sync) {
if (state.length > 0)
this.emit('readable');
else
if (state.length > 0) {
state.needReadable = false;
if (!state.emittedReadable) {
state.emittedReadable = true;
this.emit('readable');
}
} else
endReadable(this);
}
return;
@@ -237,13 +262,10 @@ function onread(er, chunk) {

if (state.needReadable && !sync) {
state.needReadable = false;
this.emit('readable');
// just in case the user doesn't call read() on each readable
// event, start the ball rolling if we need to do another _read()
// call at this point to fill the buffer back up, or get the EOF
// event. This is particularly relevant for TCP streams where
// we get an immediate end() from the other side.
this.read(0);
if (!state.emittedReadable) {
state.emittedReadable = true;
this.emit('readable');
}
}
}

@@ -82,14 +82,6 @@ function Transform(options) {

Duplex.call(this, options);

// duplex defaults to making the lowWaterMark 0,
// but we should use the standard defaults for transform
// streams, since it's not a true duplex conversation.
if (!options || !options.hasOwnProperty('lowWaterMark')) {
this._readableState.lowWaterMark = 1024;
this._writableState.lowWaterMark = 1024;
}

// bind output so that it can be passed around as a regular function.
this._output = this._output.bind(this);

@@ -138,7 +130,7 @@ Transform.prototype._write = function(chunk, cb) {

// if we weren't waiting for it, but nothing is queued up, then
// still kick off a transform, just so it's there when the user asks.
var doRead = rs.needReadable || rs.length <= rs.lowWaterMark;
var doRead = rs.needReadable || rs.length <= rs.highWaterMark;
if (doRead && !rs.reading) {
var ret = this.read(0);
if (ret !== null)
@@ -27,17 +27,32 @@ module.exports = Writable;
Writable.WritableState = WritableState;

var util = require('util');
var assert = require('assert');
var Stream = require('stream');

util.inherits(Writable, Stream);

function WritableState(options) {
function WritableState(options, stream) {
options = options || {};
this.highWaterMark = options.highWaterMark || 16 * 1024;

// the point at which write() starts returning false
this.highWaterMark = options.hasOwnProperty('highWaterMark') ?
options.highWaterMark : 16 * 1024;

// the point that it has to get to before we call _write(chunk,cb)
// default to pushing everything out as fast as possible.
this.lowWaterMark = options.hasOwnProperty('lowWaterMark') ?
options.lowWaterMark : 1024;
options.lowWaterMark : 0;

// cast to ints.
assert(typeof this.lowWaterMark === 'number');
assert(typeof this.highWaterMark === 'number');
this.lowWaterMark = ~~this.lowWaterMark;
this.highWaterMark = ~~this.highWaterMark;
assert(this.lowWaterMark >= 0);
assert(this.highWaterMark >= this.lowWaterMark,
this.highWaterMark + '>=' + this.lowWaterMark);

this.needDrain = false;
// at the start of calling end()
this.ending = false;
@@ -59,7 +74,22 @@ function WritableState(options) {
// socket or file.
this.length = 0;

// a flag to see when we're in the middle of a write.
this.writing = false;

// a flag to be able to tell if the onwrite cb is called immediately,
// or on a later tick.
this.sync = false;

// the callback that's passed to _write(chunk,cb)
this.onwrite = onwrite.bind(stream);

// the callback that the user supplies to write(chunk,encoding,cb)
this.writecb = null;

// the amount that is being written when _write is called.
this.writelen = 0;

this.buffer = [];
}

@@ -69,16 +99,15 @@ function Writable(options) {
if (!(this instanceof Writable) && !(this instanceof Stream.Duplex))
return new Writable(options);

this._writableState = new WritableState(options);
this._writableState = new WritableState(options, this);

// legacy.
this.writable = true;

Stream.call(this);
}

// Override this method for sync streams
// override the _write(chunk, cb) method for async streams
// Override this method or _write(chunk, cb)
Writable.prototype.write = function(chunk, encoding, cb) {
var state = this._writableState;

@@ -117,72 +146,84 @@ Writable.prototype.write = function(chunk, encoding, cb) {
}

state.writing = true;
var sync = true;
this._write(chunk, writecb.bind(this));
sync = false;
state.sync = true;
state.writelen = l;
state.writecb = cb;
this._write(chunk, state.onwrite);
state.sync = false;

return ret;
};

function writecb(er) {
state.writing = false;
if (er) {
if (cb) {
if (sync)
process.nextTick(cb.bind(null, er));
else
cb(er);
} else
this.emit('error', er);
return;
}
state.length -= l;
function onwrite(er) {
var state = this._writableState;
var sync = state.sync;
var cb = state.writecb;
var l = state.writelen;

state.writing = false;
state.writelen = null;
state.writecb = null;

if (er) {
if (cb) {
// don't call the cb until the next tick if we're in sync mode.
// also, defer if we're about to write some more right now.
if (sync || state.buffer.length)
process.nextTick(cb);
else
cb();
}

if (state.length === 0 && (state.ended || state.ending)) {
// emit 'finish' at the very end.
state.finishing = true;
this.emit('finish');
state.finished = true;
return;
}

// if there's something in the buffer waiting, then do that, too.
if (state.buffer.length) {
var chunkCb = state.buffer.shift();
chunk = chunkCb[0];
cb = chunkCb[1];

if (false === state.decodeStrings)
l = chunk[0].length;
if (sync)
process.nextTick(cb.bind(null, er));
else
l = chunk.length;

state.writing = true;
this._write(chunk, writecb.bind(this));
}

if (state.length <= state.lowWaterMark && state.needDrain) {
// Must force callback to be called on nextTick, so that we don't
// emit 'drain' before the write() consumer gets the 'false' return
// value, and has a chance to attach a 'drain' listener.
process.nextTick(function() {
if (!state.needDrain)
return;
state.needDrain = false;
this.emit('drain');
}.bind(this));
}
cb(er);
} else
this.emit('error', er);
return;
}
state.length -= l;

if (cb) {
// don't call the cb until the next tick if we're in sync mode.
// also, defer if we're about to write some more right now.
if (sync || state.buffer.length)
process.nextTick(cb);
else
cb();
}

};
if (state.length === 0 && (state.ended || state.ending)) {
// emit 'finish' at the very end.
state.finishing = true;
this.emit('finish');
state.finished = true;
return;
}

// if there's something in the buffer waiting, then do that, too.
if (state.buffer.length) {
var chunkCb = state.buffer.shift();
var chunk = chunkCb[0];
cb = chunkCb[1];

if (false === state.decodeStrings)
l = chunk[0].length;
else
l = chunk.length;

state.writelen = l;
state.writecb = cb;
state.writechunk = chunk;
state.writing = true;
this._write(chunk, state.onwrite);
}

if (state.length <= state.lowWaterMark && state.needDrain) {
// Must force callback to be called on nextTick, so that we don't
// emit 'drain' before the write() consumer gets the 'false' return
// value, and has a chance to attach a 'drain' listener.
process.nextTick(function() {
if (!state.needDrain)
return;
state.needDrain = false;
this.emit('drain');
}.bind(this));
}
}

Writable.prototype._write = function(chunk, cb) {
process.nextTick(cb.bind(this, new Error('not implemented')));
@@ -92,7 +92,7 @@ tcp.listen(common.PORT, function() {
// We're still connecting at this point so the datagram is first pushed onto
// the connect queue. Make sure that it's not added to `bytesWritten` again
// when the actual write happens.
var r = socket.write(a, function() {
var r = socket.write(a, function(er) {
console.error('write cb');
dataWritten = true;
assert.ok(connectHappened);