Skip to content

Commit

Permalink
net.js integration - does not work on last iteration
Browse files Browse the repository at this point in the history
  • Loading branch information
ry committed Nov 30, 2010
1 parent 5838902 commit 285af55
Showing 1 changed file with 56 additions and 211 deletions.
267 changes: 56 additions & 211 deletions lib/net.js
Expand Up @@ -54,6 +54,22 @@ var ioWatchers = new FreeList("iowatcher", 100, function () {
return new IOWatcher();
});


IOWatcher.prototype.ondrain = function () {
assert(this.socket);
if (this.writable || this.readable) {
require('timers').active(this.socket);
this.socket.emit('drain');
}
};


IOWatcher.prototype.onerror = function (errno) {
assert(this.socket);
this.socket.destroy(errnoException(errno, 'write'));
};


exports.isIP = binding.isIP;

exports.isIPv4 = function (input) {
Expand Down Expand Up @@ -92,16 +108,6 @@ function setImplmentationMethods (self) {
};

if (self.type == 'unix') {
self._writeImpl = function (buf, off, len, fd, flags) {
// Detect and disallow zero-byte writes wth an attached file
// descriptor. This is an implementation limitation of sendmsg(2).
if (fd && noData(buf, off, len)) {
throw new Error('File descriptors can only be written with data');
}

return sendMsg(self.fd, buf, off, len, fd, flags);
};

self._readImpl = function (buf, off, len) {
var bytesRead = recvMsg(self.fd, buf, off, len);

Expand All @@ -123,25 +129,10 @@ function setImplmentationMethods (self) {
return bytesRead;
};
} else {
self._writeImpl = function (buf, off, len, fd, flags) {
// XXX: TLS support requires that 0-byte writes get processed
// by the kernel for some reason. Otherwise, we'd just
// fast-path return here.

// Drop 'fd' and 'flags' as these are not supported by the write(2)
// system call
return write(self.fd, buf, off, len);
};

self._readImpl = function (buf, off, len) {
return read(self.fd, buf, off, len);
};
}

self._shutdownImpl = function () {
shutdown(self.fd, 'write');
};

};


Expand Down Expand Up @@ -169,11 +160,6 @@ function initStream (self) {
self._readWatcher.callback = onReadable;
self.readable = false;

// Queue of buffers and string that need to be written to socket.
self._writeQueue = [];
self._writeQueueEncoding = [];
self._writeQueueFD = [];

self._writeWatcher = ioWatchers.alloc();
self._writeWatcher.socket = self;
self._writeWatcher.callback = onWritable;
Expand Down Expand Up @@ -256,182 +242,46 @@ Object.defineProperty(Stream.prototype, 'readyState', {
});


// Returns true if all the data was flushed to socket. Returns false if
// something was queued. If data was queued, then the "drain" event will
// signal when it has been finally flushed to socket.
Stream.prototype.write = function (data, encoding, fd) {
if (this._connecting || (this._writeQueue && this._writeQueue.length)) {
if (!this._writeQueue) {
this._writeQueue = [];
this._writeQueueEncoding = [];
this._writeQueueFD = [];
}

// Slow. There is already a write queue, so let's append to it.
if (this._writeQueueLast() === END_OF_FILE) {
throw new Error('Stream.end() called already; cannot write.');
}
Stream.prototype._appendBucket = function (data, encoding, fd) {
var newBucket = { data: data };
if (encoding) newBucket.encoding = encoding;
if (fd) newBucket.fd = fd;

if (typeof data == 'string' &&
this._writeQueue.length &&
typeof this._writeQueue[this._writeQueue.length-1] === 'string' &&
this._writeQueueEncoding[this._writeQueueEncoding.length-1] === encoding) {
// optimization - concat onto last
this._writeQueue[this._writeQueue.length-1] += data;
} else {
this._writeQueue.push(data);
this._writeQueueEncoding.push(encoding);
}
var queueSize = data.length;

if (fd != undefined) {
this._writeQueueFD.push(fd);
if (this._writeWatcher.buckets) {
// FIXME O(n) append - use double linked list
var b = this._writeWatcher.buckets;
while (b.next) {
queueSize += b.data.length;
b = b.next;
}

return false;
} else {
// Fast.
// The most common case. There is no write queue. Just push the data
// directly to the socket.
return this._writeOut(data, encoding, fd);
}
};

// Directly writes the data to socket.
//
// Steps:
// 1. If it's a string, write it to the `pool`. (If not space remains
// on the pool make a new one.)
// 2. Write data to socket. Return true if flushed.
// 3. Slice out remaining
// 4. Unshift remaining onto _writeQueue. Return false.
Stream.prototype._writeOut = function (data, encoding, fd) {
if (!this.writable) {
throw new Error('Stream is not writable');
}

var buffer, off, len;
var bytesWritten, charsWritten;
var queuedData = false;

if (typeof data != 'string') {
// 'data' is a buffer, ignore 'encoding'
buffer = data;
off = 0;
len = data.length;

b.next = newBucket;
} else {
assert(typeof data == 'string');

if (!pool || pool.length - pool.used < kMinPoolSpace) {
pool = null;
allocNewPool();
}

if (!encoding || encoding == 'utf8' || encoding == 'utf-8') {
// default to utf8
bytesWritten = pool.write(data, 'utf8', pool.used);
charsWritten = Buffer._charsWritten;
} else {
bytesWritten = pool.write(data, encoding, pool.used);
charsWritten = bytesWritten;
}

if (encoding && data.length > 0) {
assert(bytesWritten > 0);
}

buffer = pool;
len = bytesWritten;
off = pool.used;

pool.used += bytesWritten;

debug('wrote ' + bytesWritten + ' bytes to pool');

if (charsWritten != data.length) {
//debug("couldn't fit " + (data.length - charsWritten) + " bytes into the pool\n");
// Unshift whatever didn't fit onto the buffer
this._writeQueue.unshift(data.slice(charsWritten));
this._writeQueueEncoding.unshift(encoding);
this._writeWatcher.start();
queuedData = true;
}
this._writeWatcher.buckets = newBucket;
}

try {
bytesWritten = this._writeImpl(buffer, off, len, fd, 0);
} catch (e) {
this.destroy(e);
return false;
}

debug('wrote ' + bytesWritten + ' to socket. [fd, off, len] = ' + JSON.stringify([this.fd, off, len]) + "\n");

require('timers').active(this);
return queueSize;
};

if (bytesWritten == len) {
// awesome. sent to buffer.
if (buffer === pool) {
// If we're just writing from the pool then we can make a little
// optimization and save the space.
buffer.used -= len;
}

if (queuedData) {
return false;
} else {
return true;
}
Stream.prototype.write = function (data, encoding, fd) {
if (this._eof) {
throw new Error('Stream.end() called already; cannot write.');
}

// Didn't write the entire thing to buffer.
// Need to wait for the socket to become available before trying again.
this._writeWatcher.start();

// Slice out the data left.
var leftOver = buffer.slice(off + bytesWritten, off + len);
leftOver.used = leftOver.length; // used the whole thing...

// util.error('data.used = ' + data.used);
//if (!this._writeQueue) initWriteStream(this);

// data should be the next thing to write.
this._writeQueue.unshift(leftOver);
this._writeQueueEncoding.unshift(null);

// If didn't successfully write any bytes, enqueue our fd and try again
if (!bytesWritten) {
this._writeQueueFD.unshift(fd);
if (!this.writable) {
throw new Error('Stream is not writable');
}

return false;
};


// Flushes the write buffer out.
// Returns true if the entire buffer was flushed.
Stream.prototype.flush = function () {
while (this._writeQueue && this._writeQueue.length) {
var data = this._writeQueue.shift();
var encoding = this._writeQueueEncoding.shift();
var fd = this._writeQueueFD.shift();

if (data === END_OF_FILE) {
this._shutdown();
return true;
}
var queueSize = this._appendBucket(data, encoding, fd);

var flushed = this._writeOut(data,encoding,fd);
if (!flushed) return false;
}
if (this._writeWatcher) this._writeWatcher.stop();
return true;
};
if (this._connecting) return false;

this._onWritable(); // Insert writeWatcher into the dumpQueue
require('timers').active(this);

Stream.prototype._writeQueueLast = function () {
return this._writeQueue.length > 0 ? this._writeQueue[this._writeQueue.length-1]
: null;
return queueSize < (64*1024);
};


Expand Down Expand Up @@ -483,7 +333,7 @@ Stream.prototype._onConnect = function () {
}


if (this._writeQueue && this._writeQueue.length) {
if (this._writeWatcher.buckets) {
// Flush this in case any writes are queued up while connecting.
this._onWritable();
}
Expand All @@ -495,11 +345,10 @@ Stream.prototype._onConnect = function () {


Stream.prototype._onWritable = function () {
// Stream becomes writable on connect() but don't flush if there's
// nothing actually to write
if (this.flush()) {
if (this._events && this._events['drain']) this.emit("drain");
if (this.ondrain) this.ondrain(); // Optimization
// Stick it into the dumpQueue
if (!this._writeWatcher.next) {
this._writeWatcher.next = IOWatcher.dumpQueue.next;
IOWatcher.dumpQueue.next = this._writeWatcher;
}
};

Expand Down Expand Up @@ -650,15 +499,12 @@ Stream.prototype.destroy = function (exception) {
// pool is shared between sockets, so don't need to free it here.
var self = this;

// TODO would like to set _writeQueue to null to avoid extra object alloc,
// but lots of code assumes this._writeQueue is always an array.
this._writeQueue = [];

this.readable = this.writable = false;

if (this._writeWatcher) {
this._writeWatcher.stop();
this._writeWatcher.socket = null;
this._writeWatcher.buckets = null;
ioWatchers.free(this._writeWatcher);
this._writeWatcher = null;
}
Expand Down Expand Up @@ -698,7 +544,7 @@ Stream.prototype._shutdown = function () {
if (this.readable) {

try {
this._shutdownImpl();
shutdown(this.fd, 'write');
} catch (e) {
this.destroy(e);
}
Expand All @@ -711,15 +557,14 @@ Stream.prototype._shutdown = function () {


Stream.prototype.end = function (data, encoding) {
if (this.writable) {
if (this._writeQueueLast() !== END_OF_FILE) {
if (data) this.write(data, encoding);
this._writeQueue.push(END_OF_FILE);
if (!this._connecting) {
this.flush();
}
}
}
if (!this.writable) return; // TODO this should throw error
if (this._eof) return; // TODO this should also throw error

if (data) this._appendBucket(data, encoding);
this._eof = true;

// If this isn't in the dumpQueue then we shutdown now.
if (!this._writeWatcher.next) this._shutdown();
};


Expand Down

0 comments on commit 285af55

Please sign in to comment.