Permalink
Browse files

gracefully shutdown on SIGTERM / SIGHUP

git-svn-id: http://kazuho.31tools.com/svn/incline/trunk@257 4d3e2a30-9d6d-0410-bc8c-dac56cff10b3
  • Loading branch information...
kazuho
kazuho committed Nov 5, 2009
1 parent 4660891 commit 93b1b1150e13bea5c72b6586d7b8ec709ef4a081
View
@@ -1,5 +1,6 @@
extern "C" {
#include <fcntl.h>
+#include <signal.h>
}
#include <fstream>
#include <iostream>
@@ -67,6 +68,12 @@ static incline_dbms* dbh()
return h.get();
}
+static void shutdown_forwarder(int signum)
+{
+ cerr << "recevied signal:" << signum << ", shutting down..." << endl;
+ aq_driver()->should_exit_loop(true);
+}
+
int
main(int argc, char** argv)
{
@@ -178,6 +185,8 @@ main(int argc, char** argv)
exit(3);
}
}
+ signal(SIGHUP, shutdown_forwarder);
+ signal(SIGTERM, shutdown_forwarder);
aq_driver()->run_forwarder(1, log_fd);
} else {
fprintf(stderr, "unknown command: %s\n", command.c_str());
@@ -66,7 +66,7 @@ incline_driver_async_qtable::drop_table_of(const incline_def* _def,
}
void
-incline_driver_async_qtable::run_forwarder(int poll_interval, int log_fd) const
+incline_driver_async_qtable::run_forwarder(int poll_interval, int log_fd)
{
incline_fw_async_qtable::manager manager(this, poll_interval, log_fd);
vector<pthread_t> threads;
@@ -7,14 +7,19 @@ class incline_def_async_qtable;
class incline_dbms;
class incline_driver_async_qtable : public incline_driver_async {
+ typedef incline_driver_async super;
+protected:
+ bool should_exit_loop_;
public:
+ incline_driver_async_qtable() : super(), should_exit_loop_(false) {}
virtual incline_def* create_def() const;
virtual std::vector<std::string> create_table_all(bool if_not_exists, incline_dbms* dbh) const;
virtual std::vector<std::string> drop_table_all(bool if_exists) const;
std::string create_table_of(const incline_def* def, bool if_not_exists, incline_dbms* dbh) const;
std::string drop_table_of(const incline_def* def, bool if_exists) const;
- virtual void run_forwarder(int poll_interval, int log_fd) const;
- virtual bool should_exit_loop() const { return false; }
+ virtual void run_forwarder(int poll_interval, int log_fd);
+ virtual bool should_exit_loop() const { return should_exit_loop_; }
+ virtual void should_exit_loop(bool f) { should_exit_loop_ = f; }
protected:
std::string _create_table_of(const incline_def_async_qtable* def, const std::string& table_name, bool if_not_exists, incline_dbms* dbh) const;
virtual void do_build_enqueue_insert_sql(trigger_body& body, const incline_def* def, const std::string& src_table, action_t action, const std::vector<std::string>* cond) const;
@@ -480,7 +480,7 @@ incline_driver_sharded::drop_table_all(bool if_exists)
}
void
-incline_driver_sharded::run_forwarder(int poll_interval, int log_fd) const
+incline_driver_sharded::run_forwarder(int poll_interval, int log_fd)
{
incline_fw_sharded::manager shard_mgr(this, poll_interval, log_fd);
incline_fw_replicator::manager repl_mgr(this, poll_interval, log_fd);
@@ -499,14 +499,18 @@ incline_driver_sharded::run_forwarder(int poll_interval, int log_fd) const
bool
incline_driver_sharded::should_exit_loop() const
{
- for (vector<const rule*>::const_iterator ri = rules_.begin();
- ri != rules_.end();
- ++ri) {
- if ((*ri)->should_exit_loop()) {
- return true;
+ if (! super::should_exit_loop()) {
+ for (vector<const rule*>::const_iterator ri = rules_.begin();
+ ri != rules_.end();
+ ++ri) {
+ if ((*ri)->should_exit_loop()) {
+ const_cast<incline_driver_sharded*>(this)
+ ->super::should_exit_loop(true);
+ break;
+ }
}
}
- return false;
+ return super::should_exit_loop();
}
const incline_driver_sharded::rule*
@@ -70,7 +70,7 @@ class incline_driver_sharded : public incline_driver_async_qtable {
virtual incline_def* create_def() const;
virtual std::vector<std::string> create_table_all(bool if_not_exists, incline_dbms* dbh) const;
virtual std::vector<std::string> drop_table_all(bool if_exists) const;
- virtual void run_forwarder(int poll_interval, int log_fd) const;
+ virtual void run_forwarder(int poll_interval, int log_fd);
virtual bool should_exit_loop() const;
const rule* rule_of(const std::string& file) const;
std::pair<std::string, unsigned short> get_hostport() const {
View
@@ -27,15 +27,6 @@ incline_fw::manager::log_sql(const incline_dbms* dbms, const string& sql)
}
}
-bool
-incline_fw::manager::should_exit_loop() const
-{
- if (! should_exit_loop_ && driver_->should_exit_loop()) {
- const_cast<incline_fw::manager*>(this)->should_exit_loop_ = true;
- }
- return should_exit_loop_;
-}
-
incline_fw::incline_fw(manager* mgr, const incline_def_async_qtable* def,
incline_dbms* dbh)
: mgr_(mgr), def_(def), dbh_(dbh),
@@ -80,7 +71,7 @@ incline_fw::run()
} catch (domain_error& e) {
cerr << e.what() << endl;
}
- mgr_->should_exit_loop(true);
+ mgr_->driver()->should_exit_loop(true);
delete this;
return NULL;
}
View
@@ -13,18 +13,16 @@ class incline_fw {
class manager {
protected:
- const incline_driver_async_qtable* driver_;
+ incline_driver_async_qtable* driver_;
int poll_interval_;
int log_fd_;
- bool should_exit_loop_;
public:
- manager(const incline_driver_async_qtable* driver, int poll_interval, int log_fd) : driver_(driver), poll_interval_(poll_interval), log_fd_(log_fd), should_exit_loop_(false) {}
+ manager(incline_driver_async_qtable* driver, int poll_interval, int log_fd) : driver_(driver), poll_interval_(poll_interval), log_fd_(log_fd) {}
virtual ~manager() {}
const incline_driver_async_qtable* driver() const { return driver_; }
+ incline_driver_async_qtable* driver() { return driver_; }
int poll_interval() const { return poll_interval_; }
void log_sql(const incline_dbms* dbh, const std::string& sql);
- bool should_exit_loop() const;
- void should_exit_loop(bool f) { should_exit_loop_ = f; }
};
protected:
@@ -21,7 +21,7 @@ incline_fw_async_qtable::do_run()
{
string extra_cond, last_id;
- while (! mgr_->should_exit_loop()) {
+ while (! mgr_->driver()->should_exit_loop()) {
try {
vector<string> iq_ids;
vector<vector<string> > delete_pks, insert_rows;
@@ -48,7 +48,7 @@ incline_fw_replicator::do_run()
incline_dbms* dest_dbh = NULL;
string last_id;
- while (! mgr()->should_exit_loop()) {
+ while (! mgr()->driver()->should_exit_loop()) {
vector<string> iq_ids;
vector<vector<string> > delete_pks, insert_rows;
@@ -12,8 +12,9 @@ class incline_fw_replicator : public incline_fw {
public:
typedef super::manager super;
public:
- manager(const incline_driver_sharded* driver, int poll_interval, int log_fd) : super(driver, poll_interval, log_fd) {}
+ manager(incline_driver_sharded* driver, int poll_interval, int log_fd) : super(driver, poll_interval, log_fd) {}
const incline_driver_sharded* driver() const { return static_cast<const incline_driver_sharded*>(super::driver()); }
+ incline_driver_sharded* driver() { return static_cast<incline_driver_sharded*>(super::driver()); }
void start(std::vector<pthread_t>& threads);
};
View
@@ -39,9 +39,10 @@ class incline_fw_sharded : public incline_fw_async_qtable {
protected:
std::vector<std::pair<incline_driver_sharded::connect_params, writer*> > writers_;
public:
- manager(const incline_driver_sharded* driver, int poll_interval, int log_fd) : super(driver, poll_interval, log_fd), writers_() {}
+ manager(incline_driver_sharded* driver, int poll_interval, int log_fd) : super(driver, poll_interval, log_fd), writers_() {}
virtual ~manager();
const incline_driver_sharded* driver() const { return static_cast<const incline_driver_sharded*>(super::driver()); }
+ incline_driver_sharded* driver() { return static_cast<incline_driver_sharded*>(super::driver()); }
const incline_driver_sharded::shard_rule* rule_of(const incline_def_sharded* def) const;
const std::vector<std::pair<incline_driver_sharded::connect_params, writer*> >& writers() const { return writers_; }
void start(std::vector<pthread_t>& threads);
@@ -69,7 +69,7 @@ sub dbh_close {
my $fw_pid;
my $_sg = Scope::Guard->new(
sub {
- kill 9, $fw_pid
+ kill 'TERM', $fw_pid
if $fw_pid;
},
);
@@ -59,7 +59,7 @@
my ($fw_script, $fw_pid);
my $_sg = Scope::Guard->new(
sub {
- kill 9, $fw_pid
+ kill 'TERM', $fw_pid
if $fw_pid;
},
);
@@ -73,7 +73,7 @@ sub dbh_close {
my ($fw_script, $fw_pid);
my $_sg = Scope::Guard->new(
sub {
- kill 9, $fw_pid
+ kill 'TERM', $fw_pid
if $fw_pid;
},
);
@@ -165,7 +165,7 @@
my $fw_pid;
my $_sg = Scope::Guard->new(
sub {
- kill 9, $fw_pid;
+ kill 'TERM', $fw_pid;
},
);
unless ($fw_pid = fork()) {
@@ -156,7 +156,7 @@
my $fw_pid;
my $_sg = Scope::Guard->new(
sub {
- kill 9, $fw_pid;
+ kill 'TERM', $fw_pid;
},
);
unless ($fw_pid = fork()) {
@@ -161,7 +161,7 @@
my $fw_pid;
my $_sg = Scope::Guard->new(
sub {
- kill 9, $fw_pid;
+ kill 'TERM', $fw_pid;
},
);
unless ($fw_pid = fork()) {
@@ -157,7 +157,7 @@
my $fw_pid;
my $_sg = Scope::Guard->new(
sub {
- kill 9, $fw_pid;
+ kill 'TERM', $fw_pid;
},
);
unless ($fw_pid = fork()) {
View
@@ -119,7 +119,7 @@ sub start_fw {
);
die "failed to exec forwarder: $?";
}
- Scope::Guard->new(sub { kill 9, $fw_pid if $fw_pid });
+ Scope::Guard->new(sub { kill 'TERM', $fw_pid if $fw_pid });
}
# start forwarder

0 comments on commit 93b1b11

Please sign in to comment.