Skip to content

Commit

Permalink
Graceful shutdown and restart
Browse files Browse the repository at this point in the history
  • Loading branch information
Matt Sergeant committed May 19, 2016
1 parent f103c41 commit 0c58308
Show file tree
Hide file tree
Showing 12 changed files with 219 additions and 44 deletions.
73 changes: 53 additions & 20 deletions bin/haraka
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ var fs = require('fs'),
path = require('path'),
nopt = require("nopt"),
os = require('os'),
async = require('async'),
sprintf = require('sprintf-js').sprintf,
path = require('path'),
base = path.join(__dirname, '..'),
Expand All @@ -25,7 +26,8 @@ var fs = require('fs'),
"qlist": Boolean,
"qstat": Boolean,
"qempty": Boolean,
"qunstick": String,
"qunstick": [String, null],
"graceful": Boolean,
"order": Boolean,
"test": [String, Array],
"ip": String,
Expand Down Expand Up @@ -380,31 +382,38 @@ else if (parsed.qunstick) {
var domain = parsed.qunstick.toLowerCase();
process.env.HARAKA = parsed.configs;
var logger = require(path.join(base, "logger"));
if (!parsed.verbose)
logger.log = function () {} // disable logging for this
var cb = function () {
process.exit();
};
if (domain == 'true') {
send_internal_command('flushQueue', cb);
}
else {
send_internal_command('flushQueue ' + domain, cb);
}
}
else if (parsed.graceful) {
if (!parsed.configs) {
fail("graceful option requires config path");
}
process.env.HARAKA = parsed.configs;
var logger = require(path.join(base, "logger"));
if (!parsed.verbose)
logger.log = function () {} // disable logging for this
var config = require(path.join(base, "config"));
var key = config.get("internalcmd_key");
var smtp_ini = config.get("smtp.ini");
var listen_addrs = require(path.join(base, "server")).get_listen_addrs(smtp_ini.main);
var hp = /^\[?([^\]]+)\]?:(\d+)$/.exec(listen_addrs[0]);
if (!hp) throw "No listen address in smtp.ini";
console.log("Connecting to " + listen_addrs[0]);
var sock = require('net').connect(hp[2], hp[1], function () {
sock.once('data', function (data) {
// this is the greeting. Ignore it...
sock.write('INTERNALCMD ' + (key ? ('key:' + key + ' ') : '') + 'flushQueue ' + domain + '\r\n');
sock.once('data', function (data) {
console.log(data.toString());
sock.write('QUIT\r\n');
sock.once('data', function (data) {
sock.end();
})
});
});
});
sock.on('end', function () {
if (!smtp_ini.main.nodes) {
console.log("Graceful restart not possible without `nodes` value in smtp.ini");
process.exit();
})
}
else {
send_internal_command('gracefulRestart', function () {
process.exit();
});
}
}
else if (parsed.qempty) {
if (!parsed.configs) {
Expand Down Expand Up @@ -642,3 +651,27 @@ else {
console.log("\033[31;40mError\033[0m: Undefined or erroneous arguments\n");
console.log(usage);
}

function send_internal_command (cmd, cb) {
var config = require(path.join(base, "config"));
var key = config.get("internalcmd_key");
var smtp_ini = config.get("smtp.ini");
var listen_addrs = require(path.join(base, "server")).get_listen_addrs(smtp_ini.main);
var hp = /^\[?([^\]]+)\]?:(\d+)$/.exec(listen_addrs[0]);
if (!hp) throw "No listen address in smtp.ini";
// console.log("Connecting to " + listen_addrs[0]);
var sock = require('net').connect(hp[2], hp[1], function () {
sock.once('data', function (data) {
// this is the greeting. Ignore it...
sock.write('INTERNALCMD ' + (key ? ('key:' + key + ' ') : '') + cmd + '\r\n');
sock.once('data', function (data) {
console.log(data.toString().replace(/\r?\n$/, ''));
sock.write('QUIT\r\n');
sock.once('data', function (data) {
sock.end();
})
});
});
});
sock.on('end', cb);
}
7 changes: 7 additions & 0 deletions configfile.js
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,13 @@ cfreader.ensure_enoent_timer = function () {
}, 60 * 1000);
};

process.on('message', function (msg) {
if (msg.event && msg.event == 'cfreader.shutdown') {
logger.loginfo("[cfreader] Shutting down enoent timer");
clearInterval(cfreader._enoent_timer);
}
});

cfreader.empty_config = function(type) {
if (type === 'ini') {
return { main: {} };
Expand Down
2 changes: 1 addition & 1 deletion connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -1177,7 +1177,7 @@ Connection.prototype.cmd_internalcmd = function (line) {
}

require('./server').sendToMaster(command, results);
return this.respond(250, "Command sent for execution. Check logs for results.");
return this.respond(250, "Command sent for execution. Check Haraka logs for results.");
}

Connection.prototype.cmd_helo = function(line) {
Expand Down
10 changes: 9 additions & 1 deletion haraka.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,18 @@ process.on('uncaughtException', function (err) {
logger.dump_and_exit(1);
});

var shutting_down = false;
['SIGTERM', 'SIGINT'].forEach(function (sig) {
process.on(sig, function () {
if (shutting_down) return process.exit(1);
shutting_down = true;
process.title = path.basename(process.argv[1], '.js');
logger.lognotice(sig + ' received');
logger.dump_and_exit(1);
logger.dump_and_exit(function () {
if (server.cluster && server.cluster.isMaster) {
server.gracefulShutdown();
}
});
});
});

Expand All @@ -51,6 +58,7 @@ process.on('SIGHUP', function () {
});

process.on('exit', function(code) {
if (shutting_down) return;
process.title = path.basename(process.argv[1], '.js');
logger.lognotice('Shutting down');
logger.dump_logs();
Expand Down
1 change: 1 addition & 0 deletions logger.js
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ logger.dump_logs = function (cb) {

logger.dump_and_exit = function (code) {
this.dump_logs(function () {
if (util.isFunction(code)) return code();
process.exit(code);
});
}
Expand Down
4 changes: 4 additions & 0 deletions outbound.js
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,10 @@ process.on('message', function (msg) {
exports.flush_queue(msg.domain, process.pid);
return;
}
if (msg.event && msg.event == 'outbound.shutdown') {
logger.loginfo("[outbound] Shutting down temp fail queue");
temp_fail_queue.shutdown();
}
// ignores the message
});

Expand Down
20 changes: 20 additions & 0 deletions plugins.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,21 @@ function Plugin (name) {
this.hooks = {};
}

exports.shutdown_plugins = function () {
for (var i in exports.registered_plugins) {
if (exports.registered_plugins[i].shutdown) {
exports.registered_plugins[i].shutdown();
}
}
}

process.on('message', function (msg) {
if (msg.event && msg.event == 'plugins.shutdown') {
logger.loginfo("[plugins] Shutting down plugins");
exports.shutdown_plugins();
}
});

Plugin.prototype.core_require = function (name) {
return require('./' + name);
};
Expand Down Expand Up @@ -136,6 +151,11 @@ Plugin.prototype.inherits = function (parent_name) {
if (!this[method]) {
this[method] = parent_plugin[method];
}
// else if (method == 'shutdown') { // Method is in this module, so it exists in the plugin
// if (!this.hasOwnProperty('shutdown')) {
// this[method] = parent_plugin[method];
// }
// }
}
if (parent_plugin.register) {
parent_plugin.register.call(this);
Expand Down
6 changes: 6 additions & 0 deletions plugins/connect.p0f.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ function P0FClient(path) {
connect();
}

P0FClient.prototype.shutdown = function () {
if (self.restart_interval) {
clearInterval(self.restart_interval);
}
}

P0FClient.prototype.decode_response = function (data) {
var decode_string = function (data2, start, end) {
var str = '';
Expand Down
5 changes: 5 additions & 0 deletions plugins/dns_list_base.js
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,11 @@ exports.check_zones = function (interval) {
}
};

exports.shutdown = function () {
this.logdebug("Shutting down re-test interval");
clearInterval(this._interval);
};

exports.disable_zone = function (zone, result) {
if (!zone) return false;
if (!this.zones) return false;
Expand Down
8 changes: 6 additions & 2 deletions plugins/process_title.js
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,14 @@ exports.hook_init_child = function (next, server) {
server.notes.pt_mps_max = 0;
var title = 'Haraka (worker)';
process.title = title;
setupInterval(title, server);
this._interval = setupInterval(title, server);
return next();
};

exports.shutdown = function () {
clearInterval(this._interval);
};

exports.hook_connect_init = function (next, connection) {
var server = connection.server;
connection.notes.pt_connect_run = true;
Expand Down Expand Up @@ -137,7 +141,7 @@ exports.hook_data = function (next, connection) {

var setupInterval = function (title, server) {
// Set up a timer to update title
setInterval(function () {
return setInterval(function () {
// Connections per second
var av_cps = Math.round((server.notes.pt_connections/process.uptime()*100))/100;
var cps = server.notes.pt_connections - server.notes.pt_cps_diff;
Expand Down
121 changes: 102 additions & 19 deletions server.js
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,87 @@ Server.flushQueue = function (domain) {
}
};

var gracefull_in_progress = false;

Server.gracefulRestart = function () {
Server._graceful();
}

Server.gracefulShutdown = function () {
Server._graceful(function () {
process.exit(0);
});
}

Server._graceful = function (shutdown) {
if (!Server.cluster) {
return;
}

if (gracefull_in_progress) {
logger.lognotice("Restart currently in progress - ignoring request");
return;
}

gracefull_in_progress = true;
// TODO: Make these configurable
var disconnect_timeout = 30;
var exit_timeout = 30;
cluster.removeAllListeners('exit');
// only reload one worker at a time
// otherwise, we'll have a time when no connection handlers are running
var worker_ids = Object.keys(cluster.workers);
async.eachSeries(worker_ids, function (id, cb) {
logger.lognotice("Killing node: " + id);
var worker = cluster.workers[id];
['outbound', 'cfreader', 'plugins'].forEach(function (module) {
worker.send({event: module + '.shutdown'});
})
worker.disconnect();
var disconnect_received = false;
var disconnect_timer = setTimeout(function () {
if (!disconnect_received) {
logger.logcrit("Disconnect never received by worker. Killing.");
worker.kill();
}
}, disconnect_timeout * 1000);
worker.once("disconnect", function() {
clearTimeout(disconnect_timer);
disconnect_received = true;
logger.lognotice("Disconnect complete");
var dead = false;
var timer = setTimeout(function () {
if (!dead) {
logger.logcrit("Worker " + id + " failed to shutdown. Killing.");
worker.kill();
}
}, exit_timeout * 1000);
worker.once("exit", function () {
dead = true;
clearTimeout(timer);
if (shutdown) cb();
});
});
if (shutdown) return;
var newWorker = cluster.fork();
newWorker.once("listening", function() {
logger.lognotice("Replacement worker online.");
newWorker.on('exit', function (code, signal) {
cluster_exit_listener(newWorker, code, signal);
});
cb();
});
}, function (err) {
// err can basically never happen, but fuckit...
if (err) logger.logerror(err);
if (shutdown) {
return shutdown();
}
gracefull_in_progress = false;
logger.lognotice("Reload complete, workers: " + JSON.stringify(Object.keys(cluster.workers)));
});
}

Server.sendToMaster = function (command, params) {
// console.log("Send to master: ", command);
if (Server.cluster) {
Expand Down Expand Up @@ -364,28 +445,30 @@ Server.init_master_respond = function (retval, msg) {
logger.lognotice('worker ' + worker.id + ' listening on ' +
address.address + ':' + address.port);
});
cluster.on('exit', function (worker, code, signal) {
if (signal) {
logger.lognotice('worker ' + worker.id +
' killed by signal ' + signal);
}
else if (code !== 0) {
logger.lognotice('worker ' + worker.id +
' exited with error code: ' + code);
}
if (signal || code !== 0) {
// Restart worker
var new_worker = cluster.fork({
CLUSTER_MASTER_PID: process.pid
});
new_worker.send({
event: 'outbound.load_pid_queue', data: worker.process.pid,
});
}
});
cluster.on('exit', cluster_exit_listener);
});
};

function cluster_exit_listener (worker, code, signal) {
if (signal) {
logger.lognotice('worker ' + worker.id +
' killed by signal ' + signal);
}
else if (code !== 0) {
logger.lognotice('worker ' + worker.id +
' exited with error code: ' + code);
}
if (signal || code !== 0) {
// Restart worker
var new_worker = cluster.fork({
CLUSTER_MASTER_PID: process.pid
});
new_worker.send({
event: 'outbound.load_pid_queue', data: worker.process.pid,
});
}
}

Server.init_child_respond = function (retval, msg) {
switch (retval) {
case constants.ok:
Expand Down
Loading

0 comments on commit 0c58308

Please sign in to comment.