Permalink
Browse files

Refactor cluster code to match with new node.js Cluster API

  • Loading branch information...
1 parent 3bc5d7a commit 4cea498f1eac6113fe8973e66b4b706d1a9dbd33 @rs committed Mar 24, 2014
Showing with 48 additions and 27 deletions.
  1. +2 −1 audience-meter.js
  2. +46 −26 lib/master.js
View
3 audience-meter.js
@@ -56,7 +56,8 @@ if (cluster.isMaster)
require('./lib/master').Master
({
workers: options.workers,
- audience: audience
+ audience: audience,
+ log: logger
});
if (options.notificationPort)
View
72 lib/master.js
@@ -11,41 +11,60 @@ function Master(options)
options.workers = require('os').cpus().length;
}
- var workers = [];
+ var eachWorker = function(callback)
+ {
+ for (var id in cluster.workers)
+ {
+ callback(cluster.workers[id]);
+ }
+ };
- function workerMessageHandler(msg)
+ cluster.on('online', function(worker)
{
- switch (msg.cmd)
+ worker.on('message', function(msg)
{
- case 'join':
- options.audience.join(msg.namespace);
- break;
- case 'leave':
- options.audience.leave(msg.namespace);
- break;
+ switch (msg.cmd)
+ {
+ case 'join':
+ options.audience.join(msg.namespace);
+ break;
+ case 'leave':
+ options.audience.leave(msg.namespace);
+ break;
- case 'exclude':
- for (var i = 0, t = workers.length; i < t; i++)
- {
- var worker = workers[i];
- if (this == worker) continue;
- worker.send(msg);
- }
- // TODO: instruct other peers of same UDP multicast segment if cluster is activated
- break;
+ case 'exclude':
+ eachWorker(function(otherWorker)
+ {
+ if (worker !== otherWorker)
+ {
+ otherWorker.send(msg);
+ }
+ });
+ // TODO: instruct other peers of same UDP multicast segment if cluster is activated
+ break;
+ }
+ });
+ });
+
+ cluster.on('exit', function(worker, code, signal)
+ {
+ if (worker.suicide === true)
+ {
+ return;
}
- }
+
+ options.log('warn', 'Respawn worker');
+ cluster.fork();
+ });
for (var i = 0; i < options.workers; i++)
{
- var worker = cluster.fork();
- workers.push(worker);
- worker.on('message', workerMessageHandler);
+ cluster.fork();
}
options.audience.on('notify', function(namespace, msg)
{
- workers.forEach(function(worker)
+ eachWorker(function(worker)
{
if (typeof msg == 'undefined')
{
@@ -58,10 +77,11 @@ function Master(options)
process.on('SIGTERM', function()
{
- workers.forEach(function(worker)
+ eachWorker(function(worker)
{
- worker.destroy();
+ options.log('debug', 'Disconnect worker ' + worker.id);
+ worker.kill();
});
process.exit();
});
-}
+}

0 comments on commit 4cea498

Please sign in to comment.