Skip to content
This repository has been archived by the owner on Apr 22, 2023. It is now read-only.

Commit

Permalink
Browse files Browse the repository at this point in the history
cluster: support datagram sockets
  • Loading branch information
piscisaureus committed Jan 28, 2013
1 parent c13354e commit 5e7e51c
Show file tree
Hide file tree
Showing 9 changed files with 356 additions and 49 deletions.
32 changes: 23 additions & 9 deletions lib/child_process.js
Expand Up @@ -155,6 +155,18 @@ var handleConversion = {

emit(socket);
}
},

'dgram.Native': {
simultaneousAccepts: false,

send: function(message, handle) {
return handle;
},

got: function(message, handle, emit) {
emit(handle);
}
}
};

Expand Down Expand Up @@ -355,18 +367,20 @@ function setupChannel(target, channel) {
// this message will be handled by an internalMessage event handler
message = {
cmd: 'NODE_HANDLE',
type: 'net.',
msg: message
};

switch (handle.constructor.name) {
case 'Socket':
message.type += 'Socket'; break;
case 'Server':
message.type += 'Server'; break;
case 'Pipe':
case 'TCP':
message.type += 'Native'; break;
if (handle instanceof net.Socket) {
message.type = 'net.Socket';
} else if (handle instanceof net.Server) {
message.type = 'net.Server';
} else if (handle instanceof process.binding('tcp_wrap').TCP ||
handle instanceof process.binding('pipe_wrap').Pipe) {
message.type = 'net.Native';
} else if (handle instanceof process.binding('udp_wrap').UDP) {
message.type = 'dgram.Native';
} else {
throw new TypeError("This handle type can't be sent");
}

var obj = handleConversion[message.type];
Expand Down
13 changes: 10 additions & 3 deletions lib/cluster.js
Expand Up @@ -227,8 +227,14 @@ if (cluster.isMaster) {

if (serverHandlers.hasOwnProperty(key)) {
handler = serverHandlers[key];
} else if (message.addressType === 'udp4' ||
message.addressType === 'udp6') {
var dgram = require('dgram');
handler = dgram._createSocketHandle.apply(net, args);
serverHandlers[key] = handler;
} else {
handler = serverHandlers[key] = net._createServerHandle.apply(net, args);
handler = net._createServerHandle.apply(net, args);
serverHandlers[key] = handler;
}

// echo callback with the fd handler associated with it
Expand Down Expand Up @@ -259,9 +265,9 @@ if (cluster.isMaster) {
messageHandler.suicide = function(message, worker) {
worker.suicide = true;
};

}


// Messages to a worker will be handled using these methods
else if (cluster.isWorker) {

Expand Down Expand Up @@ -541,7 +547,8 @@ cluster._setupWorker = function() {
sendInternalMessage(worker, { cmd: 'online' });
};

// Internal function. Called by lib/net.js when attempting to bind a server.
// Internal function. Called by net.js and dgram.js when attempting to bind a
// TCP server or UDP socket.
cluster._getServer = function(tcpSelf, address, port, addressType, fd, cb) {
// This can only be called from a worker.
assert(cluster.isWorker);
Expand Down
90 changes: 72 additions & 18 deletions lib/dgram.js
Expand Up @@ -19,6 +19,7 @@
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.

var assert = require('assert');
var util = require('util');
var events = require('events');

Expand All @@ -29,6 +30,7 @@ var BIND_STATE_BINDING = 1;
var BIND_STATE_BOUND = 2;

// lazily loaded
var cluster = null;
var dns = null;
var net = null;

Expand Down Expand Up @@ -86,6 +88,24 @@ function newHandle(type) {
}


exports._createSocketHandle = function(address, port, addressType, fd) {
// Opening an existing fd is not supported for UDP handles.
assert(typeof fd !== 'number' || fd < 0);

var handle = newHandle(addressType);

if (port || address) {
var r = handle.bind(address, port || 0, 0);
if (r == -1) {
handle.close();
handle = null;
}
}

return handle;
};


function Socket(type, listener) {
events.EventEmitter.call(this);

Expand All @@ -110,41 +130,75 @@ exports.createSocket = function(type, listener) {
};


function startListening(socket) {
socket._handle.onmessage = onMessage;
// Todo: handle errors
socket._handle.recvStart();
socket._receiving = true;
socket._bindState = BIND_STATE_BOUND;
socket.fd = -42; // compatibility hack

socket.emit('listening');
}


Socket.prototype.bind = function(port, address, callback) {
var self = this;

self._healthCheck();

if (this._bindState != BIND_STATE_UNBOUND)
throw new Error('Socket is already bound');

this._bindState = BIND_STATE_BINDING;

if (typeof callback === 'function')
self.once('listening', callback);

// resolve address first
self._handle.lookup(address, function(err, ip) {
self._bindState = BIND_STATE_UNBOUND;

if (!self._handle)
return; // handle has been closed in the mean time

if (err) {
self._bindState = BIND_STATE_UNBOUND;
self.emit('error', err);
return;
}

if (self._handle.bind(ip, port || 0, /*flags=*/ 0)) {
self.emit('error', errnoException(errno, 'bind'));
return;
}

self._handle.onmessage = onMessage;
self._handle.recvStart();
self._receiving = true;
self._bindState = BIND_STATE_BOUND;
self.fd = -42; // compatibility hack
if (!cluster)
cluster = require('cluster');

if (cluster.isWorker) {
cluster._getServer(self, ip, port, self.type, -1, function(handle) {
if (!self._handle)
// handle has been closed in the mean time.
return handle.close();

// Set up the handle that we got from master.
handle.lookup = self._handle.lookup;
handle.bind = self._handle.bind;
handle.send = self._handle.send;
handle.owner = self;

// Replace the existing handle by the handle we got from master.
self._handle.close();
self._handle = handle;

startListening(self);
});

} else {
if (!self._handle)
return; // handle has been closed in the mean time

if (self._handle.bind(ip, port || 0, /*flags=*/ 0)) {
self.emit('error', errnoException(errno, 'bind'));
self._bindState = BIND_STATE_UNBOUND;
// Todo: close?
return;
}

self.emit('listening');
startListening(self);
}
});

self._bindState = BIND_STATE_BINDING;
};


Expand Down
2 changes: 2 additions & 0 deletions src/handle_wrap.h
Expand Up @@ -53,6 +53,8 @@ class HandleWrap {
static v8::Handle<v8::Value> Ref(const v8::Arguments& args);
static v8::Handle<v8::Value> Unref(const v8::Arguments& args);

inline uv_handle_t* GetHandle() { return handle__; };

protected:
HandleWrap(v8::Handle<v8::Object> object, uv_handle_t* handle);
virtual ~HandleWrap();
Expand Down
54 changes: 37 additions & 17 deletions src/stream_wrap.cc
Expand Up @@ -27,6 +27,7 @@
#include "pipe_wrap.h"
#include "tcp_wrap.h"
#include "req_wrap.h"
#include "udp_wrap.h"
#include "node_counters.h"

#include <stdlib.h> // abort()
Expand Down Expand Up @@ -118,7 +119,7 @@ StreamWrap::StreamWrap(Handle<Object> object, uv_stream_t* stream)

void StreamWrap::SetHandle(uv_handle_t* h) {
HandleWrap::SetHandle(h);
stream_ = (uv_stream_t*)h;
stream_ = reinterpret_cast<uv_stream_t*>(h);
stream_->data = this;
}

Expand Down Expand Up @@ -173,6 +174,28 @@ uv_buf_t StreamWrap::OnAlloc(uv_handle_t* handle, size_t suggested_size) {
}


template <class WrapType, class UVType>
static Local<Object> AcceptHandle(uv_stream_t* pipe) {
HandleScope scope;
Local<Object> wrap_obj;
WrapType* wrap;
UVType* handle;

wrap_obj = WrapType::Instantiate();
if (wrap_obj.IsEmpty())
return Local<Object>();

wrap = static_cast<WrapType*>(
wrap_obj->GetAlignedPointerFromInternalField(0));
handle = wrap->UVHandle();

if (uv_accept(pipe, reinterpret_cast<uv_stream_t*>(handle)))
abort();

return scope.Close(wrap_obj);
}


void StreamWrap::OnReadCommon(uv_stream_t* handle, ssize_t nread,
uv_buf_t buf, uv_handle_type pending) {
HandleScope scope;
Expand Down Expand Up @@ -212,19 +235,16 @@ void StreamWrap::OnReadCommon(uv_stream_t* handle, ssize_t nread,

Local<Object> pending_obj;
if (pending == UV_TCP) {
pending_obj = TCPWrap::Instantiate();
pending_obj = AcceptHandle<TCPWrap, uv_tcp_t>(handle);
} else if (pending == UV_NAMED_PIPE) {
pending_obj = PipeWrap::Instantiate();
pending_obj = AcceptHandle<PipeWrap, uv_pipe_t>(handle);
} else if (pending == UV_UDP) {
pending_obj = AcceptHandle<UDPWrap, uv_udp_t>(handle);
} else {
// We only support sending UV_TCP and UV_NAMED_PIPE right now.
assert(pending == UV_UNKNOWN_HANDLE);
}

if (!pending_obj.IsEmpty()) {
assert(pending_obj->InternalFieldCount() > 0);
StreamWrap* pending_wrap = static_cast<StreamWrap*>(
pending_obj->GetAlignedPointerFromInternalField(0));
if (uv_accept(handle, pending_wrap->GetStream())) abort();
argv[3] = pending_obj;
argc++;
}
Expand All @@ -246,7 +266,7 @@ void StreamWrap::OnRead(uv_stream_t* handle, ssize_t nread, uv_buf_t buf) {

void StreamWrap::OnRead2(uv_pipe_t* handle, ssize_t nread, uv_buf_t buf,
uv_handle_type pending) {
OnReadCommon((uv_stream_t*)handle, nread, buf, pending);
OnReadCommon(reinterpret_cast<uv_stream_t*>(handle), nread, buf, pending);
}


Expand Down Expand Up @@ -404,29 +424,29 @@ Handle<Value> StreamWrap::WriteStringImpl(const Arguments& args) {
StreamWrap::AfterWrite);

} else {
uv_stream_t* send_stream = NULL;
uv_handle_t* send_handle = NULL;

if (args[1]->IsObject()) {
Local<Object> send_stream_obj = args[1]->ToObject();
assert(send_stream_obj->InternalFieldCount() > 0);
StreamWrap* send_stream_wrap = static_cast<StreamWrap*>(
send_stream_obj->GetAlignedPointerFromInternalField(0));
send_stream = send_stream_wrap->GetStream();
Local<Object> send_handle_obj = args[1]->ToObject();
assert(send_handle_obj->InternalFieldCount() > 0);
HandleWrap* send_handle_wrap = static_cast<HandleWrap*>(
send_handle_obj->GetAlignedPointerFromInternalField(0));
send_handle = send_handle_wrap->GetHandle();

// Reference StreamWrap instance to prevent it from being garbage
// collected before `AfterWrite` is called.
if (handle_sym.IsEmpty()) {
handle_sym = NODE_PSYMBOL("handle");
}
assert(!req_wrap->object_.IsEmpty());
req_wrap->object_->Set(handle_sym, send_stream_obj);
req_wrap->object_->Set(handle_sym, send_handle_obj);
}

r = uv_write2(&req_wrap->req_,
wrap->stream_,
&buf,
1,
send_stream,
reinterpret_cast<uv_stream_t*>(send_handle),
StreamWrap::AfterWrite);
}

Expand Down
17 changes: 15 additions & 2 deletions src/udp_wrap.cc
Expand Up @@ -40,6 +40,7 @@ typedef ReqWrap<uv_udp_send_t> SendWrap;
// see tcp_wrap.cc
Local<Object> AddressToJS(const sockaddr* addr);

static Persistent<Function> constructor;
static Persistent<String> buffer_sym;
static Persistent<String> oncomplete_sym;
static Persistent<String> onmessage_sym;
Expand Down Expand Up @@ -98,8 +99,9 @@ void UDPWrap::Initialize(Handle<Object> target) {
NODE_SET_PROTOTYPE_METHOD(t, "ref", HandleWrap::Ref);
NODE_SET_PROTOTYPE_METHOD(t, "unref", HandleWrap::Unref);

target->Set(String::NewSymbol("UDP"),
Persistent<FunctionTemplate>::New(t)->GetFunction());
constructor = Persistent<Function>::New(
Persistent<FunctionTemplate>::New(t)->GetFunction());
target->Set(String::NewSymbol("UDP"), constructor);
}


Expand Down Expand Up @@ -392,6 +394,17 @@ UDPWrap* UDPWrap::Unwrap(Local<Object> obj) {
}


Local<Object> UDPWrap::Instantiate() {
// If this assert fires then Initialize hasn't been called yet.
assert(constructor.IsEmpty() == false);

HandleScope scope;
Local<Object> obj = constructor->NewInstance();

return scope.Close(obj);
}


uv_udp_t* UDPWrap::UVHandle() {
return &handle_;
}
Expand Down
1 change: 1 addition & 0 deletions src/udp_wrap.h
Expand Up @@ -33,6 +33,7 @@ class UDPWrap: public HandleWrap {
static Handle<Value> SetTTL(const Arguments& args);
static UDPWrap* Unwrap(Local<Object> obj);

static Local<Object> Instantiate();
uv_udp_t* UVHandle();

private:
Expand Down

0 comments on commit 5e7e51c

Please sign in to comment.