Permalink
Browse files

Implement datagram sockets

- Adds new dgram module, for all data-gram type transports
- Supports both UDP client and servers
- Supports Unix Daemon sockets in DGRAM mode too (think syslog)
- Uses a shared Buffer and slices that as needed to be reasonably
  performant.
- One supplied test program so far, test-dgram-pingpong
- Passes test cases on osx 10.6 and ubuntu 9.10u
  • Loading branch information...
1 parent e65e603 commit 02da5ed4a1f63bb0990b8e7b1fd0793cd045cbb0 @pquerna pquerna committed with ry Jun 12, 2010
Showing with 425 additions and 0 deletions.
  1. +184 −0 lib/dgram.js
  2. +1 −0 src/node.cc
  3. +157 −0 src/node_net.cc
  4. +83 −0 test/simple/test-dgram-pingpong.js
View
@@ -0,0 +1,184 @@
+var sys = require("sys");
+var fs = require("fs");
+var events = require("events");
+var dns = require('dns');
+
+var Buffer = require('buffer').Buffer;
+var IOWatcher = process.IOWatcher;
+var binding = process.binding('net');
+var socket = binding.socket;
+var bind = binding.bind;
+var recvfrom = binding.recvfrom;
+var sendto = binding.sendto;
+var close = binding.close;
+var ENOENT = binding.ENOENT;
+
+function isPort (x) { return parseInt(x) >= 0; }
+var pool = null;
+
+function getPool() {
+ /* TODO: this effectively limits you to 8kb maximum packet sizes */
+ var minPoolAvail = 1024 * 8;
+
+ var poolSize = 1024 * 64;
+
+ if (pool === null || (pool.used + minPoolAvail > pool.length)) {
+ pool = new Buffer(poolSize);
+ pool.used = 0;
+ }
+
+ return pool;
+}
+
+function Socket (listener) {
+ events.EventEmitter.call(this);
+ var self = this;
+
+ if (listener) {
+ self.addListener('message', listener);
+ }
+
+ self.watcher = new IOWatcher();
+ self.watcher.host = self;
+ self.watcher.callback = function () {
+ while (self.fd) {
+ var p = getPool();
+ var rinfo = recvfrom(self.fd, p, p.used, p.length - p.used, 0);
+
+ if (!rinfo) return;
+
+ self.emit('message', p.slice(p.used, p.used + rinfo.size), rinfo);
+
+ p.used += rinfo.size;
+ }
+ };
+}
+
+sys.inherits(Socket, events.EventEmitter);
+exports.Socket = Socket;
+
+exports.createSocket = function (listener) {
+ return new Socket(listener);
+};
+
+Socket.prototype.bind = function () {
+ var self = this;
+ if (self.fd) throw new Error('Server already opened');
+
+ if (!isPort(arguments[0])) {
+ /* TODO: unix path dgram */
+ self.fd = socket('unix_dgram');
+ self.type = 'unix_dgram';
+ var path = arguments[0];
+ self.path = path;
+ // unlink sockfile if it exists
+ fs.stat(path, function (err, r) {
+ if (err) {
+ if (err.errno == ENOENT) {
+ bind(self.fd, path);
+ process.nextTick(function() {
+ self._startWatcher();
+ });
+ } else {
+ throw r;
+ }
+ } else {
+ if (!r.isFile()) {
+ throw new Error("Non-file exists at " + path);
+ } else {
+ fs.unlink(path, function (err) {
+ if (err) {
+ throw err;
+ } else {
+ bind(self.fd, path);
+ process.nextTick(function() {
+ self._startWatcher();
+ });
+ }
+ });
+ }
+ }
+ });
+ } else if (!arguments[1]) {
+ // Don't bind(). OS will assign a port with INADDR_ANY.
+ // The port can be found with server.address()
+ self.type = 'udp4';
+ self.fd = socket(self.type);
+ bind(self.fd, arguments[0]);
+ process.nextTick(function() {
+ self._startWatcher();
+ });
+ } else {
+ // the first argument is the port, the second an IP
+ var port = arguments[0];
+ dns.lookup(arguments[1], function (err, ip, addressType) {
+ if (err) {
+ self.emit('error', err);
+ } else {
+ self.type = addressType == 4 ? 'udp4' : 'udp6';
+ self.fd = socket(self.type);
+ bind(self.fd, port, ip);
+ process.nextTick(function() {
+ self._startWatcher();
+ });
+ }
+ });
+ }
+};
+
+Socket.prototype._startWatcher = function () {
+ this.watcher.set(this.fd, true, false);
+ this.watcher.start();
+ this.emit("listening");
+};
+
+Socket.prototype.address = function () {
+ return getsockname(this.fd);
+};
+
+Socket.prototype.send = function(port, addr, buffer, offset, length) {
+ var self = this;
+
+ if (!isPort(arguments[0])) {
+ if (!self.fd) {
+ self.type = 'unix_dgram';
+ self.fd = socket(self.type);
+ }
+ sendto(self.fd, buffer, offset, length, 0, port, addr);
+ }
+ else {
+ dns.lookup(arguments[1], function (err, ip, addressType) {
+ if (err) {
+ self.emit('error', err);
+ } else {
+ if (!self.fd) {
+ self.type = addressType == 4 ? 'udp4' : 'udp6';
+ self.fd = socket(self.type);
+ process.nextTick(function() {
+ self._startWatcher();
+ });
+ }
+ sendto(self.fd, buffer, offset, length, 0, port, ip);
+ }
+ });
+ }
+};
+
+Socket.prototype.close = function () {
+ var self = this;
+
+ if (!self.fd) throw new Error('Not running');
+
+ self.watcher.stop();
+
+ close(self.fd);
+ self.fd = null;
+
+ if (self.type === "unix_dgram") {
+ fs.unlink(self.path, function () {
+ self.emit("close");
+ });
+ } else {
+ self.emit("close");
+ }
+};
View
@@ -1845,6 +1845,7 @@ static Handle<Value> Binding(const Arguments& args) {
exports->Set(String::New("assert"), String::New(native_assert));
exports->Set(String::New("buffer"), String::New(native_buffer));
exports->Set(String::New("child_process"),String::New(native_child_process));
+ exports->Set(String::New("dgram"), String::New(native_dgram));
exports->Set(String::New("dns"), String::New(native_dns));
exports->Set(String::New("events"), String::New(native_events));
exports->Set(String::New("file"), String::New(native_file));
View
@@ -42,6 +42,7 @@ static Persistent<String> errno_symbol;
static Persistent<String> syscall_symbol;
static Persistent<String> fd_symbol;
+static Persistent<String> size_symbol;
static Persistent<String> address_symbol;
static Persistent<String> port_symbol;
static Persistent<String> type_symbol;
@@ -147,7 +148,16 @@ static Handle<Value> Socket(const Arguments& args) {
} else if (0 == strcasecmp(*t, "UNIX")) {
domain = PF_UNIX;
type = SOCK_STREAM;
+ } else if (0 == strcasecmp(*t, "UNIX_DGRAM")) {
+ domain = PF_UNIX;
+ type = SOCK_DGRAM;
} else if (0 == strcasecmp(*t, "UDP")) {
+ domain = PF_INET;
+ type = SOCK_DGRAM;
+ } else if (0 == strcasecmp(*t, "UDP4")) {
+ domain = PF_INET;
+ type = SOCK_DGRAM;
+ } else if (0 == strcasecmp(*t, "UDP6")) {
domain = PF_INET6;
type = SOCK_DGRAM;
} else {
@@ -520,6 +530,63 @@ static Handle<Value> Read(const Arguments& args) {
return scope.Close(Integer::New(bytes_read));
}
+// var info = t.recvfrom(fd, buffer, offset, length, flags);
+// info.size // bytes read
+// info.port // from port
+// info.address // from address
+// returns null on EAGAIN or EINTR, raises an exception on all other errors
+// returns object otherwise
+static Handle<Value> RecvFrom(const Arguments& args) {
+ HandleScope scope;
+
+ if (args.Length() < 5) {
+ return ThrowException(Exception::TypeError(
+ String::New("Takes 5 parameters")));
+ }
+
+ FD_ARG(args[0])
+
+ if (!Buffer::HasInstance(args[1])) {
+ return ThrowException(Exception::TypeError(
+ String::New("Second argument should be a buffer")));
+ }
+
+ Buffer * buffer = ObjectWrap::Unwrap<Buffer>(args[1]->ToObject());
+
+ size_t off = args[2]->Int32Value();
+ if (off >= buffer->length()) {
+ return ThrowException(Exception::Error(
+ String::New("Offset is out of bounds")));
+ }
+
+ size_t len = args[3]->Int32Value();
+ if (off + len > buffer->length()) {
+ return ThrowException(Exception::Error(
+ String::New("Length is extends beyond buffer")));
+ }
+
+ int flags = args[4]->Int32Value();
+
+ struct sockaddr_storage address_storage;
+ socklen_t addrlen = sizeof(struct sockaddr_storage);
+
+ ssize_t bytes_read = recvfrom(fd, (char*)buffer->data() + off, len, flags,
+ (struct sockaddr*) &address_storage, &addrlen);
+
+ if (bytes_read < 0) {
+ if (errno == EAGAIN || errno == EINTR) return Null();
+ return ThrowException(ErrnoException(errno, "read"));
+ }
+
+ Local<Object> info = Object::New();
+
+ info->Set(size_symbol, Integer::New(bytes_read));
+
+ ADDRESS_TO_JS(info, address_storage);
+
+ return scope.Close(info);
+}
+
// bytesRead = t.recvMsg(fd, buffer, offset, length)
// if (recvMsg.fd) {
@@ -780,6 +847,93 @@ static Handle<Value> SendMsg(const Arguments& args) {
return scope.Close(Integer::New(written));
}
+// var bytes = sendto(fd, buf, off, len, flags, destination port, desitnation address);
+//
+// Write a buffer with optional offset and length to the given file
+// descriptor. Note that we refuse to send 0 bytes.
+//
+// The 'fd' parameter is a numerical file descriptor, or the undefined value
+// to send none.
+//
+// The 'flags' parameter is a number representing a bitmask of MSG_* values.
+// This is passed directly to sendmsg().
+//
+// The destination port can either be an int port, or a path.
+//
+// Returns null on EAGAIN or EINTR, raises an exception on all other errors
+static Handle<Value> SendTo(const Arguments& args) {
+ HandleScope scope;
+
+ if (args.Length() < 5) {
+ return ThrowException(Exception::TypeError(
+ String::New("Takes 5 or 6 parameters")));
+ }
+
+ // The first argument should be a file descriptor
+ FD_ARG(args[0])
+
+ // Grab the actul data to be written
+ if (!Buffer::HasInstance(args[1])) {
+ return ThrowException(Exception::TypeError(
+ String::New("Expected either a string or a buffer")));
+ }
+
+ Buffer *buf = ObjectWrap::Unwrap<Buffer>(args[1]->ToObject());
+
+ size_t offset = 0;
+ if (args.Length() >= 3 && !args[2]->IsUndefined()) {
+ if (!args[2]->IsUint32()) {
+ return ThrowException(Exception::TypeError(
+ String::New("Expected unsigned integer for offset")));
+ }
+
+ offset = args[2]->Uint32Value();
+ if (offset >= buf->length()) {
+ return ThrowException(Exception::Error(
+ String::New("Offset into buffer too large")));
+ }
+ }
+
+ size_t length = buf->length() - offset;
+ if (args.Length() >= 4 && !args[3]->IsUndefined()) {
+ if (!args[3]->IsUint32()) {
+ return ThrowException(Exception::TypeError(
+ String::New("Expected unsigned integer for length")));
+ }
+
+ length = args[3]->Uint32Value();
+ if (offset + length > buf->length()) {
+ return ThrowException(Exception::Error(
+ String::New("offset + length beyond buffer length")));
+ }
+ }
+
+ int flags = 0;
+ if (args.Length() >= 5 && !args[4]->IsUndefined()) {
+ if (!args[4]->IsUint32()) {
+ return ThrowException(Exception::TypeError(
+ String::New("Expected unsigned integer for a flags argument")));
+ }
+
+ flags = args[4]->Uint32Value();
+ }
+
+ Handle<Value> error = ParseAddressArgs(args[5], args[6], false);
+ if (!error.IsEmpty()) return ThrowException(error);
+
+ ssize_t written = sendto(fd, buf->data() + offset, length, flags, addr, addrlen);
+
+ if (written < 0) {
+ if (errno == EAGAIN || errno == EINTR) return Null();
+ return ThrowException(ErrnoException(errno, "sendmsg"));
+ }
+
+ /* Note that the FD isn't explicitly closed here, this
+ * happens in the JS */
+
+ return scope.Close(Integer::New(written));
+}
+
// Probably only works for Linux TCP sockets?
// Returns the amount of data on the read queue.
@@ -891,6 +1045,8 @@ void InitNet(Handle<Object> target) {
NODE_SET_METHOD(target, "read", Read);
NODE_SET_METHOD(target, "sendMsg", SendMsg);
+ NODE_SET_METHOD(target, "recvfrom", RecvFrom);
+ NODE_SET_METHOD(target, "sendto", SendTo);
recv_msg_template =
Persistent<FunctionTemplate>::New(FunctionTemplate::New(RecvMsg));
@@ -927,6 +1083,7 @@ void InitNet(Handle<Object> target) {
errno_symbol = NODE_PSYMBOL("errno");
syscall_symbol = NODE_PSYMBOL("syscall");
fd_symbol = NODE_PSYMBOL("fd");
+ size_symbol = NODE_PSYMBOL("size");
address_symbol = NODE_PSYMBOL("address");
port_symbol = NODE_PSYMBOL("port");
}
Oops, something went wrong. Retry.

0 comments on commit 02da5ed

Please sign in to comment.