Permalink
Browse files

only create queue tables that are necessary, add test

git-svn-id: http://kazuho.31tools.com/svn/incline/trunk@247 4d3e2a30-9d6d-0410-bc8c-dac56cff10b3
  • Loading branch information...
1 parent 40fe76f commit b9334cae2c45dd45c9331bac5bfdd7cdec17fb23 kazuho committed Nov 2, 2009
@@ -9,8 +9,8 @@ class incline_dbms;
class incline_driver_async_qtable : public incline_driver_async {
public:
virtual incline_def* create_def() const;
- std::vector<std::string> create_table_all(bool if_not_exists, incline_dbms* dbh) const;
- std::vector<std::string> drop_table_all(bool if_exists) const;
+ virtual std::vector<std::string> create_table_all(bool if_not_exists, incline_dbms* dbh) const;
+ virtual std::vector<std::string> drop_table_all(bool if_exists) const;
std::string create_table_of(const incline_def* def, bool if_not_exists, incline_dbms* dbh) const;
std::string drop_table_of(const incline_def* def, bool if_exists) const;
virtual void run_forwarder(int poll_interval, int log_fd) const;
@@ -256,6 +256,21 @@ incline_driver_sharded::connect_params::connect()
return incline_dbms::factory_->create(host, port, username, password);
}
+const incline_driver_sharded::connect_params*
+incline_driver_sharded::connect_params::find(const vector<connect_params>& cp,
+ const string& host,
+ unsigned short port)
+{
+ for (vector<connect_params>::const_iterator ci = cp.begin();
+ ci != cp.end();
+ ++ci) {
+ if (ci->host == host && ci->port == port) {
+ return &*ci;
+ }
+ }
+ return NULL;
+}
+
incline_driver_sharded::rule*
incline_driver_sharded::rule::parse(const string& file, string& err)
{
@@ -391,15 +406,12 @@ incline_driver_sharded::init(const string& host, unsigned short port)
for (vector<connect_params>::const_iterator pi = partial.begin();
pi != partial.end();
++pi) {
- for (vector<connect_params>::const_iterator ai = all_cp.begin();
- ai != all_cp.end();
- ++ai) {
- if (pi->host == ai->host && pi->port == ai->port) {
- stringstream ss;
- ss << "collision found for " << pi->host << ':' << pi->port
- << " in files:" << ai->file << " and " << pi->file;
- return ss.str();
- }
+ if (const connect_params* col = connect_params::find(all_cp, pi->host,
+ pi->port)) {
+ stringstream ss;
+ ss << "collision found for " << pi->host << ':' << pi->port
+ << " in files:" << col->file << " and " << pi->file;
+ return ss.str();
}
all_cp.push_back(*pi);
}
@@ -422,19 +434,48 @@ vector<string>
incline_driver_sharded::create_table_all(bool if_not_exists, incline_dbms* dbh)
const
{
- vector<string> r = super::create_table_all(if_not_exists, dbh);
- r.push_back(string("CREATE TABLE ") + (if_not_exists ? "IF NOT EXISTS " : "")
- + "_iq_repl (tbl_name VARCHAR(255) NOT NULL,_iq_id BIGINT NOT NULL,PRIMARY KEY (tbl_name))" + incline_dbms::factory_->create_table_suffix());
+ vector<string> r;
+ bool is_repl_dest = false;
+ for (vector<incline_def*>::const_iterator di = mgr_->defs().begin();
+ di != mgr_->defs().end();
+ ++di) {
+ const incline_def_sharded* def
+ = static_cast<const incline_def_sharded*>(*di);
+ if (is_src_host_of(def)) {
+ r.push_back(create_table_of(def, if_not_exists, dbh));
+ }
+ is_repl_dest = is_repl_dest || is_replicator_dest_host_of(def);
+ }
+ if (is_repl_dest) {
+ r.push_back(string("CREATE TABLE ")
+ + (if_not_exists ? "IF NOT EXISTS " : "")
+ + "_iq_repl (tbl_name VARCHAR(255) NOT NULL,"
+ "last_id BIGINT NOT NULL,PRIMARY KEY (tbl_name))"
+ + incline_dbms::factory_->create_table_suffix());
+ }
return r;
}
vector<string>
incline_driver_sharded::drop_table_all(bool if_exists)
const
{
- vector<string> r = super::drop_table_all(if_exists);
- r.push_back(string("DROP TABLE ") + (if_exists ? "IF EXISTS " : "")
- + "_iq_repl");
+ vector<string> r;
+ bool is_repl_dest = false;
+ for (vector<incline_def*>::const_iterator di = mgr_->defs().begin();
+ di != mgr_->defs().end();
+ ++di) {
+ const incline_def_sharded* def
+ = static_cast<const incline_def_sharded*>(*di);
+ if (is_src_host_of(def)) {
+ r.push_back(drop_table_of(def, if_exists));
+ }
+ is_repl_dest = is_repl_dest || is_replicator_dest_host_of(def);
+ }
+ if (is_repl_dest) {
+ r.push_back(string("DROP TABLE ") + (if_exists ? "IF EXISTS " : "")
+ + "_iq_repl");
+ }
return r;
}
@@ -483,6 +524,35 @@ incline_driver_sharded::rule_of(const string& file) const
return NULL;
}
+bool
+incline_driver_sharded::is_src_host_of(const incline_def_sharded* def) const
+{
+ const rule* rl = rule_of(def->shard_file());
+ assert(rl != NULL);
+ if (const shard_rule* srl = dynamic_cast<const shard_rule*>(rl)) {
+ return
+ connect_params::find(srl->get_all_connect_params(), cur_host_, cur_port_)
+ != NULL;
+ } else if (const replicator_rule* rrl
+ = dynamic_cast<const replicator_rule*>(rl)) {
+ return rrl->source().host == cur_host_ && rrl->source().port == cur_port_;
+ }
+ assert(0);
+}
+
+bool
+incline_driver_sharded::is_replicator_dest_host_of(const incline_def_sharded*
+ def) const
+{
+ const rule* rl = rule_of(def->shard_file());
+ assert(rl != NULL);
+ if (const replicator_rule* rrl = dynamic_cast<const replicator_rule*>(rl)) {
+ return
+ connect_params::find(rrl->destination(), cur_host_, cur_port_) != NULL;
+ }
+ return false;
+}
+
void
incline_driver_sharded::_build_insert_from_def(trigger_body& body,
const incline_def* _def,
@@ -493,7 +563,7 @@ incline_driver_sharded::_build_insert_from_def(trigger_body& body,
const incline_def_sharded* def
= dynamic_cast<const incline_def_sharded*>(_def);
assert(def != NULL);
- if (_is_source_host_of(def)) {
+ if (is_src_host_of(def)) {
super::_build_insert_from_def(body, def, src_table, action, cond);
}
}
@@ -507,7 +577,7 @@ incline_driver_sharded::_build_delete_from_def(trigger_body& body,
const incline_def_sharded* def
= dynamic_cast<const incline_def_sharded*>(_def);
assert(def != NULL);
- if (_is_source_host_of(def)) {
+ if (is_src_host_of(def)) {
super::_build_delete_from_def(body, def, src_table, cond);
}
}
@@ -522,7 +592,7 @@ incline_driver_sharded::_build_update_merge_from_def(trigger_body& body,
const incline_def_sharded* def
= dynamic_cast<const incline_def_sharded*>(_def);
assert(def != NULL);
- if (_is_source_host_of(def)) {
+ if (is_src_host_of(def)) {
super::_build_update_merge_from_def(body, def, src_table, cond);
}
}
@@ -540,25 +610,3 @@ incline_driver_sharded::do_build_direct_expr(const incline_def_async* _def,
assert(srl != NULL);
return srl->build_expr_for(column_expr, cur_host_, cur_port_);
}
-
-bool
-incline_driver_sharded::_is_source_host_of(const incline_def_sharded* def) const
-{
- const rule* rl = rule_of(def->shard_file());
- assert(rl != NULL);
- if (const shard_rule* srl = dynamic_cast<const shard_rule*>(rl)) {
- vector<connect_params> cp = srl->get_all_connect_params();
- for (vector<connect_params>::const_iterator ci = cp.begin();
- ci != cp.end();
- ++ci) {
- if (ci->host == cur_host_ && ci->port == cur_port_) {
- return true;
- }
- }
- return false;
- } else if (const replicator_rule* rrl
- = dynamic_cast<const replicator_rule*>(rl)) {
- return rrl->source().host == cur_host_ && rrl->source().port == cur_port_;
- }
- assert(0);
-}
@@ -19,6 +19,8 @@ class incline_driver_sharded : public incline_driver_async_qtable {
connect_params(const std::string& f) : file(f), host(), port(), username(), password() {}
std::string parse(const picojson::value& def);
incline_dbms* connect();
+ public:
+ static const connect_params* find(const std::vector<connect_params>& cp, const std::string& host, unsigned short port);
};
class rule {
@@ -66,20 +68,21 @@ class incline_driver_sharded : public incline_driver_async_qtable {
virtual ~incline_driver_sharded();
std::string init(const std::string& host, unsigned short port);
virtual incline_def* create_def() const;
- std::vector<std::string> create_table_all(bool if_not_exists, incline_dbms* dbh) const;
- std::vector<std::string> drop_table_all(bool if_exists) const;
+ virtual std::vector<std::string> create_table_all(bool if_not_exists, incline_dbms* dbh) const;
+ virtual std::vector<std::string> drop_table_all(bool if_exists) const;
virtual void run_forwarder(int poll_interval, int log_fd) const;
virtual bool should_exit_loop() const;
const rule* rule_of(const std::string& file) const;
std::pair<std::string, unsigned short> get_hostport() const {
return make_pair(cur_host_, cur_port_);
}
+ bool is_src_host_of(const incline_def_sharded* def) const;
+ bool is_replicator_dest_host_of(const incline_def_sharded* def) const;
protected:
virtual void _build_insert_from_def(trigger_body& body, const incline_def* def, const std::string& src_table, action_t action, const std::vector<std::string>* cond) const;
virtual void _build_delete_from_def(trigger_body& body, const incline_def* def, const std::string& src_table, const std::vector<std::string>& cond) const;
virtual void _build_update_merge_from_def(trigger_body& body, const incline_def* def, const std::string& src_table, const std::vector<std::string>& cond) const;
virtual std::string do_build_direct_expr(const incline_def_async* def, const std::string& column_expr) const;
- bool _is_source_host_of(const incline_def_sharded* def) const;
};
#endif
@@ -58,7 +58,7 @@ incline_fw_replicator::run()
if (dest_dbh == NULL) {
dest_dbh = dest_cp_.connect();
vector<vector<incline_dbms::value_t> > res;
- dbh_->query(res,
+ dest_dbh->query(res,
"SELECT last_id FROM _iq_repl WHERE tbl_name='"
+ dest_dbh->escape(def()->destination()) + '\'');
last_id = res.empty() ? string("0") : *res[0][0];
@@ -0,0 +1,8 @@
+use strict;
+use warnings;
+
+use List::MoreUtils qw(apply);
+
+$ENV{TEST_DBMS} = 'postgresql';
+
+require(apply { s/\-pgsql.t$/\.pl/ } $0);
View
@@ -5,6 +5,7 @@
use DBI;
use InclineTest;
+use Scope::Guard;
use Test::More;
# skip tests if dbms does not exist
@@ -15,7 +16,7 @@
postgresql => {},
);
-plan tests => 11;
+plan tests => 27;
my @incline_cmd = (
qw(src/incline),
@@ -61,24 +62,6 @@
),
'create source table',
);
- ok(
- system(
- @incline_cmd,
- "--host=$db_host",
- "--port=$db_port",
- 'create-queue',
- ) == 0,
- 'create queue',
- );
- ok(
- system(
- @incline_cmd,
- "--host=$db_host",
- "--port=$db_port",
- 'create-trigger',
- ) == 0,
- 'create trigger',
- );
} else {
ok(
$dbh[-1]->do(
@@ -87,6 +70,89 @@
'create dest table',
);
}
+ ok(
+ system(
+ @incline_cmd,
+ "--host=$db_host",
+ "--port=$db_port",
+ 'create-queue',
+ ) == 0,
+ 'create queue',
+ );
+ ok(
+ system(
+ @incline_cmd,
+ "--host=$db_host",
+ "--port=$db_port",
+ 'create-trigger',
+ ) == 0,
+ 'create trigger',
+ );
}
+sub cmpf {
+ my $slave = shift;
+ my $n = $dbh[0]->selectall_arrayref(
+ 'SELECT MAX(_iq_id) FROM _iq_incline_dest',
+ )->[0]->[0];
+ sleep 1 while do {
+ my $r = $dbh[$slave]->selectall_arrayref(
+ q{SELECT last_id FROM _iq_repl WHERE tbl_name='incline_dest'},
+ );
+ $n != (@$r ? $r->[0]->[0] : 0);
+ };
+ return (
+ $dbh[0]->selectall_arrayref('SELECT * FROM incline_src'),
+ $dbh[$slave]->selectall_arrayref('SELECT * FROM incline_dest'),
+ );
+}
+
+sub start_fw {
+ my $fw_pid;
+ unless ($fw_pid = fork()) {
+ my ($db_host, $db_port) = split /:/, $db_nodes[0], 2;
+ exec_cmd(
+ @incline_cmd,
+ "--host=$db_host",
+ "--port=$db_port",
+ 'forward',
+ );
+ die "failed to exec forwarder: $?";
+ }
+ Scope::Guard->new(sub { kill 9, $fw_pid if $fw_pid });
+}
+
+# start forwarder
+my $fw = start_fw();
+
+ok(
+ $dbh[0]->do(q{INSERT into incline_src (message) VALUES ('hello')}),
+ 'insert',
+);
+is_deeply(cmpf($_), "post insertion check (node $_)")
+ for qw/1 2/;
+ok(
+ $dbh[0]->do(q{UPDATE incline_src SET message='ciao'}),
+ 'update',
+);
+is_deeply(cmpf($_), "post update check (node $_)")
+ for qw/1 2/;
+ok(
+ $dbh[0]->do(q{INSERT into incline_src (message) VALUES ('aloha')}),
+ 'insert 2',
+);
+is_deeply(cmpf($_), "post insertion check 2 (node $_)")
+ for qw/1 2/;
+ok(
+ $dbh[0]->do(q{DELETE FROM incline_src WHERE message='ciao'}),
+ 'delete',
+);
+is_deeply(cmpf($_), "post deletion check (node $_)")
+ for qw/1 2/;
+
+# TODO add partial stop tests
+
+undef $fw;
+@dbh = ();
+
1;

0 comments on commit b9334ca

Please sign in to comment.