Skip to content

Commit

Permalink
cleaned things up a bit, added error for when all workers are dead
Browse files Browse the repository at this point in the history
  • Loading branch information
MikeB committed Dec 16, 2013
1 parent ad763a3 commit 42cb10c
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 68 deletions.
101 changes: 51 additions & 50 deletions index.js
@@ -1,5 +1,4 @@
var _ = require('lodash');
var opt = require('optimist')
var util = require('util');
var cluster = require('cluster');
var mixdownMaster = require('./lib/master.js');
Expand All @@ -8,34 +7,12 @@ var path = require('path');
var packageJSON = require(path.join(process.cwd(), '/package.json'));

// Export the factory
exports.create = function(mixdown, options) {
module.exports.create = function(mixdown, options) {
var main = new Main(mixdown, options);

// placeholder for options for starting the server
var argv = opt
.alias('h', 'help')
.alias('?', 'help')
.describe('help', 'Display help')
.usage('Starts ' + packageJSON.name + ' framework for serving multiple sites.\n\nVersion: ' + packageJSON.version + '\nAuthor: ' + packageJSON.author)
.alias('v', 'version')
.describe('version', 'Display Mixdown application version.')
.argv;

if(argv.help) {
opt.showHelp();
process.exit();
}

if(argv.version) {
console.log(packageJSON.version);
process.exit();
}

return main;
};

var Main = function(mixdown, options) {

// instance attrs
this.server = null;
this.workers = {}; // if this is a master, then we'll load this with child processes.
Expand All @@ -46,23 +23,28 @@ var Main = function(mixdown, options) {

// passed configs.
this.mixdown = mixdown;

this.options = _.defaults(options || {}, {
cluster: {
on: false
}
});
};

var logServerInfo = function(message) {
var hmap = _.map(this.mixdown.apps, function(app) { return _.pick(app, 'vhosts', 'id'); });
logger.info(message || 'Server Information. ', this.socket.address(), hmap);
var logServerInfo = function(server,message) {

var hmap = _.map(server.mixdown.apps, function(app){
return _.pick(app, 'vhosts', 'id');
});

logger.info(message || 'Server Information. ', server.server.address(), hmap);
};

Main.prototype.createMaster = function() {
var self = this;

// start server. Sets up server, port, and starts the app.
self.master = new mixdownMaster(self.mixdown, self.options);
self.master = new mixdownMaster(self.workers, self.options);

self.master.start(function(err, data) {
if (err) {
Expand All @@ -72,7 +54,7 @@ Main.prototype.createMaster = function() {
else {
self.socket = data.socket;
self.server = data.server;
logServerInfo.call(self, 'Server started successfully.');
logServerInfo(self, 'Server started successfully.');
typeof(callback) === 'function' ? callback(err, self) : null;
}
});
Expand All @@ -86,25 +68,36 @@ Main.prototype.start = function(callback) {
var self = this;
var mixdown = this.mixdown;


// this reload listener just logs the reload info.
mixdown.on('reload', logServerInfo.bind(this, 'Mixdown reloaded. '));
mixdown.on('reload', function() {
logServerInfo(self, 'Mixdown reloaded. ');
});

// Start cluster.
var clusterConfig = mixdown.main.options.cluster || {};

if(clusterConfig.on){
logger.info("Using cluster");

var numCPUs = clusterConfig.workers || require('os').cpus().length;
var numChidrenToSpawn = clusterConfig.workers || require('os').cpus().length;

if(cluster.isMaster){
logger.info("Starting master with " + numCPUs + " CPUs");
logger.info("Using cluster");
//cluser is on, and this is the master!
logger.info("Starting master with " + numChidrenToSpawn + " workers");

// spawn n workers
for (var i = 0; i < numCPUs; i++) {
var child = cluster.fork();
self.workers[child.process.pid] = child;
for (var i = 0; i < numChidrenToSpawn; i++) {
(function(){
var child = cluster.fork();

child.once('message',function(message){
if(message == 'ready'){

self.workers[child.process.pid] = child;
logger.debug('initial child ready');
}
});

})();
}

// Add application kill signals.
Expand All @@ -113,13 +106,13 @@ Main.prototype.start = function(callback) {

process.on(sig, function() {

_.each(self.workers, function(child) {
_.each(cluster.workers, function(child) {
child.destroy(); // send suicide signal
});

// create function to check self all workers are dead.
var checkExit = function() {
if (_.keys(self.workers).length == 0) {
if (_.keys(cluster.workers).length == 0) {
process.exit();
}
else {
Expand All @@ -129,32 +122,40 @@ Main.prototype.start = function(callback) {

// poll the master and exit when children are all gone.
setImmediate(checkExit);

});
});

cluster.on('disconnect',function(worker) {
delete self.workers[worker.process.pid];
logger.info('worker '+worker.process.pid+' disconnected');
});

cluster.on('exit', function(worker) {
logger.error('Worker exited unexpectedly. Spawning new worker', worker);

// remove the child from the tracked running list..
delete self.workers[worker.process.pid];

// if it purposely destroyed itself, then do no re-spawn.
// Otherwise, it was killed for some external reason and should create a new child in the pool.
if (!worker.suicide) {

if(!worker.suicide){
logger.error('Worker exited unexpectedly. Spawning new worker');
// spawn new child
var child = cluster.fork();
self.workers[child.process.pid] = child;
}

child.on('message',function(message){
if(message == 'ready'){
logger.debug('respawned child ready id: ' + child.process.pid);
self.workers[child.process.pid] = child;
}
});
}
});

self.createMaster();

} else {
logger.info("Worker ID", process.pid);
}
else {
//cluser is on, and this is a worker!
logger.info("new worker Worker id: "+process.pid);

try {
self.worker = new mixdownWorker(mixdown);
Expand All @@ -163,9 +164,9 @@ Main.prototype.start = function(callback) {
typeof(callback) === 'function' ? callback(e, self) : null;
}
}

}
else {
//cluster isn't running so create a master server.
self.createMaster();
}
};
Expand Down
36 changes: 24 additions & 12 deletions lib/master.js
Expand Up @@ -5,55 +5,67 @@ var util = require('util');
var assert = require('assert');
var Worker = require('./worker.js');

var Master = function(mixdown, options) {
this.mixdown = mixdown;
var Master = function(workers, options) {

this.options = options || {};
this.currentIndex = 0;
this.server = null;
this.socket = null;
this.workers = workers;
};

util.inherits(Master, Worker);

Master.prototype.getWorker = function() {
var pids = Object.keys(this.mixdown.main.workers);
var pids = Object.keys(this.workers);

if (this.currentIndex >= pids.length) {
this.currentIndex = 0;
}

logger.debug(pids);
logger.debug(this.currentIndex);

return this.mixdown.main.workers[pids[this.currentIndex++]];
return this.workers[pids[this.currentIndex++]];
};

Master.prototype.distribute = function(socket) {
this.handoff(this.getWorker(), socket);
var worker = this.getWorker()
var self = this;

if(!worker){
//TODO::make a correct http error
socket.end('HTTP 1.1 500 Internal Server Error\nContent-Length:0');
}
else{
self.handoff(worker, socket);
}
};

Master.prototype.handoff = function(worker, socket) {
var self = this;
worker.send('socket', socket);
};

// create the server.
// create the server.
Master.prototype.start = function(callback) {

var self = this;
var clusterEnabled = this.mixdown.main.options.cluster.on;
var clusterEnabled = this.options.cluster.on;

// Set port from server config.
var listen = this.options.listen || {};

if (listen.type !== 'unix' && !isNaN(process.env.MIXDOWN_PORT)) {
listen.port = process.env.MIXDOWN_PORT;
}

var lp = listen.type === 'unix' ? listen.path : listen.port;

var create = function() {

logger.info('Starting master server');

self.server = net.createServer(function() { return; });
self.socket = self.server.listen(lp, function(err) {

self.server.listen(lp, function(err) {
callback(err, _.pick(self, 'server', 'socket'));
});

Expand All @@ -65,6 +77,7 @@ Master.prototype.start = function(callback) {
// if cluster is not used, then handle in master.
self.server.on('connection', function(socket) {
if (clusterEnabled) {
socket.pause();
self.distribute(socket);
}
else {
Expand All @@ -82,5 +95,4 @@ Master.prototype.start = function(callback) {
}
};


module.exports = Master;
13 changes: 7 additions & 6 deletions lib/worker.js
Expand Up @@ -14,6 +14,9 @@ var Worker = function(mixdown) {

// check if the external Config is enabled and starting listening if so.
var self = this;

//this is a bit of a problem, it is an async function in the constructor
//also i don't think it's necessary as mixdown will do this?
mixdown.getExternalConfig(function(err, externalConfig) {

if (err) {
Expand All @@ -36,11 +39,11 @@ var Worker = function(mixdown) {
services.emit('reload', serverConfig);
}
});

});

}
});

process.send('ready');
};

util.inherits(Worker, events.EventEmitter);
Expand Down Expand Up @@ -86,14 +89,13 @@ Worker.prototype.reload = function() {
this.vhosts = vhosts;
};



Worker.prototype.handleRequest = function(req, res) {
var app = null;
var host = null;

// for simple case, listen on all hosts.
var appkeys = Object.keys(this.mixdown.apps);

if (appkeys.length === 1) {
app = this.mixdown.apps[appkeys[0]];
}
Expand All @@ -119,5 +121,4 @@ Worker.prototype.handleMessage = function(message, socket) {
}
};


module.exports = Worker;
module.exports = Worker;

0 comments on commit 42cb10c

Please sign in to comment.