Permalink
Browse files

Add callback to socket.write(), fix test-sendfds

  • Loading branch information...
ry committed Nov 13, 2010
1 parent fd634fb commit f66e95ba2978222d9ef92d0e71a91a83c928a844
Showing with 63 additions and 29 deletions.
  1. +35 −11 lib/net.js
  2. +13 −0 src/node_io_watcher.cc
  3. +7 −9 test/fixtures/recvfd.js
  4. +8 −9 test/simple/test-sendfd.js
View
@@ -56,17 +56,18 @@ var ioWatchers = new FreeList("iowatcher", 100, function () {
IOWatcher.prototype.ondrain = function () {
- assert(this.socket);
- var socket = this.socket;
+ if (this.socket) {
+ var socket = this.socket;
- if (socket.writable || socket.readable) {
- require('timers').active(socket);
- }
+ if (socket.writable || socket.readable) {
+ require('timers').active(socket);
+ }
- socket.emit('drain');
- if (socket.ondrain) socket.ondrain();
+ socket.emit('drain');
+ if (socket.ondrain) socket.ondrain();
- if (socket._eof) socket._shutdown();
+ if (socket._eof) socket._shutdown();
+ }
};
@@ -252,12 +253,13 @@ Object.defineProperty(Stream.prototype, 'readyState', {
});
-Stream.prototype._appendBucket = function (data, encoding, fd) {
+Stream.prototype._appendBucket = function (data, encoding, fd, callback) {
if (data.length != 0) {
// TODO reject empty data.
var newBucket = { data: data };
if (encoding) newBucket.encoding = encoding;
if (fd) newBucket.fd = fd;
+ if (callback) newBucket.callback = callback;
// TODO properly calculate queueSize
@@ -280,7 +282,7 @@ Stream.prototype._appendBucket = function (data, encoding, fd) {
};
-Stream.prototype.write = function (data, encoding, fd) {
+Stream.prototype.write = function (data /* encoding, fd, callback */) {
if (this._eof) {
throw new Error('Stream.end() called already; cannot write.');
}
@@ -289,7 +291,29 @@ Stream.prototype.write = function (data, encoding, fd) {
throw new Error('Stream is not writable');
}
- var queueSize = this._appendBucket(data, encoding, fd);
+ // parse the arguments. ugly.
+
+ var encoding, fd, callback;
+
+ if (arguments[1] === undefined || typeof arguments[1] == 'string') {
+ encoding = arguments[1];
+ if (typeof arguments[2] == 'number') {
+ fd = arguments[2];
+ callback = arguments[3];
+ } else {
+ callback = arguments[2];
+ }
+ } else if (typeof arguments[1] == 'number') {
+ fd = arguments[1];
+ callback = arguments[2];
+ } else if (typeof arguments[1] == 'function') {
+ callback = arguments[1];
+ } else {
+ throw new Error("Bad type for second argument");
+ }
+
+
+ var queueSize = this._appendBucket(data, encoding, fd, callback);
if (this._connecting) return false;
View
@@ -37,6 +37,7 @@ static Persistent<String> is_unix_socket_sym;
static Persistent<String> first_bucket_sym;
static Persistent<String> last_bucket_sym;
static Persistent<String> queue_size_sym;
+static Persistent<String> callback_sym;
void IOWatcher::Initialize(Handle<Object> target) {
@@ -72,6 +73,7 @@ void IOWatcher::Initialize(Handle<Object> target) {
is_unix_socket_sym = NODE_PSYMBOL("isUnixSocket");
data_sym = NODE_PSYMBOL("data");
encoding_sym = NODE_PSYMBOL("encoding");
+ callback_sym = NODE_PSYMBOL("callback");
ev_prepare_init(&dumper, IOWatcher::Dump);
@@ -497,6 +499,17 @@ void IOWatcher::Dump() {
written -= bucket_len - offset;
+ Local<Value> bucket_callback_v = bucket->Get(callback_sym);
+ if (bucket_callback_v->IsFunction()) {
+ Local<Function> bucket_callback =
+ Local<Function>::Cast(bucket_callback_v);
+ TryCatch try_catch;
+ bucket_callback->Call(io->handle_, 0, NULL);
+ if (try_catch.HasCaught()) {
+ FatalException(try_catch);
+ }
+ }
+
// Offset is now zero
watcher->Set(offset_sym, Integer::NewFromUnsigned(0));
}
View
@@ -22,35 +22,33 @@ function processData(s) {
// version of our modified object back. Clean up when we're done.
var pipeStream = new net.Stream(fd);
- var drainFunc = function() {
+ pipeStream.resume();
+
+ pipeStream.write(JSON.stringify(d) + '\n', function () {
pipeStream.destroy();
if (++numSentMessages == 2) {
s.destroy();
}
- };
-
- pipeStream.addListener('drain', drainFunc);
- pipeStream.resume();
-
- if (pipeStream.write(JSON.stringify(d) + '\n')) {
- drainFunc();
- }
+ });
};
// Create a UNIX socket to the path defined by argv[2] and read a file
// descriptor and misc data from it.
var s = new net.Stream();
+
s.addListener('fd', function(fd) {
receivedFDs.unshift(fd);
processData(s);
});
+
s.addListener('data', function(data) {
data.toString('utf8').trim().split('\n').forEach(function(d) {
receivedData.unshift(JSON.parse(d));
});
processData(s);
});
+
s.connect(process.argv[2]);
// vim:ts=2 sw=2 et
View
@@ -53,7 +53,7 @@ var logChild = function(d) {
d.split('\n').forEach(function(l) {
if (l.length > 0) {
- common.debug('CHILD: ' + l);
+ console.error('CHILD: ' + l);
}
});
};
@@ -96,19 +96,18 @@ var srv = net.createServer(function(s) {
buf.write(JSON.stringify(DATA) + '\n', 'utf8');
s.write(str, 'utf8', pipeFDs[1]);
- if (s.write(buf, undefined, pipeFDs[1])) {
+
+ s.write(buf, pipeFDs[1], function () {
+ console.error("close pipeFDs[1]");
netBinding.close(pipeFDs[1]);
- } else {
- s.addListener('drain', function() {
- netBinding.close(pipeFDs[1]);
- });
- }
+ });
});
srv.listen(SOCK_PATH);
// Spawn a child running test/fixtures/recvfd.js
-var cp = child_process.spawn(process.argv[0],
- [path.join(common.fixturesDir, 'recvfd.js'), SOCK_PATH]);
+var cp = child_process.spawn(process.execPath,
+ [path.join(common.fixturesDir, 'recvfd.js'),
+ SOCK_PATH]);
cp.stdout.addListener('data', logChild);
cp.stderr.addListener('data', logChild);

0 comments on commit f66e95b

Please sign in to comment.