Permalink
Browse files

add replicator (to be tested0

git-svn-id: http://kazuho.31tools.com/svn/incline/trunk@240 4d3e2a30-9d6d-0410-bc8c-dac56cff10b3
  • Loading branch information...
1 parent d0243f5 commit 9270d181cab58a7fb88b2bd3988f5b0aba5d13f2 kazuho committed Nov 1, 2009
View
@@ -7,6 +7,7 @@ incline_SOURCES += incline_def_sharded.cc incline_def_sharded.h incline_driver_s
incline_SOURCES += incline_fw.cc incline_fw.h
incline_SOURCES += incline_fw_async_qtable.cc incline_fw_async_qtable.h
incline_SOURCES += incline_fw_sharded.cc incline_fw_sharded.h
+incline_SOURCES += incline_fw_replicator.cc incline_fw_replicator.h
if WITH_MYSQL
incline_SOURCES += incline_mysql.cc incline_mysql.h
endif
@@ -10,9 +10,6 @@ incline_def_sharded::parse(const picojson::value& def)
return err;
}
// post init
- if (direct_expr_column_.empty()) {
- return "no shard.key defined for table:" + destination();
- }
if (shard_file_.empty()) {
return "no shard.file defined for table:" + destination();
}
@@ -9,6 +9,7 @@ extern "C" {
#include "incline_def_sharded.h"
#include "incline_driver_sharded.h"
#include "incline_fw_sharded.h"
+#include "incline_fw_replicator.h"
#include "incline_mgr.h"
#include "incline_util.h"
@@ -316,15 +317,40 @@ incline_driver_sharded::rule::_get_file_mtime() const
return st.st_mtime;
}
+string
+incline_driver_sharded::replicator_rule::parse(const picojson::value& def)
+{
+ string err;
+ connect_params source(file());
+ if (! (err = src_cp_.parse(def.get("source"))).empty()) {
+ return err;
+ }
+ const picojson::value& dest = def.get("destination");
+ if (! dest.is<picojson::array>()) {
+ return "destination is not of type array";
+ }
+ for (picojson::array::const_iterator di = dest.get<picojson::array>().begin();
+ di != dest.get<picojson::array>().end();
+ ++di) {
+ connect_params cp(file());
+ if (! (err = cp.parse(*di)).empty()) {
+ return err;
+ }
+ dest_cp_.push_back(cp);
+ }
+ return string();
+}
+
void
incline_driver_sharded::run_forwarder(int poll_interval, int log_fd) const
{
incline_fw_sharded::manager shard_mgr(this, poll_interval, log_fd);
+ incline_fw_replicator::manager repl_mgr(this, poll_interval, log_fd);
vector<pthread_t> threads;
- // create shard forwarders and writers
+ // create forwarders and writers
shard_mgr.start(threads);
- // TODO create replicators
+ repl_mgr.start(threads);
// wait until all forwarder threads exit
while (! threads.empty()) {
pthread_join(threads.back(), NULL);
@@ -358,21 +384,43 @@ incline_driver_sharded::init(const string& host, unsigned short port)
rules_.push_back(rl);
}
if (def->direct_expr_column().empty()) {
- if (dynamic_cast<const replication_rule*>(rl) == NULL) {
- return "TODO write error message";
+ if (dynamic_cast<const replicator_rule*>(rl) == NULL) {
+ return "no shard.key defined for table:" + def->destination();
}
} else {
if (dynamic_cast<const shard_rule*>(rl) == NULL) {
- return "TODO write error message";
+ return "shard.key should not be defined fo table:" + def->destination()
+ + " using replicator rule";
}
- // TODO check if given host:port exists in list
}
}
- // build list of all destinations (as well as checking collisions)
- vector<connect_params> all_cp;
- if (! (err = get_all_connect_params(all_cp)).empty()) {
- return err;
+ {
+ // check collision of shard nodes (same instance must not reappear, since
+ // the node will be difficult to divide)
+ vector<connect_params> all_cp;
+ for (vector<const rule*>::const_iterator ri = rules_.begin();
+ ri != rules_.end();
+ ++ri) {
+ if (const shard_rule* srl = dynamic_cast<const shard_rule*>(*ri)) {
+ vector<connect_params> partial = srl->get_all_connect_params();
+ for (vector<connect_params>::const_iterator pi = partial.begin();
+ pi != partial.end();
+ ++pi) {
+ for (vector<connect_params>::const_iterator ai = all_cp.begin();
+ ai != all_cp.end();
+ ++ai) {
+ if (pi->host == ai->host && pi->port == ai->port) {
+ stringstream ss;
+ ss << "collision found for " << pi->host << ':' << pi->port
+ << " in files:" << ai->file << " and " << pi->file;
+ return ss.str();
+ }
+ }
+ all_cp.push_back(*pi);
+ }
+ }
+ }
}
cur_host_ = host;
@@ -386,34 +434,6 @@ incline_driver_sharded::create_def() const
return new incline_def_sharded();
}
-string
-incline_driver_sharded::get_all_connect_params(vector<connect_params>& all_cp) const
-{
- for (vector<const rule*>::const_iterator ri = rules_.begin();
- ri != rules_.end();
- ++ri) {
- vector<connect_params> partial = (*ri)->get_all_connect_params();
- for (vector<connect_params>::const_iterator pi = partial.begin();
- pi != partial.end();
- ++pi) {
- for (vector<connect_params>::const_iterator ai = all_cp.begin();
- ai != all_cp.end();
- ++ai) {
- if (pi->host == ai->host && pi->port == ai->port) {
- // same instance must not reappear (since it will be difficult to
- // divide)
- stringstream ss;
- ss << "collision found for " << pi->host << ':' << pi->port
- << " in files:" << ai->file << " and " << pi->file;
- return ss.str();
- }
- }
- all_cp.push_back(*pi);
- }
- }
- return string();
-}
-
bool
incline_driver_sharded::should_exit_loop() const
{
@@ -495,27 +515,29 @@ incline_driver_sharded::do_build_direct_expr(const incline_def_async* _def,
assert(def != NULL);
const rule* rl = rule_of(def->shard_file());
assert(rl != NULL);
- return rl->build_expr_for(column_expr, cur_host_, cur_port_);
+ const shard_rule* srl = dynamic_cast<const shard_rule*>(rl);
+ assert(srl != NULL);
+ return srl->build_expr_for(column_expr, cur_host_, cur_port_);
}
bool
incline_driver_sharded::_is_source_host_of(const incline_def_sharded* def) const
{
- const rule* rule = rule_of(def->shard_file());
- assert(rule != NULL);
- vector<connect_params> cp = rule->get_all_connect_params();
- for (vector<connect_params>::const_iterator ci = cp.begin();
- ci != cp.end();
- ++ci) {
- if (dynamic_cast<const shard_rule*>(rule) != NULL) {
+ const rule* rl = rule_of(def->shard_file());
+ assert(rl != NULL);
+ if (const shard_rule* srl = dynamic_cast<const shard_rule*>(rl)) {
+ vector<connect_params> cp = srl->get_all_connect_params();
+ for (vector<connect_params>::const_iterator ci = cp.begin();
+ ci != cp.end();
+ ++ci) {
if (ci->host == cur_host_ && ci->port == cur_port_) {
return true;
}
- } else if (dynamic_cast<const replication_rule*>(rule) != NULL) {
- // TODO check if I am the source host
- } else {
- assert(0);
}
+ return false;
+ } else if (const replicator_rule* rrl
+ = dynamic_cast<const replicator_rule*>(rl)) {
+ return rrl->source().host == cur_host_ && rrl->source().port == cur_port_;
}
- return false;
+ assert(0);
}
@@ -30,9 +30,6 @@ class incline_driver_sharded : public incline_driver_async_qtable {
virtual ~rule() {}
std::string file() const { return file_; }
bool should_exit_loop() const;
- virtual std::vector<connect_params> get_all_connect_params() const = 0;
- virtual connect_params get_connect_params_for(const std::string& key) const = 0;
- virtual std::string build_expr_for(const std::string& column_expr, const std::string& host, unsigned short port) const = 0;
protected:
virtual std::string parse(const picojson::value& def) = 0;
time_t _get_file_mtime() const;
@@ -43,11 +40,21 @@ class incline_driver_sharded : public incline_driver_async_qtable {
class shard_rule : public rule {
public:
shard_rule(const std::string& file) : rule(file) {}
+ virtual std::vector<connect_params> get_all_connect_params() const = 0;
+ virtual connect_params get_connect_params_for(const std::string& key) const = 0;
+ virtual std::string build_expr_for(const std::string& column_expr, const std::string& host, unsigned short port) const = 0;
};
- class replication_rule : public rule {
+ class replicator_rule : public rule {
+ protected:
+ connect_params src_cp_;
+ std::vector<connect_params> dest_cp_;
public:
- replication_rule(const std::string& file) : rule(file) {}
+ replicator_rule(const std::string& file) : rule(file), src_cp_(file), dest_cp_() {}
+ const connect_params& source() const { return src_cp_; }
+ const std::vector<connect_params>& destination() const { return dest_cp_; }
+ protected:
+ virtual std::string parse(const picojson::value& def);
};
protected:
@@ -60,7 +67,6 @@ class incline_driver_sharded : public incline_driver_async_qtable {
std::string init(const std::string& host, unsigned short port);
virtual incline_def* create_def() const;
virtual void run_forwarder(int poll_interval, int log_fd) const;
- std::string get_all_connect_params(std::vector<connect_params>& all_cp) const;
virtual bool should_exit_loop() const;
const rule* rule_of(const std::string& file) const;
std::pair<std::string, unsigned short> get_hostport() const {
@@ -0,0 +1,112 @@
+#include "incline_dbms.h"
+#include "incline_def_sharded.h"
+#include "incline_driver_sharded.h"
+#include "incline_fw_replicator.h"
+#include "incline_mgr.h"
+#include "start_thread.h"
+
+using namespace std;
+
+void
+incline_fw_replicator::manager::start(vector<pthread_t>& threads)
+{
+ const vector<incline_def*>& defs = driver()->mgr()->defs();
+ pair<string, unsigned short> cur_hostport(driver()->get_hostport());
+
+ for (vector<incline_def*>::const_iterator di = defs.begin();
+ di != defs.end();
+ ++di) {
+ const incline_def_sharded* def
+ = static_cast<const incline_def_sharded*>(*di);
+ const incline_driver_sharded::rule* rl
+ = driver()->rule_of(def->shard_file());
+ assert(rl != NULL);
+ const incline_driver_sharded::replicator_rule* rrl
+ = dynamic_cast<const incline_driver_sharded::replicator_rule*>(rl);
+ if (rrl != NULL) {
+ if (rrl->source().host == cur_hostport.first
+ && rrl->source().port == cur_hostport.second) {
+ const vector<incline_driver_sharded::connect_params>& dest
+ = rrl->destination();
+ for (vector<incline_driver_sharded::connect_params>::const_iterator ddi
+ = dest.begin();
+ ddi != dest.end();
+ ++ddi) {
+ incline_dbms* dbh = incline_dbms::factory_->create();
+ assert(dbh != NULL);
+ threads.push_back(start_thread(new incline_fw_replicator(this, def,
+ dbh, *ddi)));
+ }
+ }
+ }
+ }
+}
+
+void*
+incline_fw_replicator::run()
+{
+ incline_dbms* dest_dbh = NULL;
+ string last_id;
+
+ while (! mgr()->driver()->should_exit_loop()) {
+
+ vector<string> iq_ids;
+ vector<vector<string> > delete_pks, insert_rows;
+
+ // setup
+ try {
+ if (dest_dbh == NULL) {
+ dest_dbh = dest_cp_.connect();
+ vector<vector<incline_dbms::value_t> > res;
+ dbh_->query(res,
+ "SELECT last_id FROM _iq_repl WHERE tbl_name='"
+ + dest_dbh->escape(def()->destination()) + '\'');
+ last_id = res.empty() ? string("0") : *res[0][0];
+ }
+ } catch (incline_dbms::error_t& err) {
+ cerr << err.what() << endl;
+ goto ON_DEST_ERROR;
+ }
+
+ // fetch rows
+ fetch_rows("_iq_id>" + last_id, iq_ids, delete_pks, insert_rows);
+
+ // sleep and retry if no data
+ if (iq_ids.empty()) {
+ sleep(mgr()->poll_interval());
+ goto ON_NEXT;
+ }
+ last_id = iq_ids.back();
+
+ // write to target database
+ try {
+ dest_dbh->execute("BEGIN");
+ if (! delete_pks.empty()) {
+ this->delete_rows(dest_dbh, delete_pks);
+ }
+ if (! insert_rows.empty()) {
+ this->insert_rows(dest_dbh, insert_rows);
+ }
+ string sql("INSERT INTO _iq_repl (tbl_name,last_id) VALUES ('" + def()->destination() + "'," + last_id + ") ON DUPLICATE KEY UPDATE last_id=VALUES(last_id)");
+ mgr_->log_sql(dest_dbh, sql);
+ dest_dbh->execute(sql);
+ dest_dbh->execute("COMMIT");
+ } catch (incline_dbms::error_t& err) {
+ cerr << err.what() << endl;
+ goto ON_DEST_ERROR;
+ }
+ // done
+ goto ON_NEXT;
+
+ ON_DEST_ERROR:
+ delete dest_dbh;
+ dest_dbh = NULL;
+ sleep(mgr()->poll_interval());
+
+ ON_NEXT:
+ ;
+ }
+
+ delete dest_dbh;
+ return NULL;
+}
@@ -0,0 +1,30 @@
+#ifndef incline_fw_replicator_h
+#define incline_fw_replicator_h
+
+#include "incline_driver_sharded.h"
+#include "incline_fw.h"
+
+class incline_fw_replicator : public incline_fw {
+public:
+ typedef incline_fw super;
+
+ class manager : public super::manager {
+ public:
+ typedef super::manager super;
+ public:
+ manager(const incline_driver_sharded* driver, int poll_interval, int log_fd) : super(driver, poll_interval, log_fd) {}
+ const incline_driver_sharded* driver() const { return static_cast<const incline_driver_sharded*>(super::driver()); }
+ void start(std::vector<pthread_t>& threads);
+ };
+
+protected:
+ incline_driver_sharded::connect_params dest_cp_;
+public:
+ incline_fw_replicator(manager* mgr, const incline_def_sharded* def, incline_dbms* dbh, const incline_driver_sharded::connect_params& dest_cp) : super(mgr, def, dbh), dest_cp_(dest_cp) {}
+ const manager* mgr() const { return static_cast<const manager*>(super::mgr()); }
+ manager* mgr() { return static_cast<manager*>(super::mgr()); }
+ const incline_def_sharded* def() const { return static_cast<const incline_def_sharded*>(super::def()); }
+ virtual void* run();
+};
+
+#endif
Oops, something went wrong.

0 comments on commit 9270d18

Please sign in to comment.