Permalink
Browse files

Merge branch 'hotfix/bug#1'

  • Loading branch information...
2 parents 668d8fd + c1ad24e commit 43575d7a28619f10bf739c2a60bac90747fa443a @kuenishi kuenishi committed Nov 15, 2011
Showing with 43 additions and 20 deletions.
  1. +9 −0 src/common/membership.cpp
  2. +2 −0 src/common/membership.hpp
  3. +20 −9 src/common/zk.cpp
  4. +6 −5 src/common/zk.hpp
  5. +5 −5 src/server/classifier.cpp
  6. +1 −1 src/server/classifier_serv.cpp
@@ -60,6 +60,11 @@ namespace jubatus{
build_existence_path(path, ip, port, path1);
z.create(path1, "", true);
}
+
+ // set exit zlistener here
+ pfi::lang::function <void()> f = &force_exit;
+ z.push_cleanup(f);
+
return true;
}
@@ -84,4 +89,8 @@ namespace jubatus{
return true;
}
+ void force_exit(){
+ exit(-1);
+ }
+
}
@@ -45,4 +45,6 @@ namespace jubatus{
bool push_cleanup(zk&, pfi::lang::function<void()>&);
+ void force_exit();
+
}
View
@@ -44,7 +44,10 @@ namespace jubatus{
}
do{
- zh_ = zookeeper_init(hosts.c_str(), NULL, timeout, 0, NULL, 0);
+ // FIXME?: this loop will call zookeeper_init many times till
+ // the state got ZOO_CONNECTING_STAT
+ // timeout is supposed to be ms??
+ zh_ = zookeeper_init(hosts.c_str(), NULL, timeout * 1000, 0, NULL, 0);
if(zh_ == NULL){
perror("");
throw std::runtime_error("cannot init zk");
@@ -170,33 +173,41 @@ namespace jubatus{
const std::string& zk::get_hosts()const{ return hosts_; }
- void zkmutex::lock(){
+ bool zkmutex::lock(){
pfi::concurrent::scoped_lock lk(m_);
LOG(ERROR) << "not implemented:" << __func__;
- assert(false);
+ while(!has_lock_){
+ if(try_lock()) break;
+ }
+ return true;
};
bool zkmutex::try_lock(){
pfi::concurrent::scoped_lock lk(m_);
if(has_lock_)return has_lock_;
string prefix = path_ + "/lock_";
zk_->create_seq(prefix, seqfile_);
-
+
+ if(seqfile_ == "") return false;
+
vector<string> list;
zk_->list(path_, list);
+
+ if(list.empty()) return false;
+
has_lock_ = ((path_ + "/" + list[0]) == seqfile_);
if(not has_lock_){
- // DLOG(INFO) << "mine" << seqfile_;
- // DLOG(INFO) << "top " << path_ + "/" + list[0];
zk_->remove(seqfile_);
}
- DLOG(INFO) << "got lock for " << path_ << " (" << seqfile_ << ") "; //"couldn't acquire lock of " << path_;
+ DLOG(INFO) << "got lock for " << path_ << " (" << seqfile_ << ") ";
return has_lock_;
};
- void zkmutex::unlock(){
+ bool zkmutex::unlock(){
pfi::concurrent::scoped_lock lk(m_);
- if(has_lock_)
+ if(has_lock_){
zk_->remove(seqfile_);
+ }
+ return true;
};
void mywatcher(zhandle_t* zh, int type, int state, const char* path, void* p){
View
@@ -34,7 +34,8 @@ namespace jubatus{
// TODO: write zk mock and test them all?
class zk{
public:
- zk(const std::string& hosts, int timeout = 1024, const std::string& logfile = "");
+ // timeout [ms]
+ zk(const std::string& hosts, int timeout = 10, const std::string& logfile = "");
~zk();
void create(const std::string& path, const std::string& payload = "", bool ephemeral = false);
@@ -71,18 +72,18 @@ namespace jubatus{
};
// TODO: write zk mock and test them all?
- class zkmutex{
+ class zkmutex : public pfi::concurrent::lockable {
public:
zkmutex(pfi::lang::shared_ptr<zk,
pfi::concurrent::threading_model::multi_thread>& z, const std::string& path):
zk_(z),path_(path),has_lock_(false){};
- ~zkmutex(){
+ virtual ~zkmutex(){
this->unlock();
}
- void lock();
+ bool lock();
bool try_lock();
- void unlock();
+ bool unlock();
private:
pfi::lang::shared_ptr<zk, pfi::concurrent::threading_model::multi_thread> zk_;
View
@@ -58,7 +58,7 @@ int main(int argc, char *argv[])
cmdline::parser p;
p.add<int>("rpc-port", 'p', "port number", false, 9199);
p.add<int>("thread", 'c', "thread number", false, 2);
- p.add<int>("timeout", 't', "time out", false, 10);
+ p.add<int>("timeout", 't', "time out (sec)", false, 10);
p.add<std::string>("storage", 'S', "storage type", false, "local");
p.add<std::string>("zookeeper", 'z', "zookeeper location", false);
p.add<std::string>("name", 'n', "learning machine instance name", true);
@@ -89,8 +89,8 @@ int main(int argc, char *argv[])
std::string zkcluster = (p.get<std::string>("zookeeper") == "")? "localhost:2181" : p.get<std::string>("zookeeper");
- shared_ptr<jubatus::zk,
- pfi::concurrent::threading_model::multi_thread> z(new jubatus::zk(zkcluster, timeout, logfile.c_str()));
+ shared_ptr<jubatus::zk, pfi::concurrent::threading_model::multi_thread>
+ z(new jubatus::zk(zkcluster, timeout, logfile.c_str()));
st.reset(storage::storage_factory::create_storage(p.get<std::string>("storage")));
vector<string> list;
@@ -112,8 +112,8 @@ int main(int argc, char *argv[])
}
register_actor(*z, name, self, port);
- shared_ptr<mixer,
- pfi::concurrent::threading_model::multi_thread> m(new mixer(z, name, &classifier::server::mix));
+ shared_ptr<mixer, pfi::concurrent::threading_model::multi_thread>
+ m(new mixer(z, name, &classifier::server::mix));
classifier::server s(st, m, p.get<std::string>("tmpdir"));
m->start();
@@ -122,7 +122,7 @@ result<int> server::train(string name, std::vector<std::pair<std::string, datum>
result<std::vector<estimate_results> > server::classify(string name, std::vector<datum> data) {
std::vector<estimate_results> ret;
scoped_lock lk(rlock(m_));
- DLOG(INFO) << __func__;
+
if (!classifier_){
LOG(ERROR) << __func__ << ": config is not set";
return result<std::vector<estimate_results> >::fail("config_not_set");

0 comments on commit 43575d7

Please sign in to comment.