Permalink
Browse files

cluster: allow shared reused dgram sockets

Allow listening on reused dgram ports in cluster workers.

Fix: nodejs/node-v0.x-archive#9261
PR-URL: #2548
Reviewed-By: Ben Noordhuis <info@bnoordhuis.nl>
  • Loading branch information...
indutny authored and rvagg committed Aug 25, 2015
1 parent a7596d7 commit c60857a81ad9f0a6c66cda3ba5a8bbaa94f5b63b
Showing with 87 additions and 23 deletions.
  1. +25 −14 lib/cluster.js
  2. +15 −8 lib/dgram.js
  3. +7 −1 lib/net.js
  4. +40 −0 test/parallel/test-cluster-dgram-reuse.js
View
@@ -57,7 +57,7 @@ Worker.prototype.isConnected = function isConnected() {
// Master/worker specific methods are defined in the *Init() functions.
-function SharedHandle(key, address, port, addressType, backlog, fd) {
+function SharedHandle(key, address, port, addressType, backlog, fd, flags) {
this.key = key;
this.workers = [];
this.handle = null;
@@ -66,7 +66,7 @@ function SharedHandle(key, address, port, addressType, backlog, fd) {
// FIXME(bnoordhuis) Polymorphic return type for lack of a better solution.
var rval;
if (addressType === 'udp4' || addressType === 'udp6')
- rval = dgram._createSocketHandle(address, port, addressType, fd);
+ rval = dgram._createSocketHandle(address, port, addressType, fd, flags);
else
rval = net._createServerHandle(address, port, addressType, fd);
@@ -438,7 +438,8 @@ function masterInit() {
var args = [message.address,
message.port,
message.addressType,
- message.fd];
+ message.fd,
+ message.index];
var key = args.join(':');
var handle = handles[key];
if (handle === undefined) {
@@ -456,7 +457,8 @@ function masterInit() {
message.port,
message.addressType,
message.backlog,
- message.fd);
+ message.fd,
+ message.flags);
}
if (!handle.data) handle.data = message.data;
@@ -485,7 +487,7 @@ function masterInit() {
cluster.emit('listening', worker, info);
}
- // Round-robin only. Server in worker is closing, remove from list.
+ // Server in worker is closing, remove from list.
function close(worker, message) {
var key = message.key;
var handle = handles[key];
@@ -500,6 +502,7 @@ function masterInit() {
function workerInit() {
var handles = {};
+ var indexes = {};
// Called from src/node.js
cluster._setupWorker = function() {
@@ -528,15 +531,22 @@ function workerInit() {
};
// obj is a net#Server or a dgram#Socket object.
- cluster._getServer = function(obj, address, port, addressType, fd, cb) {
- var message = {
- addressType: addressType,
- address: address,
- port: port,
+ cluster._getServer = function(obj, options, cb) {
+ const key = [ options.address,
+ options.port,
+ options.addressType,
+ options.fd ].join(':');
+ if (indexes[key] === undefined)
+ indexes[key] = 0;
+ else
+ indexes[key]++;
+
+ const message = util._extend({
act: 'queryServer',
- fd: fd,
+ index: indexes[key],
data: null
- };
+ }, options);
+
// Set custom data on handle (i.e. tls tickets key)
if (obj._getServerData) message.data = obj._getServerData();
send(message, function(reply, handle) {
@@ -549,9 +559,9 @@ function workerInit() {
});
obj.once('listening', function() {
cluster.worker.state = 'listening';
- var address = obj.address();
+ const address = obj.address();
message.act = 'listening';
- message.port = address && address.port || port;
+ message.port = address && address.port || options.port;
send(message);
});
};
@@ -563,6 +573,7 @@ function workerInit() {
// closed. Avoids resource leaks when the handle is short-lived.
var close = handle.close;
handle.close = function() {
+ send({ act: 'close', key: key });
delete handles[key];
return close.apply(this, arguments);
};
View
@@ -60,14 +60,14 @@ function newHandle(type) {
}
-exports._createSocketHandle = function(address, port, addressType, fd) {
+exports._createSocketHandle = function(address, port, addressType, fd, flags) {
// Opening an existing fd is not supported for UDP handles.
assert(typeof fd !== 'number' || fd < 0);
var handle = newHandle(addressType);
if (port || address) {
- var err = handle.bind(address, port || 0, 0);
+ var err = handle.bind(address, port || 0, flags);
if (err) {
handle.close();
return err;
@@ -176,8 +176,12 @@ Socket.prototype.bind = function(port /*, address, callback*/) {
if (!cluster)
cluster = require('cluster');
+ var flags = 0;
+ if (self._reuseAddr)
+ flags |= constants.UV_UDP_REUSEADDR;
+
if (cluster.isWorker && !exclusive) {
- cluster._getServer(self, ip, port, self.type, -1, function(err, handle) {
+ function onHandle(err, handle) {
if (err) {
var ex = exceptionWithHostPort(err, 'bind', ip, port);
self.emit('error', ex);
@@ -191,16 +195,19 @@ Socket.prototype.bind = function(port /*, address, callback*/) {
replaceHandle(self, handle);
startListening(self);
- });
+ }
+ cluster._getServer(self, {
+ address: ip,
+ port: port,
+ addressType: self.type,
+ fd: -1,
+ flags: flags
+ }, onHandle);
} else {
if (!self._handle)
return; // handle has been closed in the mean time
- var flags = 0;
- if (self._reuseAddr)
- flags |= constants.UV_UDP_REUSEADDR;
-
var err = self._handle.bind(ip, port || 0, flags);
if (err) {
var ex = exceptionWithHostPort(err, 'bind', ip, port);
View
@@ -1268,7 +1268,13 @@ function listen(self, address, port, addressType, backlog, fd, exclusive) {
return;
}
- cluster._getServer(self, address, port, addressType, fd, cb);
+ cluster._getServer(self, {
+ address: address,
+ port: port,
+ addressType: addressType,
+ fd: fd,
+ flags: 0
+ }, cb);
function cb(err, handle) {
// EADDRINUSE may not be reported until we call listen(). To complicate
@@ -0,0 +1,40 @@
+'use strict';
+const common = require('../common');
+const assert = require('assert');
+const cluster = require('cluster');
+const dgram = require('dgram');
+
+if (common.isWindows) {
+ console.log('1..0 # Skipped: dgram clustering is currently not supported ' +
+ 'on windows.');
+ return;
+}
+
+if (cluster.isMaster) {
+ cluster.fork().on('exit', function(code) {
+ assert.equal(code, 0);
+ });
+ return;
+}
+
+const sockets = [];
+function next() {
+ sockets.push(this);
+ if (sockets.length !== 2)
+ return;
+
+ // Work around health check issue
+ process.nextTick(function() {
+ for (var i = 0; i < sockets.length; i++)
+ sockets[i].close(close);
+ });
+}
+
+var waiting = 2;
+function close() {
+ if (--waiting === 0)
+ cluster.worker.disconnect();
+}
+
+for (var i = 0; i < 2; i++)
+ dgram.createSocket({ type: 'udp4', reuseAddr: true }).bind(common.PORT, next);

0 comments on commit c60857a

Please sign in to comment.