Browse files

cluster: add graceful disconnect support

This patch adds a worker.disconnect() method that will stop the worker from accepting
new connections and then close the IPC channel. This allows the worker to die gracefully.
When the IPC channel has been disconnected a 'disconnect' event will be emitted.

The patch also adds a cluster.disconnect() method, which will call worker.disconnect() on
all connected workers. When the workers are disconnected it will then close all server
handlers. This allows the cluster itself to terminate in a graceful way.
  • Loading branch information...
1 parent ab32e9e commit d927fbc9ab01b8120d71dda0519c2ed2e82b030a @AndreasMadsen AndreasMadsen committed with isaacs Mar 10, 2012
Showing with 426 additions and 15 deletions.
  1. +87 −3 doc/api/cluster.markdown
  2. +107 −12 lib/cluster.js
  3. +122 −0 test/simple/test-cluster-disconnect.js
  4. +110 −0 test/simple/test-cluster-worker-disconnect.js
View
90 doc/api/cluster.markdown
@@ -118,6 +118,21 @@ where the 'listening' event is emitted.
console.log("We are now connected");
});
+## Event: 'disconnect'
+
+* `worker` {Worker object}
+
+When a worker's IPC channel has disconnected this event is emitted. This will happen
+when the worker dies, usually after calling `.destroy()`.
+
+It is also emitted when calling `.disconnect()`. In this case there may be a delay
+between the `disconnect` and `death` events, and the event can be used to detect if the
+process is stuck in a cleanup or if there are long-living connections.
+
+ cluster.on('disconnect', function(worker) {
+ console.log('The worker #' + worker.uniqueID + ' has disconnected');
+ });
+
## Event: 'death'
* `worker` {Worker object}
@@ -179,6 +194,16 @@ Spawn a new worker process. This can only be called from the master process.
All settings set by the `.setupMaster` is stored in this settings object.
This object is not supposed to be change or set manually.
+## cluster.disconnect([callback])
+
+* `callback` {Function} called when all workers are disconnected and handlers are closed
+
+When calling this method all workers will commit a graceful suicide. When they are
+disconnected all internal handlers will be closed, allowing the master process to
+die gracefully if no other event is waiting.
+
+The method takes an optional callback argument which will be called when finished.
+
## cluster.workers
* {Object}
@@ -232,9 +257,8 @@ See: [Child Process module](child_process.html)
* {Boolean}
-This property is a boolean. It is set when a worker dies, until then it is
-`undefined`. It is true if the worker was killed using the `.destroy()`
-method, and false otherwise.
+This property is a boolean. It is set when a worker dies after calling `.destroy()`
+or immediately after calling the `.disconnect()` method. Until then it is `undefined`.
### worker.send(message, [sendHandle])
@@ -273,6 +297,55 @@ a suicide boolean is set to true.
// destroy worker
worker.destroy();
+
+### worker.disconnect()
+
+When calling this function the worker will no longer accept new connections, but
+they will be handled by any other listening worker. Existing connections will be
+allowed to exit as usual. When no more connections exist, the IPC channel to the worker
+will close allowing it to die gracefully. When the IPC channel is closed the `disconnect`
+event will be emitted; this is then followed by the `death` event, which is emitted when
+the worker finally dies.
+
+Because there might be long-living connections, it is useful to implement a timeout.
+This example asks the worker to disconnect and after 2 seconds it will destroy the
+server. An alternative would be to execute `worker.destroy()` after 2 seconds, but
+that would normally not allow the worker to do any cleanup if needed.
+
+ if (cluster.isMaster) {
+ var worker = cluster.fork();
+ var timeout;
+
+ worker.on('listening', function () {
+ worker.disconnect();
+ timeout = setTimeout(function () {
+ worker.send('force kill');
+ }, 2000);
+ });
+
+ worker.on('disconnect', function () {
+ clearTimeout(timeout);
+ });
+
+ } else if (cluster.isWorker) {
+ var net = require('net');
+ var server = net.createServer(function (socket) {
+ // connection never end
+ });
+
+ server.listen(8000);
+
+ server.on('close', function () {
+ // cleanup
+ });
+
+ process.on('message', function (msg) {
+ if (msg === 'force kill') {
+ server.destroy();
+ }
+ });
+ }
+
### Event: 'message'
* `message` {Object}
@@ -342,6 +415,17 @@ on the specified worker.
// Worker is listening
};
+## Event: 'disconnect'
+
+* `worker` {Worker object}
+
+Same as the `cluster.on('disconnect')` event, but emits only when the state changes
+on the specified worker.
+
+ cluster.fork().on('disconnect', function (worker) {
+ // Worker has disconnected
+ });
+
## Event: 'death'
* `worker` {Worker object}
View
119 lib/cluster.js
@@ -77,6 +77,19 @@ function eachWorker(cb) {
}
}
+// Extremely simple progress tracker: counts down `missing` outstanding tasks and invokes `callback` when the count is exactly zero.
+function ProgressTracker(missing, callback) {
+ this.missing = missing; // number of tasks still outstanding
+ this.callback = callback; // invoked with no arguments by check()
+}
+ProgressTracker.prototype.done = function() { // mark one task as finished, then re-check
+ this.missing -= 1;
+ this.check();
+};
+ProgressTracker.prototype.check = function() { // fires the callback on every call made while missing is 0
+ if (this.missing === 0) this.callback();
+};
+
cluster.setupMaster = function(options) {
// This can only be called from the master.
assert(cluster.isMaster);
@@ -239,7 +252,10 @@ if (cluster.isMaster) {
// Messages to a worker will be handled using this methods
else if (cluster.isWorker) {
- // TODO: the disconnect step will use this
+ // Handle worker.disconnect from master: run the worker-side disconnect (presumably dispatched for internal messages with cmd 'disconnect' - confirm against handleMessage)
+ messageHandingObject.disconnect = function(message, worker) {
+ worker.disconnect();
+ };
}
function toDecInt(value) {
@@ -293,9 +309,11 @@ function Worker(customEnv) {
});
}
- // handle internalMessage and exit event
+ // handle internalMessage, exit and disconnect event
this.process.on('internalMessage', handleMessage.bind(null, this));
this.process.on('exit', prepareDeath.bind(null, this, 'dead', 'death'));
+ this.process.on('disconnect',
+ prepareDeath.bind(null, this, 'disconnected', 'disconnect'));
// relay message and error
this.process.on('message', this.emit.bind(this, 'message'));
@@ -356,14 +374,6 @@ Worker.prototype.send = function() {
this.process.send.apply(this.process, arguments);
};
-
-function closeWorkerChannel(worker, callback) {
- //Apparently the .close method is async, but do not have a callback
- worker.process._channel.close();
- worker.process._channel = null;
- process.nextTick(callback);
-}
-
// Kill the worker without restarting
Worker.prototype.destroy = function() {
var self = this;
@@ -373,9 +383,14 @@ Worker.prototype.destroy = function() {
if (cluster.isMaster) {
// Disconnect IPC channel
// this way the worker won't need to propagate suicide state to master
- closeWorkerChannel(this, function() {
+ if (self.process.connected) {
+ self.process.once('disconnect', function() {
+ self.process.kill();
+ });
+ self.process.disconnect();
+ } else {
self.process.kill();
- });
+ }
} else {
// Channel is open
@@ -403,6 +418,59 @@ Worker.prototype.destroy = function() {
}
};
+// The .disconnect function will close all servers and then disconnect
+// the IPC channel.
+if (cluster.isMaster) {
+ // Used in master: flag the worker as a suicide and ask the worker process to disconnect itself
+ Worker.prototype.disconnect = function() {
+ this.suicide = true;
+
+ sendInternalMessage(this, {cmd: 'disconnect'});
+ };
+
+} else {
+ // Used in workers: close every tracked server, then close the IPC channel
+ Worker.prototype.disconnect = function() {
+ var self = this;
+
+ this.suicide = true;
+
+ // keep track of open servers (one countdown step per entry in serverLisenters)
+ var servers = Object.keys(serverLisenters).length;
+ var progress = new ProgressTracker(servers, function() {
+ // there are no more servers open so we will close the IPC channel.
+ // Closing the IPC channel will emit a disconnect event
+ // in both master and worker on the process object.
+ // This event will be handled by prepareDeath.
+ self.process.disconnect();
+ });
+
+ // depending on where this function was called from (master or worker)
+ // the suicide state has already been set.
+ // But it doesn't really matter if we set it again.
+ sendInternalMessage(this, {cmd: 'suicide'}, function() {
+ // in case there are no servers
+ progress.check();
+
+ // closing all servers gracefully
+ var server;
+ for (var key in serverLisenters) {
+ server = serverLisenters[key];
+
+ // in case the server is closed we won't close it again
+ if (server._handle === null) {
+ progress.done();
+ continue;
+ }
+
+ server.on('close', progress.done.bind(progress)); // count this server as done once it has closed
+ server.close();
+ }
+ });
+
+ };
+}
+
// Fork a new worker
cluster.fork = function(env) {
// This can only be called from the master.
@@ -414,6 +482,33 @@ cluster.fork = function(env) {
return (new cluster.Worker(env));
};
+// execute .disconnect on all workers and close handlers when done; the optional callback is called afterwards
+cluster.disconnect = function(callback) {
+ // This can only be called from the master.
+ assert(cluster.isMaster);
+
+ // Close all TCP handlers when all workers are disconnected
+ var workers = Object.keys(cluster.workers).length;
+ var progress = new ProgressTracker(workers, function() {
+ for (var key in serverHandlers) {
+ serverHandlers[key].close();
+ delete serverHandlers[key];
+ }
+
+ // call callback when done (callback is optional)
+ if (callback) callback();
+ });
+
+ // begin disconnecting all workers
+ eachWorker(function(worker) {
+ worker.once('disconnect', progress.done.bind(progress)); // one countdown step per worker 'disconnect' event
+ worker.disconnect();
+ });
+
+ // in case there weren't any workers
+ progress.check();
+};
+
// Sync way to quickly kill all cluster workers
// However the workers may not die instantly
function quickDestroyCluster() {
View
122 test/simple/test-cluster-disconnect.js
@@ -0,0 +1,122 @@
+// Copyright Joyent, Inc. and other Node contributors.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the
+// "Software"), to deal in the Software without restriction, including
+// without limitation the rights to use, copy, modify, merge, publish,
+// distribute, sublicense, and/or sell copies of the Software, and to permit
+// persons to whom the Software is furnished to do so, subject to the
+// following conditions:
+//
+// The above copyright notice and this permission notice shall be included
+// in all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
+// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
+// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
+// USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+
+var common = require('../common');
+var assert = require('assert');
+var cluster = require('cluster');
+var net = require('net');
+
+if (cluster.isWorker) { // worker: open two TCP servers that each answer 'echo' and end the socket
+ net.createServer(function(socket) {
+ socket.end('echo');
+ }).listen(common.PORT, '127.0.0.1');
+
+ net.createServer(function(socket) {
+ socket.end('echo');
+ }).listen(common.PORT + 1, '127.0.0.1');
+
+} else if (cluster.isMaster) {
+
+ // test a single TCP server: connect, collect the reply and report whether it equals 'echo'
+ var testConnection = function(port, cb) {
+ var socket = net.connect(port, '127.0.0.1', function() {
+ // buffer result
+ var result = '';
+ socket.on('data', function(chunk) { result += chunk; });
+
+ // check result
+ socket.on('end', function() {
+ cb(result === 'echo');
+ });
+ });
+ };
+
+ // test both servers created in the cluster (common.PORT and common.PORT + 1)
+ var testCluster = function(cb) {
+ var servers = 2;
+ var done = 0;
+
+ for (var i = 0, l = servers; i < l; i++) {
+ testConnection(common.PORT + i, function(sucess) {
+ assert.ok(sucess);
+ done += 1;
+ if (done === servers) {
+ cb();
+ }
+ });
+ }
+ };
+
+ // start two workers and execute callback when both are listening
+ var startCluster = function(cb) {
+ var workers = 2;
+ var online = 0;
+
+ for (var i = 0, l = workers; i < l; i++) {
+
+ var worker = cluster.fork();
+ worker.on('listening', function() { // NOTE(review): each worker opens two servers; if 'listening' fires once per server, `online` may reach `workers` before both workers are listening - confirm
+ online += 1;
+ if (online === workers) {
+ cb();
+ }
+ });
+ }
+ };
+
+
+ var results = { // how many times each phase completed; checked on exit
+ start: 0,
+ test: 0,
+ disconnect: 0
+ };
+
+ var test = function(again) {
+ //1. start cluster
+ startCluster(function() {
+ results.start += 1;
+
+ //2. test cluster
+ testCluster(function() {
+ results.test += 1;
+
+ //3. disconnect cluster
+ cluster.disconnect(function() {
+ results.disconnect += 1;
+
+ // run test again to confirm cleanup (the second run passes no argument, so it does not repeat)
+ if (again) {
+ test();
+ }
+ });
+ });
+ });
+ };
+
+ test(true);
+
+ process.once('exit', function() { // every phase must have run exactly twice
+ assert.equal(results.start, 2);
+ assert.equal(results.test, 2);
+ assert.equal(results.disconnect, 2);
+ });
+}
View
110 test/simple/test-cluster-worker-disconnect.js
@@ -0,0 +1,110 @@
+// Copyright Joyent, Inc. and other Node contributors.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the
+// "Software"), to deal in the Software without restriction, including
+// without limitation the rights to use, copy, modify, merge, publish,
+// distribute, sublicense, and/or sell copies of the Software, and to permit
+// persons to whom the Software is furnished to do so, subject to the
+// following conditions:
+//
+// The above copyright notice and this permission notice shall be included
+// in all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
+// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
+// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
+// USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+
+var common = require('../common');
+var assert = require('assert');
+var cluster = require('cluster');
+
+if (cluster.isWorker) { // worker: start an HTTP server whose request handler does nothing
+ var http = require('http');
+ http.Server(function() {
+
+ }).listen(common.PORT, '127.0.0.1');
+
+} else if (cluster.isMaster) {
+
+ var checks = { // flags recorded by the event handlers below and asserted on exit
+ cluster: {
+ emitDisconnect: false,
+ emitDeath: false,
+ callback: false // NOTE(review): never set or asserted anywhere in this script
+ },
+ worker: {
+ emitDisconnect: false,
+ emitDeath: false,
+ state: false,
+ suicideMode: false,
+ died: false
+ }
+ };
+
+ // helper function to check if a process is alive (process.kill with signal 0; a throw is treated as "not alive")
+ var alive = function(pid) {
+ try {
+ process.kill(pid, 0);
+ return true;
+ } catch (e) {
+ return false;
+ }
+ };
+
+ // start worker
+ var worker = cluster.fork();
+
+ // Disconnect worker when it is ready
+ worker.once('listening', function() {
+ worker.disconnect();
+ });
+
+ // Check cluster events
+ cluster.once('disconnect', function() {
+ checks.cluster.emitDisconnect = true;
+ });
+ cluster.once('death', function() {
+ checks.cluster.emitDeath = true;
+ });
+
+ // Check worker events and properties
+ worker.once('disconnect', function() {
+ checks.worker.emitDisconnect = true;
+ checks.worker.suicideMode = worker.suicide;
+ checks.worker.state = worker.state;
+ });
+
+ // Check that the worker died, then exit so the 'exit' assertions run
+ worker.once('death', function() {
+ checks.worker.emitDeath = true;
+ checks.worker.died = !alive(worker.process.pid);
+ process.nextTick(function() {
+ process.exit(0);
+ });
+ });
+
+ process.once('exit', function() {
+
+ var w = checks.worker;
+ var c = checks.cluster;
+
+ // events
+ assert.ok(w.emitDisconnect, 'Disconnect event did not emit');
+ assert.ok(c.emitDisconnect, 'Disconnect event did not emit');
+ assert.ok(w.emitDeath, 'Death event did not emit');
+ assert.ok(c.emitDeath, 'Death event did not emit');
+
+ // flags (captured when the worker's 'disconnect' event fired)
+ assert.equal(w.state, 'disconnected', 'The state property was not set');
+ assert.equal(w.suicideMode, true, 'Suicide mode was not set');
+
+ // is process alive
+ assert.ok(w.died, 'The worker did not die');
+ });
+}

0 comments on commit d927fbc

Please sign in to comment.