diff --git a/lib/net.js b/lib/net.js index ce0961a92eb..c8eb6701c41 100644 --- a/lib/net.js +++ b/lib/net.js @@ -42,6 +42,8 @@ var setKeepAlive= binding.setKeepAlive; var socketError = binding.socketError; var getsockname = binding.getsockname; var errnoException = binding.errnoException; +var sendMsg = binding.sendMsg; +var recvMsg = binding.recvMsg; var EINPROGRESS = binding.EINPROGRESS; var ENOENT = binding.ENOENT; var EMFILE = binding.EMFILE; @@ -271,6 +273,182 @@ function _doFlush () { } } +function setImplmentationMethods (self) { + function noData(buf, off, len) { + return !buf || + (off != undefined && off >= buf.length) || + (len == 0); + }; + + if (self.type == 'unix') { + self._writeImpl = function(buf, off, len, fd, flags) { + // Detect and disallow zero-byte writes wth an attached file + // descriptor. This is an implementation limitation of sendmsg(2). + if (fd && noData(buf, off, len)) { + throw new Error('File descriptors can only be written with data'); + } + + return sendMsg(self.fd, buf, off, len, fd, flags); + }; + + self._readImpl = function(buf, off, len, calledByIOWatcher) { + var bytesRead = recvMsg(self.fd, buf, off, len); + + // Do not emit this in the same stack, otherwise we risk corrupting + // our buffer pool which is full of read data, but has not had + // had its pointers updated just yet. + if (recvMsg.fd !== null) { + process.nextTick(function() { + self.emit('fd', recvMsg.fd); + }); + } + + return bytesRead; + }; + } else { + self._writeImpl = function(buf, off, len, fd, flags) { + // XXX: TLS support requires that 0-byte writes get processed + // by the kernel for some reason. Otherwise, we'd just + // fast-path return here. + + return write(self.fd, buf, off, len, fd, flags); + }; + + self._readImpl = function(buf, off, len, calledByIOWatcher) { + return read(self.fd, buf, off, len); + }; + } + + self._shutdownImpl = function() { + shutdown(self.fd, 'write') + }; + + if (self.secure) { + var oldWrite = self._writeImpl; + self._writeImpl = function(buf, off, len, fd, flags) { + assert(buf); + assert(self.secure); + + var bytesWritten = self.secureStream.writeInject(buf, off, len); + + if (!securePool) { + allocNewSecurePool(); + } + + var secureLen = self.secureStream.writeExtract( + securePool, 0, securePool.length + ); + + if (secureLen == -1) { + // Check our read again for secure handshake + self._readWatcher.callback(); + } else { + oldWrite(securePool, 0, secureLen, fd, flags); + } + + if (!self.secureEstablished && self.secureStream.isInitFinished()) { + self.secureEstablished = true; + + if (self._events && self._events['secure']) { + self.emit('secure'); + } + } + + return bytesWritten; + }; + + var oldRead = self._readImpl; + self._readImpl = function(buf, off, len, calledByIOWatcher) { + assert(self.secure); + + var bytesRead = 0; + var secureBytesRead = null; + + if (!securePool) { + allocNewSecurePool(); + } + + if (calledByIOWatcher) { + secureBytesRead = oldRead(securePool, 0, securePool.length); + self.secureStream.readInject(securePool, 0, secureBytesRead); + } + + var chunkBytes; + do { + chunkBytes = self.secureStream.readExtract( + pool, + pool.used + bytesRead, + pool.length - pool.used - bytesRead + ); + + bytesRead += chunkBytes; + } while ((chunkBytes > 0) && (pool.used + bytesRead < pool.length)); + + if (bytesRead == 0 && !calledByIOWatcher) { + return -1; + } + + if (self.secureStream.readPending()) { + process.nextTick(function () { + if(self._readWatcher) + self._readWatcher.callback(); + }); + } + + if (!self.secureEstablished) { + if (self.secureStream.isInitFinished()) { + self.secureEstablished = true; + if (self._events && self._events['secure']) { + self.emit('secure'); + } + } + } + + if (calledByIOWatcher && secureBytesRead === null && !self.server) { + // Client needs to write as part of handshake + self._writeWatcher.start(); + return -1; + } + + if (bytesRead == 0 && secureBytesRead > 0) { + // Deal with SSL handshake + if (self.server) { + self._checkForSecureHandshake(); + } else { + if (self.secureEstablised) { + self.flush(); + } else { + self._checkForSecureHandshake(); + } + } + + return -1; + } + + return bytesRead; + }; + + var oldShutdown = self._shutdownImpl; + self._shutdownImpl = function() { + self.secureStream.shutdown(); + + if (!securePool) { + allocNewSecurePool(); + } + + var secureLen = self.secureStream.writeExtract( + securePool, 0, securePool.length + ); + + try { + oldWrite(securePool, 0, secureLen); + } catch (e) { } + + oldShutdown(); + }; + } +}; + function initStream (self) { self._readWatcher = ioWatchers.alloc(); self._readWatcher.callback = function () { @@ -285,68 +463,19 @@ function initStream (self) { //debug('pool.used ' + pool.used); var bytesRead; - var secureBytesRead; try { - if (self.secure) { - if (!securePool) allocNewSecurePool(); - var calledByNextTick = (arguments.length == 0); // IOWatcher always passes arguments - if (!calledByNextTick) { - secureBytesRead = read(self.fd, securePool, 0, securePool.length); - self.secureStream.readInject(securePool, 0, secureBytesRead); - } - var chunkBytes; - bytesRead = 0; - do { - chunkBytes = self.secureStream.readExtract(pool, - pool.used + bytesRead, - pool.length - pool.used - bytesRead); - bytesRead += chunkBytes; - } while ((chunkBytes > 0) && (pool.used + bytesRead < pool.length)); - if (bytesRead == 0 && calledByNextTick) - return; - if (self.secureStream.readPending()) { - process.nextTick(function () { - if(self._readWatcher) - self._readWatcher.callback(); - }); - } - if (!self.secureEstablished) { - if (self.secureStream.isInitFinished()) { - self.secureEstablished = true; - if (self._events && self._events['secure']) self.emit('secure'); - } - } - if (secureBytesRead === null && !self.server) { - // Client needs to write as part of handshake - self._writeWatcher.start(); - return; - } - } else { - bytesRead = read(self.fd, - pool, - pool.used, - pool.length - pool.used); - } + bytesRead = self._readImpl(pool, pool.used, pool.length - pool.used, (arguments.length > 0)); } catch (e) { self.destroy(e); return; } - //debug('bytesRead ' + bytesRead + '\n'); + // Note that some _readImpl() implementations return -1 bytes + // read as an indication not to do any processing on the result + // (but not an error). - if (self.secure && bytesRead == 0 && secureBytesRead > 0) { - // Deal with SSL handshake - if (self.server) { - self._checkForSecureHandshake(); - } else { - if (self.secureEstablised) { - self.flush(); - } else { - self._checkForSecureHandshake(); - } - } - } else if (bytesRead === 0) { + if (bytesRead === 0) { self.readable = false; self._readWatcher.stop(); @@ -384,6 +513,7 @@ function initStream (self) { // Queue of buffers and string that need to be written to socket. self._writeQueue = []; self._writeQueueEncoding = []; + self._writeQueueFD = []; self._writeWatcher = ioWatchers.alloc(); self._writeWatcher.socket = self; @@ -391,14 +521,17 @@ function initStream (self) { self.writable = false; } -function Stream (fd) { +function Stream (fd, type) { events.EventEmitter.call(this); this.fd = null; + this.type = null; this.secure = false; if (parseInt(fd) >= 0) { - this.open(fd); + this.open(fd, type); + } else { + setImplmentationMethods(this); } }; sys.inherits(Stream, events.EventEmitter); @@ -423,6 +556,8 @@ Stream.prototype.setSecure = function(credentials) { } this.secureStream = new SecureStream(this.credentials.context, this.server ? 1 : 0, this.credentials.shouldVerify ? 1 : 0); + setImplmentationMethods(this); + if (!this.server) { // If client, trigger handshake this._checkForSecureHandshake(); @@ -439,6 +574,10 @@ Stream.prototype.verifyPeer = function() { Stream.prototype._checkForSecureHandshake = function() { + if (!this.writable) { + return; + } + // Do an empty write to see if we need to write out as part of handshake if (!emptyBuffer) allocEmptyBuffer(); this.write(emptyBuffer); @@ -461,13 +600,15 @@ Stream.prototype.getCipher = function() { } -Stream.prototype.open = function (fd) { +Stream.prototype.open = function (fd, type) { initStream(this); this.fd = fd; - + this.type = type || null; this.readable = true; + setImplmentationMethods(this); + this._writeWatcher.set(this.fd, false, true); this.writable = true; } @@ -504,7 +645,9 @@ Object.defineProperty(Stream.prototype, 'readyState', { // Returns true if all the data was flushed to socket. Returns false if // something was queued. If data was queued, then the "drain" event will // signal when it has been finally flushed to socket. -Stream.prototype.write = function (data, encoding) { +// +// XXX: Caller cannot close the given fd until the stream has drained. +Stream.prototype.write = function (data, encoding, fd) { if (this._writeQueue && this._writeQueue.length) { // Slow. There is already a write queue, so let's append to it. if (this._writeQueueLast() === END_OF_FILE) { @@ -519,26 +662,20 @@ Stream.prototype.write = function (data, encoding) { this._writeQueue.push(data); this._writeQueueEncoding.push(encoding); } + + if (fd != undefined) { + this._writeQueueFD.push(fd); + } + return false; } else { // Fast. // The most common case. There is no write queue. Just push the data // directly to the socket. - return this._writeOut(data, encoding); + return this._writeOut(data, encoding, fd); } }; - -Stream.prototype._shutdownSecure = function () { - this.secureStream.shutdown(); - if (!securePool) allocNewSecurePool(); - var secureLen = this.secureStream.writeExtract(securePool, 0, securePool.length); - try { - var secureBytesWritten = write(this.fd, securePool, 0, secureLen); - } catch (e) { - } -} - // Directly writes the data to socket. // // Steps: @@ -547,12 +684,10 @@ Stream.prototype._shutdownSecure = function () { // 2. Write data to socket. Return true if flushed. // 3. Slice out remaining // 4. Unshift remaining onto _writeQueue. Return false. -Stream.prototype._writeOut = function (data, encoding) { +Stream.prototype._writeOut = function (data, encoding, fd) { if (!this.writable) { - if (this.secure) return false; - else throw new Error('Stream is not writable'); + throw new Error('Stream is not writable'); } - if (!this.secure && data.length == 0) return true; var buffer, off, len; var bytesWritten, charsWritten; @@ -581,7 +716,9 @@ Stream.prototype._writeOut = function (data, encoding) { charsWritten = bytesWritten; } - if (encoding) assert(bytesWritten > 0); + if (encoding && data.length > 0) { + assert(bytesWritten > 0); + } buffer = pool; len = bytesWritten; @@ -602,30 +739,7 @@ Stream.prototype._writeOut = function (data, encoding) { } try { - if (this.secure) { - if (!buffer) return false; - bytesWritten = this.secureStream.writeInject(buffer, off, len); - if (!securePool) allocNewSecurePool(); - var secureLen = this.secureStream.writeExtract(securePool, 0, securePool.length); - if (secureLen==-1) { - // Check our read again for secure handshake - this._readWatcher.callback(); - secureBytesWritten = 0; - } else { - var secureBytesWritten = write(this.fd, securePool, 0, secureLen); - } - if (!this.secureEstablished && this.secureStream.isInitFinished()) { - this.secureEstablished = true; - try { - if (this._events && this._events['secure']) this.emit('secure'); - } catch (e) { - this.destroy(e); - return; - } - } - } else { - bytesWritten = write(this.fd, buffer, off, len); - } + bytesWritten = this._writeImpl(buffer, off, len, fd, 0); } catch (e) { this.destroy(e); return false; @@ -665,6 +779,11 @@ Stream.prototype._writeOut = function (data, encoding) { this._writeQueue.unshift(leftOver); this._writeQueueEncoding.unshift(null); + // If didn't successfully write any bytes, enqueue our fd and try again + if (!bytesWritten) { + this._writeQueueFD.unshift(fd); + } + return false; } @@ -675,13 +794,14 @@ Stream.prototype.flush = function () { while (this._writeQueue && this._writeQueue.length) { var data = this._writeQueue.shift(); var encoding = this._writeQueueEncoding.shift(); + var fd = this._writeQueueFD.shift(); if (data === END_OF_FILE) { this._shutdown(); return true; } - var flushed = this._writeOut(data,encoding); + var flushed = this._writeOut(data,encoding,fd); if (!flushed) return false; } if (this._writeWatcher) this._writeWatcher.stop(); @@ -788,6 +908,8 @@ Stream.prototype.connect = function () { // UNIX self.fd = socket('unix'); self.type = 'unix'; + + setImplmentationMethods(this); doConnect(self, arguments[0]); } }; @@ -890,14 +1012,10 @@ Stream.prototype._shutdown = function () { // readable and writable this.writable = false; - if (this.secure) { - this._shutdownSecure(); - } try { - shutdown(this.fd, 'write') + this._shutdownImpl(); } catch (e) { this.destroy(e); - return; } } else { // writable but not readable @@ -951,7 +1069,7 @@ function Server (listener) { } if (!peerInfo) return; - var s = new Stream(peerInfo.fd); + var s = new Stream(peerInfo.fd, self.type); s.remoteAddress = peerInfo.address; s.remotePort = peerInfo.port; s.type = self.type; @@ -1078,3 +1196,5 @@ Server.prototype.close = function () { self.emit("close"); } }; + +// vim:ts=2 sw=2 diff --git a/src/node_net.cc b/src/node_net.cc index 57d0d97e989..e414c876bb1 100644 --- a/src/node_net.cc +++ b/src/node_net.cc @@ -554,8 +554,6 @@ static Handle RecvMsg(const Arguments& args) { String::New("Length is extends beyond buffer"))); } - int received_fd; - struct iovec iov[1]; iov[0].iov_base = (char*)buffer->data() + off; iov[0].iov_len = len; @@ -583,15 +581,36 @@ static Handle RecvMsg(const Arguments& args) { // that the wrapper can pick up. Since we're single threaded, this is not // a problem - just make sure to copy out that variable before the next // call to recvmsg(). + // + // XXX: Some implementations can send multiple file descriptors in a + // single message. We should be using CMSG_NXTHDR() to walk the + // chain to get at them all. This would require changing the + // API to hand these back up the caller, is a pain. + + int received_fd = -1; + for (struct cmsghdr *cmsg = CMSG_FIRSTHDR(&msg); + msg.msg_controllen > 0 && cmsg != NULL; + cmsg = CMSG_NXTHDR(&msg, cmsg)) { + if (cmsg->cmsg_type == SCM_RIGHTS) { + if (received_fd != -1) { + fprintf(stderr, "ignoring extra FD received: %d\n", received_fd); + } - struct cmsghdr *cmsg = CMSG_FIRSTHDR(&msg); - if (cmsg && cmsg->cmsg_type == SCM_RIGHTS) { - received_fd = *(int *) CMSG_DATA(cmsg); - recv_msg_template->GetFunction()->Set(fd_symbol, Integer::New(received_fd)); - } else { - recv_msg_template->GetFunction()->Set(fd_symbol, Null()); + received_fd = *(int *) CMSG_DATA(cmsg); + } else { + fprintf(stderr, "ignoring non-SCM_RIGHTS ancillary data: %d\n", + cmsg->cmsg_type + ); + } } + recv_msg_template->GetFunction()->Set( + fd_symbol, + (received_fd != -1) ? + Integer::New(received_fd) : + Null() + ); + return scope.Close(Integer::New(bytes_read)); } @@ -640,49 +659,115 @@ static Handle Write(const Arguments& args) { } -// var bytesWritten = t.sendFD(self.fd) -// returns null on EAGAIN or EINTR, raises an exception on all other errors -static Handle SendFD(const Arguments& args) { +// var bytes = sendmsg(fd, buf, off, len, fd, flags); +// +// Write a buffer with optional offset and length to the given file +// descriptor. Note that we refuse to send 0 bytes. +// +// The 'fd' parameter is a numerical file descriptor, or the undefined value +// to send none. +// +// The 'flags' parameter is a number representing a bitmask of MSG_* values. +// This is passed directly to sendmsg(). +// +// Returns null on EAGAIN or EINTR, raises an exception on all other errors +static Handle SendMsg(const Arguments& args) { HandleScope scope; + struct iovec iov; + if (args.Length() < 2) { return ThrowException(Exception::TypeError( String::New("Takes 2 parameters"))); } + // The first argument should be a file descriptor FD_ARG(args[0]) - // TODO: make sure fd is a unix domain socket? - - if (!args[1]->IsInt32()) { + // Grab the actul data to be written, stuffing it into iov + if (!Buffer::HasInstance(args[1])) { return ThrowException(Exception::TypeError( - String::New("FD to send is not an integer"))); + String::New("Expected either a string or a buffer"))); } - int fd_to_send = args[1]->Int32Value(); + Buffer *buf = ObjectWrap::Unwrap(args[1]->ToObject()); + + size_t offset = 0; + if (args.Length() >= 3 && !args[2]->IsUndefined()) { + if (!args[2]->IsUint32()) { + return ThrowException(Exception::TypeError( + String::New("Expected unsigned integer for offset"))); + } + + offset = args[2]->Uint32Value(); + if (offset >= buf->length()) { + return ThrowException(Exception::Error( + String::New("Offset into buffer too large"))); + } + } + + size_t length = buf->length() - offset; + if (args.Length() >= 4 && !args[3]->IsUndefined()) { + if (!args[3]->IsUint32()) { + return ThrowException(Exception::TypeError( + String::New("Expected unsigned integer for length"))); + } + + length = args[3]->Uint32Value(); + if (offset + length > buf->length()) { + return ThrowException(Exception::Error( + String::New("offset + length beyond buffer length"))); + } + } + + iov.iov_base = buf->data() + offset; + iov.iov_len = length; + + int fd_to_send = -1; + if (args.Length() >= 5 && !args[4]->IsUndefined()) { + if (!args[4]->IsUint32()) { + return ThrowException(Exception::TypeError( + String::New("Expected unsigned integer for a file descriptor"))); + } + + fd_to_send = args[4]->Uint32Value(); + } + + int flags = 0; + if (args.Length() >= 6 && !args[5]->IsUndefined()) { + if (!args[5]->IsUint32()) { + return ThrowException(Exception::TypeError( + String::New("Expected unsigned integer for a flags argument"))); + } + + flags = args[5]->Uint32Value(); + } struct msghdr msg; - struct iovec iov[1]; - char control_msg[CMSG_SPACE(sizeof(fd_to_send))]; - struct cmsghdr *cmsg; - static char dummy = 'd'; // Need to send at least a byte of data in the message + char scratch[64]; - iov[0].iov_base = &dummy; - iov[0].iov_len = 1; - msg.msg_iov = iov; + msg.msg_iov = &iov; msg.msg_iovlen = 1; msg.msg_name = NULL; msg.msg_namelen = 0; msg.msg_flags = 0; - msg.msg_control = (void *) control_msg; - msg.msg_controllen = CMSG_LEN(sizeof(fd_to_send)); - cmsg = CMSG_FIRSTHDR(&msg); - cmsg->cmsg_level = SOL_SOCKET; - cmsg->cmsg_type = SCM_RIGHTS; - cmsg->cmsg_len = msg.msg_controllen; - *(int*) CMSG_DATA(cmsg) = fd_to_send; + msg.msg_control = NULL; + msg.msg_controllen = 0; + + if (fd_to_send >= 0) { + struct cmsghdr *cmsg; + + msg.msg_control = (void *) scratch; + msg.msg_controllen = CMSG_LEN(sizeof(fd_to_send)); + + cmsg = CMSG_FIRSTHDR(&msg); + cmsg->cmsg_level = SOL_SOCKET; + cmsg->cmsg_type = SCM_RIGHTS; + cmsg->cmsg_len = msg.msg_controllen; + *(int*) CMSG_DATA(cmsg) = fd_to_send; + } - ssize_t written = sendmsg(fd, &msg, 0); + ssize_t written = sendmsg(fd, &msg, flags); if (written < 0) { if (errno == EAGAIN || errno == EINTR) return Null(); @@ -805,7 +890,7 @@ void InitNet(Handle target) { NODE_SET_METHOD(target, "write", Write); NODE_SET_METHOD(target, "read", Read); - NODE_SET_METHOD(target, "sendFD", SendFD); + NODE_SET_METHOD(target, "sendMsg", SendMsg); recv_msg_template = Persistent::New(FunctionTemplate::New(RecvMsg)); diff --git a/test/fixtures/recvfd.js b/test/fixtures/recvfd.js new file mode 100644 index 00000000000..27e8fc644a3 --- /dev/null +++ b/test/fixtures/recvfd.js @@ -0,0 +1,57 @@ +// See test/simple/test-sendfd.js for a complete description of what this +// script is doing and how it fits into the test as a whole. + +var net = require('net'); +var sys = require('sys'); + +var receivedData = []; +var receivedFDs = []; +var numSentMessages = 0; + +function processData(s) { + if (receivedData.length == 0 || receivedFDs.length == 0) { + return; + } + + var fd = receivedFDs.shift(); + var d = receivedData.shift(); + + // Augment our received object before sending it back across the pipe. + d.pid = process.pid; + + // Create a stream around the FD that we received and send a serialized + // version of our modified object back. Clean up when we're done. + var pipeStream = new net.Stream(fd); + + var drainFunc = function() { + pipeStream.destroy(); + + if (++numSentMessages == 2) { + s.destroy(); + } + }; + + pipeStream.addListener('drain', drainFunc); + pipeStream.resume(); + + if (pipeStream.write(JSON.stringify(d) + '\n')) { + drainFunc(); + } +}; + +// Create a UNIX socket to the path defined by argv[2] and read a file +// descriptor and misc data from it. +var s = new net.Stream(); +s.addListener('fd', function(fd) { + receivedFDs.unshift(fd); + processData(s); +}); +s.addListener('data', function(data) { + data.toString('utf8').trim().split('\n').forEach(function(d) { + receivedData.unshift(JSON.parse(d)); + }); + processData(s); +}); +s.connect(process.argv[2]); + +// vim:ts=2 sw=2 et diff --git a/test/simple/test-sendfd.js b/test/simple/test-sendfd.js new file mode 100644 index 00000000000..9c5d7313bda --- /dev/null +++ b/test/simple/test-sendfd.js @@ -0,0 +1,126 @@ +// Test sending and receiving a file descriptor. +// +// This test is pretty complex. It ends up spawning test/fixtures/recvfd.js +// as a child to test desired behavior. What happens is +// +// 1. Create an in-memory pipe via pipe(2). These two file descriptors +// are not visible to any other process, and so make a good test-case +// for sharing. +// 2. Create a a UNIX socket at SOCK_PATH. When a client connects to this +// path, they are sent the write end of the pipe from above. +// 3. The client is sent n JSON representations of the DATA variable, each +// with a different ordinal. We send these delimited by '\n' strings +// so that the receiving end can avoid any coalescing that hapepns +// due to the stream nature of the socket (e.g. '{}{}' is not a valid +// JSON string). +// 4. The child process receives file descriptors and JSON blobs and, +// whenever it has at least one of each, writes a modified JSON blob +// to the FD. The blob is modified to include the child's process ID. +// 5. Once the child process has sent n responses, it closes the write end +// of the pipe, which signals to the parent that there is no more data +// coming. +// 6. The parent listens to the read end of the pipe, accumulating JSON +// blobs (again, delimited by '\n') and verifying that a) the 'pid' +// attribute belongs to the child and b) the 'ord' field has not been +// seen in a response yet. This is intended to ensure that all blobs +// sent out have been relayed back to us. + +require('../common'); + +var buffer = require('buffer'); +var child_process = require('child_process'); +var fs = require('fs'); +var net = require('net'); +var netBinding = process.binding('net'); +var path = require('path'); +var sys = require('sys'); + +var DATA = { + 'ppid' : process.pid, + 'ord' : 0 +}; + +var SOCK_PATH = path.join( + __dirname, + '..', + path.basename(__filename, '.js') + '.sock' +); + +var logChild = function(d) { + if (typeof d == 'object') { + d = d.toString(); + } + + d.split('\n').forEach(function(l) { + if (l.length > 0) { + sys.debug('CHILD: ' + l); + } + }); +}; + +// Create a pipe +// +// We establish a listener on the read end of the pipe so that we can +// validate any data sent back by the child. We send the write end of the +// pipe to the child and close it off in our process. +var pipeFDs = netBinding.pipe(); +assert.equal(pipeFDs.length, 2); + +var seenOrdinals = []; + +var pipeReadStream = new net.Stream(); +pipeReadStream.addListener('data', function(data) { + data.toString('utf8').trim().split('\n').forEach(function(d) { + var rd = JSON.parse(d); + + assert.equal(rd.pid, cpp); + assert.equal(seenOrdinals.indexOf(rd.ord), -1); + + seenOrdinals.unshift(rd.ord); + }); +}); +pipeReadStream.open(pipeFDs[0]); +pipeReadStream.resume(); + +// Create a UNIX socket at SOCK_PATH and send DATA and the write end +// of the pipe to whoever connects. +// +// We send two messages here, both with the same pipe FD: one string, and +// one buffer. We want to make sure that both datatypes are handled +// correctly. +var srv = net.createServer(function(s) { + var str = JSON.stringify(DATA) + '\n'; + + DATA.ord = DATA.ord + 1; + var buf = new buffer.Buffer(str.length); + buf.write(JSON.stringify(DATA) + '\n', 'utf8'); + + s.write(str, 'utf8', pipeFDs[1]); + if (s.write(buf, undefined, pipeFDs[1])) { + netBinding.close(pipeFDs[1]); + } else { + s.addListener('drain', function() { + netBinding.close(pipeFDs[1]); + }); + } +}); +srv.listen(SOCK_PATH); + +// Spawn a child running test/fixtures/recvfd.js +var cp = child_process.spawn(process.argv[0], + [path.join(fixturesDir, 'recvfd.js'), SOCK_PATH]); + +cp.stdout.addListener('data', logChild); +cp.stderr.addListener('data', logChild); + +// When the child exits, clean up and validate its exit status +var cpp = cp.pid; +cp.addListener('exit', function(code, signal) { + srv.close(); + // fs.unlinkSync(SOCK_PATH); + + assert.equal(code, 0); + assert.equal(seenOrdinals.length, 2); +}); + +// vim:ts=2 sw=2 et