Skip to content
This repository has been archived by the owner on Apr 22, 2023. It is now read-only.

Commit

Permalink
added disconnect method and event
Browse files Browse the repository at this point in the history
  • Loading branch information
AndreasMadsen committed Nov 14, 2011
1 parent 29647e8 commit 25f5793
Showing 1 changed file with 134 additions and 6 deletions.
140 changes: 134 additions & 6 deletions lib/cluster.js
Expand Up @@ -46,12 +46,13 @@ if (process.env.NODE_DEBUG && /cluster/.test(process.env.NODE_DEBUG)) {
// Used in the master:
var masterStarted = false;
var ids = 0;
var servers = {};
var serverHandlers = {};
var workerFilename;
var workerArgs;
var workerTotal;

// Used in the worker:
var serverLisenters = {};
var queryIds = 0;
var queryCallbacks = {};

Expand Down Expand Up @@ -91,6 +92,7 @@ cluster.setupMaster = function(options) {
workerArgs = options.args || process.argv.slice(2);
workerTotal = options.workers || os.cpus().length;

/*
//This is really bad
process.on('uncaughtException', function(e) {
// Quickly try to kill all the workers.
Expand All @@ -105,7 +107,7 @@ cluster.setupMaster = function(options) {
console.error('Please report this bug.');
process.exit(1);
});

*/
};

// Check if a message is internal only
Expand Down Expand Up @@ -161,6 +163,8 @@ function handleMessage(message, handle, worker) {
}
}
}

//Messages to the master will be handled using this methods
if (cluster.isMaster) {

//Handle online messages from workers
Expand All @@ -183,10 +187,10 @@ if (cluster.isMaster) {
var key = args.join(':');
var handler;

if (servers.hasOwnProperty(key)) {
handler = servers[key];
if (serverHandlers.hasOwnProperty(key)) {
handler = serverHandlers[key];
} else {
handler = servers[key] = net._createServerHandle.apply(net, args);
handler = serverHandlers[key] = net._createServerHandle.apply(net, args);
}

//echo callback id, with the fd handler associated with it
Expand Down Expand Up @@ -219,6 +223,49 @@ if (cluster.isMaster) {
worker.send(internalMessage({}, message));
}
};

//Handle disconnect messages from workers
messageHandingObject.disconnect = function(message, worker) {

if (message.state === "setState") {
worker.state = "disconnect";
worker.suicide = true;

//Send echo if requested
if (requireEcho(message)) {
worker.send(internalMessage({}, message));
}
}

else if (message.state === "done") {
//Send echo if requested before closing channel
if (requireEcho(message)) {
worker.send(internalMessage({}, message));
}

worker.process._channel.close();
worker.emit("disconnect", worker);
cluster.emit("disconnect", worker);
}

};

}

//Messages to a worker will be handled using this methods
else if (cluster.isWorker) {

//Handle disconnect messages from master
messageHandingObject.disconnect = function(message, worker) {
//Run disconnect
worker.disconnect();

//Send echo if requested
if (requireEcho(message)) {
worker.send(internalMessage({}, message));
}
};

}

// Create a worker call there works both for master and worker
Expand Down Expand Up @@ -292,6 +339,25 @@ function Worker(env) {
self.emit('exit', self);
cluster.emit('death', self);
});

//Handle disconnect
self.on('disconnect', function () {

//set state to disconnect
self.disconnect = 'disconnect';

//Make suicide a boolean
self.suicide = !!self.suicide;

//Remove from workers in the master
if (cluster.isMaster) {
delete cluster.workers[self.workerID];
}

//Emit exit and death
self.emit('exit', self);
cluster.emit('death', self);
});

}
util.inherits(Worker, EventEmitter);
Expand Down Expand Up @@ -338,6 +404,64 @@ Worker.prototype.kill = function() {
}
};

// Kill the worker without restarting
Worker.prototype.disconnect = function() {
var self = this;

if (cluster.isMaster) {
//Inform worker that is should disconnect from the master
this.send(internalMessage({cmd: 'disconnect'}));
} else {
//Inform master that about state and suicide and make it emit disconnect
this.send(internalMessage({cmd: 'disconnect', state: "setState"}), function () {
var item;

//Predefine closeState
var closeState = {master: false};
for (item in serverLisenters) {
if (serverLisenters.hasOwnProperty(item)) {
closeState[item] = false;
}
}

//Check closeState
var setState = function (name) {
return function () {
//Set State
closeState[name] = true;

//Check all closeStates
var state;
for (state in closeState) {
if (closeState.hasOwnProperty(state) && closeState === false) {
return undefined;
}
}

//Emit a disconnect if all closeState are true
self.emit('disconnect', self);
};
};

//Close TCP connections
for (item in serverLisenters) {
if (serverLisenters.hasOwnProperty(item)) {

//Close TCP connection and set closeState when done
serverLisenters[item].once('end', setState(item));
serverLisenters[item].close();
}
}

//Make master emit a disconnect event
self.send(internalMessage({cmd: 'disconnect', state: "done"}), function () {
//Close connection to the master
setState('master')();
});
});
}
};

// Fork a new worker
cluster.fork = function(env) {
// This can only be called from the master.
Expand Down Expand Up @@ -402,7 +526,11 @@ cluster._setupWorker = function() {
cluster._getServer = function(tcpSelf, address, port, addressType, cb) {
// This can only be called from a worker.
assert(cluster.isWorker);


//Store tcp instance for later use
var key = [address, port, addressType].join(":");
serverLisenters[key] = tcpSelf;

//Send a listening message to the master
tcpSelf.once('listening', function() {
cluster.worker.state = 'listening';
Expand Down

0 comments on commit 25f5793

Please sign in to comment.