Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Loading…

cluster: support UDP sockets (WIP) #4495

Closed
wants to merge 2 commits into from

3 participants

Bert Belder Ben Noordhuis Timothy J Fontaine
Bert Belder

@bnoordhuis can you review this. I know it is rather messy, because uv_udp_t is not a subclass of uv_stream_t. My idea is that in the future this should be the case.

And yes, the libuv changes should go into a libuv branch. This just helps reviewing it as a whole.

Ben Noordhuis

because uv_udp_t is not a subclass of uv_stream_t. My idea is that in the future this should be the case.

Not this discussion again. :-/

lib/child_process.js
((5 lines not shown))
msg: message
};
switch (handle.constructor.name) {
case 'Socket':
- message.type += 'Socket'; break;
+ if (handle instanceof net.Socket) {
+ message.type = 'net.Socket';
+ } else {
+ message.type = 'dgram.Socket';
+ }

Maybe this instead?

} else if (handle instanceof dgram.Socket) {
  message.type = 'dgram.Socket';
} else {
  assert(0);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
lib/dgram.js
((66 lines not shown))
- self.emit('listening');
+ // Todo: emit async.
+ self.emit('listening');

Maybe DRY this?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
src/stream_wrap.cc
@@ -171,6 +172,27 @@ uv_buf_t StreamWrap::OnAlloc(uv_handle_t* handle, size_t suggested_size) {
}
+template <class WrapType, class UVType>
+static Local<Object> accept_handle(uv_stream_t* pipe) {
+ HandleScope scope;
+ Local<Object> wrap_obj;
+ WrapType* wrap;
+ UVType* handle;
+
+ wrap_obj = WrapType::Instantiate();
+ if (wrap_obj.IsEmpty())
+ return Local<Object>();
+
+ wrap = static_cast<WrapType*>(wrap_obj->GetPointerFromInternalField(0));
+ handle = wrap->UVHandle();
+
+ if (uv_accept(pipe, (uv_stream_t*) handle))

reinterpret_cast

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
src/stream_wrap.cc
@@ -171,6 +172,27 @@ uv_buf_t StreamWrap::OnAlloc(uv_handle_t* handle, size_t suggested_size) {
}
+template <class WrapType, class UVType>
+static Local<Object> accept_handle(uv_stream_t* pipe) {

AcceptHandle?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
src/stream_wrap.cc
((18 lines not shown))
}
r = uv_write2(&req_wrap->req_,
wrap->stream_,
&buf,
1,
- send_stream,
+ (uv_stream_t*) send_handle,

reinterpret_cast

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
deps/uv/src/unix/stream.c
@@ -683,6 +699,21 @@ static void uv__write_req_finish(uv_write_t* req) {
}
+static int uv__handle_fd(uv_handle_t* handle) {

uv_stream_t? gcc complains about it:

[12/18] CC obj/deps/uv/src/unix/libuv.stream.o
../../deps/uv/src/unix/stream.c: In function ‘uv__write’:
../../deps/uv/src/unix/stream.c:763:5: warning: passing argument 1 of ‘uv__handle_fd’ from incompatible pointer type [enabled by default]
../../deps/uv/src/unix/stream.c:702:12: note: expected ‘struct uv_handle_t *’ but argument is of type ‘struct uv_stream_t *’

EDIT: You can, of course, also cast req->send_handle to uv_handle_t further down below.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Ben Noordhuis

LGTM insofar that an ugly hack can look good. :-)

I would probably have opted for a uv_udp_send2() function instead of merging it with the streams code. But with that approach you'd also need a uv_udp_send62() function and that's just too horrible to contemplate.

Maybe we can merge some of the udp/streams code in the future without making uv_udp_t an instance of uv_stream_t.

lib/child_process.js
((5 lines not shown))
msg: message
};
switch (handle.constructor.name) {
case 'Socket':
- message.type += 'Socket'; break;
+ if (!handle instanceof net.Socket)

Operator precedence bug: !handle instanceof net.Socket is always false.

Bert Belder Owner

this switch over handle.constructor.name is stupid anyway

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
lib/dgram.js
((59 lines not shown))
- self.emit('listening');
- });
+ // Replace the existing handle by the handle we got from master.
+ self._handle.close();
Timothy J Fontaine Owner

probably want to do handle.lookup = self._handle.lookup otherwise sending fails because ._handle doesn't have a .lookup

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
piscisaureus added some commits
Bert Belder piscisaureus cluster: support datagram sockets 5e7e51c
Bert Belder piscisaureus dgram: avoid EventEmitter leak warning
When a datagram socket hasn't been bound yet, node will defer `send()`
operations until binding has completed. Before this patch a `listening`
listener would be installed every time `send` was called. This triggered
an EventEmitter leak warning when more than 10 packets were sent in a
tight loop. Therefore switch to using a single `listening` listener, and
use an array to enqueue outbound packets.
f4f86cf
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Jan 28, 2013
  1. Bert Belder
  2. Bert Belder

    dgram: avoid EventEmitter leak warning

    piscisaureus authored
    When a datagram socket hasn't been bound yet, node will defer `send()`
    operations until binding has completed. Before this patch a `listening`
    listener would be installed every time `send` was called. This triggered
    an EventEmitter leak warning when more than 10 packets were sent in a
    tight loop. Therefore switch to using a single `listening` listener, and
    use an array to enqueue outbound packets.
This page is out of date. Refresh to see the latest.
32 lib/child_process.js
View
@@ -155,6 +155,18 @@ var handleConversion = {
emit(socket);
}
+ },
+
+ 'dgram.Native': {
+ simultaneousAccepts: false,
+
+ send: function(message, handle) {
+ return handle;
+ },
+
+ got: function(message, handle, emit) {
+ emit(handle);
+ }
}
};
@@ -355,18 +367,20 @@ function setupChannel(target, channel) {
// this message will be handled by an internalMessage event handler
message = {
cmd: 'NODE_HANDLE',
- type: 'net.',
msg: message
};
- switch (handle.constructor.name) {
- case 'Socket':
- message.type += 'Socket'; break;
- case 'Server':
- message.type += 'Server'; break;
- case 'Pipe':
- case 'TCP':
- message.type += 'Native'; break;
+ if (handle instanceof net.Socket) {
+ message.type = 'net.Socket';
+ } else if (handle instanceof net.Server) {
+ message.type = 'net.Server';
+ } else if (handle instanceof process.binding('tcp_wrap').TCP ||
+ handle instanceof process.binding('pipe_wrap').Pipe) {
+ message.type = 'net.Native';
+ } else if (handle instanceof process.binding('udp_wrap').UDP) {
+ message.type = 'dgram.Native';
+ } else {
+ throw new TypeError("This handle type can't be sent");
}
var obj = handleConversion[message.type];
13 lib/cluster.js
View
@@ -227,8 +227,14 @@ if (cluster.isMaster) {
if (serverHandlers.hasOwnProperty(key)) {
handler = serverHandlers[key];
+ } else if (message.addressType === 'udp4' ||
+ message.addressType === 'udp6') {
+ var dgram = require('dgram');
+ handler = dgram._createSocketHandle.apply(net, args);
+ serverHandlers[key] = handler;
} else {
- handler = serverHandlers[key] = net._createServerHandle.apply(net, args);
+ handler = net._createServerHandle.apply(net, args);
+ serverHandlers[key] = handler;
}
// echo callback with the fd handler associated with it
@@ -259,9 +265,9 @@ if (cluster.isMaster) {
messageHandler.suicide = function(message, worker) {
worker.suicide = true;
};
-
}
+
// Messages to a worker will be handled using these methods
else if (cluster.isWorker) {
@@ -541,7 +547,8 @@ cluster._setupWorker = function() {
sendInternalMessage(worker, { cmd: 'online' });
};
-// Internal function. Called by lib/net.js when attempting to bind a server.
+// Internal function. Called by net.js and dgram.js when attempting to bind a
+// TCP server or UDP socket.
cluster._getServer = function(tcpSelf, address, port, addressType, fd, cb) {
// This can only be called from a worker.
assert(cluster.isWorker);
107 lib/dgram.js
View
@@ -19,6 +19,7 @@
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.
+var assert = require('assert');
var util = require('util');
var events = require('events');
@@ -29,6 +30,7 @@ var BIND_STATE_BINDING = 1;
var BIND_STATE_BOUND = 2;
// lazily loaded
+var cluster = null;
var dns = null;
var net = null;
@@ -86,6 +88,24 @@ function newHandle(type) {
}
+exports._createSocketHandle = function(address, port, addressType, fd) {
+ // Opening an existing fd is not supported for UDP handles.
+ assert(typeof fd !== 'number' || fd < 0);
+
+ var handle = newHandle(addressType);
+
+ if (port || address) {
+ var r = handle.bind(address, port || 0, 0);
+ if (r == -1) {
+ handle.close();
+ handle = null;
+ }
+ }
+
+ return handle;
+};
+
+
function Socket(type, listener) {
events.EventEmitter.call(this);
@@ -110,41 +130,75 @@ exports.createSocket = function(type, listener) {
};
+function startListening(socket) {
+ socket._handle.onmessage = onMessage;
+ // Todo: handle errors
+ socket._handle.recvStart();
+ socket._receiving = true;
+ socket._bindState = BIND_STATE_BOUND;
+ socket.fd = -42; // compatibility hack
+
+ socket.emit('listening');
+}
+
+
Socket.prototype.bind = function(port, address, callback) {
var self = this;
self._healthCheck();
+ if (this._bindState != BIND_STATE_UNBOUND)
+ throw new Error('Socket is already bound');
+
+ this._bindState = BIND_STATE_BINDING;
+
if (typeof callback === 'function')
self.once('listening', callback);
// resolve address first
self._handle.lookup(address, function(err, ip) {
- self._bindState = BIND_STATE_UNBOUND;
-
- if (!self._handle)
- return; // handle has been closed in the mean time
-
if (err) {
+ self._bindState = BIND_STATE_UNBOUND;
self.emit('error', err);
return;
}
- if (self._handle.bind(ip, port || 0, /*flags=*/ 0)) {
- self.emit('error', errnoException(errno, 'bind'));
- return;
- }
-
- self._handle.onmessage = onMessage;
- self._handle.recvStart();
- self._receiving = true;
- self._bindState = BIND_STATE_BOUND;
- self.fd = -42; // compatibility hack
+ if (!cluster)
+ cluster = require('cluster');
+
+ if (cluster.isWorker) {
+ cluster._getServer(self, ip, port, self.type, -1, function(handle) {
+ if (!self._handle)
+ // handle has been closed in the mean time.
+ return handle.close();
+
+ // Set up the handle that we got from master.
+ handle.lookup = self._handle.lookup;
+ handle.bind = self._handle.bind;
+ handle.send = self._handle.send;
+ handle.owner = self;
+
+ // Replace the existing handle by the handle we got from master.
+ self._handle.close();
+ self._handle = handle;
+
+ startListening(self);
+ });
+
+ } else {
+ if (!self._handle)
+ return; // handle has been closed in the mean time
+
+ if (self._handle.bind(ip, port || 0, /*flags=*/ 0)) {
+ self.emit('error', errnoException(errno, 'bind'));
+ self._bindState = BIND_STATE_UNBOUND;
+ // Todo: close?
+ return;
+ }
- self.emit('listening');
+ startListening(self);
+ }
});
-
- self._bindState = BIND_STATE_BINDING;
};
@@ -189,10 +243,21 @@ Socket.prototype.send = function(buffer,
if (self._bindState == BIND_STATE_UNBOUND)
self.bind(0, null);
+ // If the socket hasn't been bound yet, push the outbound packet onto the
+ // send queue and send after binding is complete.
if (self._bindState != BIND_STATE_BOUND) {
- self.once('listening', function() {
- self.send(buffer, offset, length, port, address, callback);
- });
+ // If the send queue hasn't been initialized yet, do it, and install an
+ // event handler that flushes the bind queue after binding is done.
+ if (!self._sendQueue) {
+ self._sendQueue = [];
+ self.once('listening', function() {
+ // Flush the send queue.
+ for (var i = 0; i < self._sendQueue.length; i++)
+ self.send.apply(self, self._sendQueue[i]);
+ self._sendQueue = undefined;
+ });
+ }
+ self._sendQueue.push([buffer, offset, length, port, address, callback]);
return;
}
2  src/handle_wrap.h
View
@@ -53,6 +53,8 @@ class HandleWrap {
static v8::Handle<v8::Value> Ref(const v8::Arguments& args);
static v8::Handle<v8::Value> Unref(const v8::Arguments& args);
+ inline uv_handle_t* GetHandle() { return handle__; };
+
protected:
HandleWrap(v8::Handle<v8::Object> object, uv_handle_t* handle);
virtual ~HandleWrap();
54 src/stream_wrap.cc
View
@@ -27,6 +27,7 @@
#include "pipe_wrap.h"
#include "tcp_wrap.h"
#include "req_wrap.h"
+#include "udp_wrap.h"
#include "node_counters.h"
#include <stdlib.h> // abort()
@@ -118,7 +119,7 @@ StreamWrap::StreamWrap(Handle<Object> object, uv_stream_t* stream)
void StreamWrap::SetHandle(uv_handle_t* h) {
HandleWrap::SetHandle(h);
- stream_ = (uv_stream_t*)h;
+ stream_ = reinterpret_cast<uv_stream_t*>(h);
stream_->data = this;
}
@@ -173,6 +174,28 @@ uv_buf_t StreamWrap::OnAlloc(uv_handle_t* handle, size_t suggested_size) {
}
+template <class WrapType, class UVType>
+static Local<Object> AcceptHandle(uv_stream_t* pipe) {
+ HandleScope scope;
+ Local<Object> wrap_obj;
+ WrapType* wrap;
+ UVType* handle;
+
+ wrap_obj = WrapType::Instantiate();
+ if (wrap_obj.IsEmpty())
+ return Local<Object>();
+
+ wrap = static_cast<WrapType*>(
+ wrap_obj->GetAlignedPointerFromInternalField(0));
+ handle = wrap->UVHandle();
+
+ if (uv_accept(pipe, reinterpret_cast<uv_stream_t*>(handle)))
+ abort();
+
+ return scope.Close(wrap_obj);
+}
+
+
void StreamWrap::OnReadCommon(uv_stream_t* handle, ssize_t nread,
uv_buf_t buf, uv_handle_type pending) {
HandleScope scope;
@@ -212,19 +235,16 @@ void StreamWrap::OnReadCommon(uv_stream_t* handle, ssize_t nread,
Local<Object> pending_obj;
if (pending == UV_TCP) {
- pending_obj = TCPWrap::Instantiate();
+ pending_obj = AcceptHandle<TCPWrap, uv_tcp_t>(handle);
} else if (pending == UV_NAMED_PIPE) {
- pending_obj = PipeWrap::Instantiate();
+ pending_obj = AcceptHandle<PipeWrap, uv_pipe_t>(handle);
+ } else if (pending == UV_UDP) {
+ pending_obj = AcceptHandle<UDPWrap, uv_udp_t>(handle);
} else {
- // We only support sending UV_TCP and UV_NAMED_PIPE right now.
assert(pending == UV_UNKNOWN_HANDLE);
}
if (!pending_obj.IsEmpty()) {
- assert(pending_obj->InternalFieldCount() > 0);
- StreamWrap* pending_wrap = static_cast<StreamWrap*>(
- pending_obj->GetAlignedPointerFromInternalField(0));
- if (uv_accept(handle, pending_wrap->GetStream())) abort();
argv[3] = pending_obj;
argc++;
}
@@ -246,7 +266,7 @@ void StreamWrap::OnRead(uv_stream_t* handle, ssize_t nread, uv_buf_t buf) {
void StreamWrap::OnRead2(uv_pipe_t* handle, ssize_t nread, uv_buf_t buf,
uv_handle_type pending) {
- OnReadCommon((uv_stream_t*)handle, nread, buf, pending);
+ OnReadCommon(reinterpret_cast<uv_stream_t*>(handle), nread, buf, pending);
}
@@ -404,14 +424,14 @@ Handle<Value> StreamWrap::WriteStringImpl(const Arguments& args) {
StreamWrap::AfterWrite);
} else {
- uv_stream_t* send_stream = NULL;
+ uv_handle_t* send_handle = NULL;
if (args[1]->IsObject()) {
- Local<Object> send_stream_obj = args[1]->ToObject();
- assert(send_stream_obj->InternalFieldCount() > 0);
- StreamWrap* send_stream_wrap = static_cast<StreamWrap*>(
- send_stream_obj->GetAlignedPointerFromInternalField(0));
- send_stream = send_stream_wrap->GetStream();
+ Local<Object> send_handle_obj = args[1]->ToObject();
+ assert(send_handle_obj->InternalFieldCount() > 0);
+ HandleWrap* send_handle_wrap = static_cast<HandleWrap*>(
+ send_handle_obj->GetAlignedPointerFromInternalField(0));
+ send_handle = send_handle_wrap->GetHandle();
// Reference StreamWrap instance to prevent it from being garbage
// collected before `AfterWrite` is called.
@@ -419,14 +439,14 @@ Handle<Value> StreamWrap::WriteStringImpl(const Arguments& args) {
handle_sym = NODE_PSYMBOL("handle");
}
assert(!req_wrap->object_.IsEmpty());
- req_wrap->object_->Set(handle_sym, send_stream_obj);
+ req_wrap->object_->Set(handle_sym, send_handle_obj);
}
r = uv_write2(&req_wrap->req_,
wrap->stream_,
&buf,
1,
- send_stream,
+ reinterpret_cast<uv_stream_t*>(send_handle),
StreamWrap::AfterWrite);
}
17 src/udp_wrap.cc
View
@@ -40,6 +40,7 @@ typedef ReqWrap<uv_udp_send_t> SendWrap;
// see tcp_wrap.cc
Local<Object> AddressToJS(const sockaddr* addr);
+static Persistent<Function> constructor;
static Persistent<String> buffer_sym;
static Persistent<String> oncomplete_sym;
static Persistent<String> onmessage_sym;
@@ -98,8 +99,9 @@ void UDPWrap::Initialize(Handle<Object> target) {
NODE_SET_PROTOTYPE_METHOD(t, "ref", HandleWrap::Ref);
NODE_SET_PROTOTYPE_METHOD(t, "unref", HandleWrap::Unref);
- target->Set(String::NewSymbol("UDP"),
- Persistent<FunctionTemplate>::New(t)->GetFunction());
+ constructor = Persistent<Function>::New(
+ Persistent<FunctionTemplate>::New(t)->GetFunction());
+ target->Set(String::NewSymbol("UDP"), constructor);
}
@@ -392,6 +394,17 @@ UDPWrap* UDPWrap::Unwrap(Local<Object> obj) {
}
+Local<Object> UDPWrap::Instantiate() {
+ // If this assert fires then Initialize hasn't been called yet.
+ assert(constructor.IsEmpty() == false);
+
+ HandleScope scope;
+ Local<Object> obj = constructor->NewInstance();
+
+ return scope.Close(obj);
+}
+
+
uv_udp_t* UDPWrap::UVHandle() {
return &handle_;
}
1  src/udp_wrap.h
View
@@ -33,6 +33,7 @@ class UDPWrap: public HandleWrap {
static Handle<Value> SetTTL(const Arguments& args);
static UDPWrap* Unwrap(Local<Object> obj);
+ static Local<Object> Instantiate();
uv_udp_t* UVHandle();
private:
115 test/simple/test-cluster-dgram-1.js
View
@@ -0,0 +1,115 @@
+// Copyright Joyent, Inc. and other Node contributors.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the
+// "Software"), to deal in the Software without restriction, including
+// without limitation the rights to use, copy, modify, merge, publish,
+// distribute, sublicense, and/or sell copies of the Software, and to permit
+// persons to whom the Software is furnished to do so, subject to the
+// following conditions:
+//
+// The above copyright notice and this permission notice shall be included
+// in all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
+// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
+// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
+// USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+var NUM_WORKERS = 4;
+var PACKETS_PER_WORKER = 10;
+
+var assert = require('assert');
+var cluster = require('cluster');
+var common = require('../common');
+var dgram = require('dgram');
+
+
+if (process.platform === 'win32') {
+ console.warn("dgram clustering is currently not supported on windows.");
+ process.exit(0);
+}
+
+if (cluster.isMaster)
+ master();
+else
+ worker();
+
+
+function master() {
+ var listening = 0;
+
+ // Fork 4 workers.
+ for (var i = 0; i < NUM_WORKERS; i++)
+ cluster.fork();
+
+ // Wait until all workers are listening.
+ cluster.on('listening', function() {
+ if (++listening < NUM_WORKERS)
+ return;
+
+ // Start sending messages.
+ var buf = new Buffer('hello world');
+ var socket = dgram.createSocket('udp4');
+ var sent = 0;
+ doSend();
+
+ function doSend() {
+ socket.send(buf, 0, buf.length, common.PORT, '127.0.0.1', afterSend);
+ }
+
+ function afterSend() {
+ sent++;
+ if (sent < NUM_WORKERS * PACKETS_PER_WORKER) {
+ doSend();
+ } else {
+ console.log('master sent %d packets', sent);
+ socket.close();
+ }
+ }
+ });
+
+ // Set up event handlers for every worker. Each worker sends a message when
+ // it has received the expected number of packets. After that it disconnects.
+ for (var key in cluster.workers) {
+ if (cluster.workers.hasOwnProperty(key))
+ setupWorker(cluster.workers[key]);
+ }
+
+ function setupWorker(worker) {
+ var received = 0;
+
+ worker.on('message', function(msg) {
+ received = msg.received;
+ console.log('worker %d received %d packets', worker.id, received);
+ });
+
+ worker.on('disconnect', function() {
+ assert(received === PACKETS_PER_WORKER);
+ console.log('worker %d disconnected', worker.id);
+ });
+ }
+}
+
+
+function worker() {
+ var received = 0;
+
+ // Create udp socket and start listening.
+ var socket = dgram.createSocket('udp4');
+
+ socket.on('message', function(data, info) {
+ received++;
+
+ // Every 10 messages, notify the master.
+ if (received == PACKETS_PER_WORKER) {
+ process.send({received: received});
+ process.disconnect();
+ }
+ });
+
+ socket.bind(common.PORT);
+}
81 test/simple/test-cluster-dgram-2.js
View
@@ -0,0 +1,81 @@
+// Copyright Joyent, Inc. and other Node contributors.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the
+// "Software"), to deal in the Software without restriction, including
+// without limitation the rights to use, copy, modify, merge, publish,
+// distribute, sublicense, and/or sell copies of the Software, and to permit
+// persons to whom the Software is furnished to do so, subject to the
+// following condonitions:
+//
+// The above copyright notice and this permission notice shall be included
+// in all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
+// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
+// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
+// USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+var NUM_WORKERS = 4;
+var PACKETS_PER_WORKER = 10;
+
+var assert = require('assert');
+var cluster = require('cluster');
+var common = require('../common');
+var dgram = require('dgram');
+
+
+if (process.platform === 'win32') {
+ console.warn("dgram clustering is currently not supported on windows.");
+ process.exit(0);
+}
+
+if (cluster.isMaster)
+ master();
+else
+ worker();
+
+
+function master() {
+ var i;
+ var received = 0;
+
+ // Start listening on a socket.
+ var socket = dgram.createSocket('udp4');
+ socket.bind(common.PORT);
+
+ // Disconnect workers when the expected number of messages have been
+ // received.
+ socket.on('message', function(data, info) {
+ received++;
+
+ if (received == PACKETS_PER_WORKER * NUM_WORKERS) {
+ console.log('master received %d packets', received);
+
+ // Close the socket.
+ socket.close();
+
+ // Disconnect all workers.
+ cluster.disconnect();
+ }
+ });
+
+ // Fork workers.
+ for (var i = 0; i < NUM_WORKERS; i++)
+ cluster.fork();
+}
+
+
+function worker() {
+ // Create udp socket and send packets to master.
+ var socket = dgram.createSocket('udp4');
+ var buf = new Buffer('hello world');
+
+ for (var i = 0; i < PACKETS_PER_WORKER; i++)
+ socket.send(buf, 0, buf.length, common.PORT, '127.0.0.1');
+
+ console.log('worker %d sent %d packets', cluster.worker.id, PACKETS_PER_WORKER);
+}
Something went wrong with that request. Please try again.