Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Merge branch 'release/0.3.3'

  • Loading branch information...
commit 22ce13ec7a523c65897266b09a51ef9807ac9a9c 2 parents f3a80c8 + 6388a1e
@rimms rimms authored
Showing with 4,518 additions and 2,621 deletions.
  1. +4 −1 Makefile
  2. +19 −0 README.rst
  3. +2 −2 configure
  4. +125 −112 src/common/cached_zk.cpp
  5. +23 −22 src/common/cached_zk.hpp
  6. +35 −30 src/common/cached_zk_test.cpp
  7. +108 −97 src/common/cht.cpp
  8. +44 −43 src/common/cht.hpp
  9. +2 −2 src/common/cht_test.cpp
  10. +16 −5 src/common/global_id_generator.cpp
  11. +6 −5 src/common/global_id_generator.hpp
  12. +6 −5 src/common/hash.hpp
  13. +25 −18 src/common/lock_service.cpp
  14. +60 −60 src/common/lock_service.hpp
  15. +108 −98 src/common/membership.cpp
  16. +28 −27 src/common/membership.hpp
  17. +77 −0 src/common/mprpc/rpc_client.cpp
  18. +21 −1 src/common/mprpc/rpc_client.hpp
  19. +3 −2 src/common/mprpc/rpc_client_test.cpp
  20. +2 −2 src/common/mprpc/wscript
  21. +1 −1  src/common/rpc_util.hpp
  22. +67 −50 src/common/util.cpp
  23. +10 −3 src/common/util.hpp
  24. +14 −0 src/common/util_test.cpp
  25. +16 −2 src/common/wscript
  26. +248 −219 src/common/zk.cpp
  27. +72 −70 src/common/zk.hpp
  28. +11 −10 src/common/zk_test.cpp
  29. +2 −1  src/framework.hpp
  30. +0 −409 src/framework/jubatus_serv.cpp
  31. +0 −111 src/framework/jubatus_serv.hpp
  32. +6 −2 src/framework/keeper.cpp
  33. +18 −18 src/framework/keeper.hpp
  34. +34 −65 src/framework/mixable.hpp
  35. +89 −0 src/framework/mixable_test.cpp
  36. +0 −99 src/framework/mixer.cpp
  37. +0 −87 src/framework/mixer.hpp
  38. +51 −0 src/framework/mixer/dummy_mixer.hpp
  39. +257 −0 src/framework/mixer/linear_mixer.cpp
  40. +97 −0 src/framework/mixer/linear_mixer.hpp
  41. +86 −0 src/framework/mixer/linear_mixer_test.cpp
  42. +48 −0 src/framework/mixer/mixer.hpp
  43. +43 −0 src/framework/mixer/mixer_factory.cpp
  44. +34 −0 src/framework/mixer/mixer_factory.hpp
  45. +31 −0 src/framework/mixer/wscript
  46. +96 −0 src/framework/server_base.cpp
  47. +70 −0 src/framework/server_base.hpp
  48. +78 −0 src/framework/server_helper.cpp
  49. +145 −0 src/framework/server_helper.hpp
  50. +19 −6 src/framework/server_util.cpp
  51. +5 −22 src/framework/server_util.hpp
  52. +14 −0 src/framework/server_util_test.cpp
  53. +27 −7 src/framework/wscript
  54. +1 −1  src/fv_converter/character_ngram.cpp
  55. +1 −1  src/fv_converter/character_ngram.hpp
  56. +10 −0 src/fv_converter/converter_config.cpp
  57. +5 −1 src/fv_converter/converter_config.hpp
  58. +27 −0 src/fv_converter/converter_config_test.cpp
  59. +52 −6 src/fv_converter/datum_to_fv_converter.cpp
  60. +9 −1 src/fv_converter/datum_to_fv_converter.hpp
  61. +37 −38 src/fv_converter/datum_to_fv_converter_test.cpp
  62. +12 −1 src/fv_converter/dynamic_loader.cpp
  63. +1 −1  src/fv_converter/dynamic_splitter.cpp
  64. +1 −1  src/fv_converter/dynamic_splitter.hpp
  65. +47 −0 src/fv_converter/feature_hasher.cpp
  66. +37 −0 src/fv_converter/feature_hasher.hpp
  67. +48 −0 src/fv_converter/feature_hasher_test.cpp
  68. +0 −3  src/fv_converter/re2_filter.hpp
  69. +1 −1  src/fv_converter/space_splitter.cpp
  70. +1 −1  src/fv_converter/space_splitter.hpp
  71. +1 −1  src/fv_converter/test_splitter.cpp
  72. +1 −1  src/fv_converter/weight_manager.cpp
  73. +2 −2 src/fv_converter/weight_manager_test.cpp
  74. +1 −1  src/fv_converter/without_split.cpp
  75. +1 −1  src/fv_converter/without_split.hpp
  76. +1 −1  src/fv_converter/word_splitter.hpp
  77. +3 −1 src/fv_converter/wscript
  78. +21 −8 src/plugin/fv_converter/mecab_splitter.cpp
  79. +2 −2 src/plugin/fv_converter/mecab_splitter.hpp
  80. +31 −0 src/plugin/fv_converter/mecab_splitter_test.cpp
  81. +1 −1  src/plugin/fv_converter/re2_splitter.cpp
  82. +1 −1  src/plugin/fv_converter/re2_splitter.hpp
  83. +1 −1  src/plugin/fv_converter/ux_splitter.cpp
  84. +1 −1  src/plugin/fv_converter/ux_splitter.hpp
  85. +13 −1 src/plugin/fv_converter/wscript
  86. +1 −0  src/recommender/inverted_index.cpp
  87. +1 −0  src/recommender/lsh.cpp
  88. +1 −1  src/recommender/minhash.cpp
  89. +10 −0 src/server/classifier_client.hpp
  90. +9 −9 src/server/classifier_impl.cpp
  91. +15 −9 src/server/classifier_keeper.cpp
  92. +48 −104 src/server/classifier_serv.cpp
  93. +25 −55 src/server/classifier_serv.hpp
  94. +0 −7 src/server/diffv.hpp
  95. +10 −0 src/server/graph_client.hpp
  96. +24 −24 src/server/graph_impl.cpp
  97. +27 −21 src/server/graph_keeper.cpp
  98. +158 −136 src/server/graph_serv.cpp
  99. +36 −32 src/server/graph_serv.hpp
  100. +59 −0 src/server/linear_function_mixer.cpp
  101. +24 −0 src/server/linear_function_mixer.hpp
  102. +96 −0 src/server/linear_function_mixer_test.cpp
  103. +32 −0 src/server/mixable_weight_manager.cpp
  104. +16 −32 src/server/mixable_weight_manager.hpp
  105. +10 −0 src/server/recommender_client.hpp
  106. +18 −18 src/server/recommender_impl.cpp
  107. +24 −18 src/server/recommender_keeper.cpp
  108. +51 −63 src/server/recommender_serv.cpp
  109. +32 −34 src/server/recommender_serv.hpp
  110. +50 −0 src/server/recommender_serv_test.cpp
  111. +22 −1 src/server/recommender_test.cpp
  112. +10 −0 src/server/regression_client.hpp
  113. +9 −9 src/server/regression_impl.cpp
  114. +15 −9 src/server/regression_keeper.cpp
  115. +47 −88 src/server/regression_serv.cpp
  116. +28 −58 src/server/regression_serv.hpp
  117. +10 −0 src/server/stat_client.hpp
  118. +14 −14 src/server/stat_impl.cpp
  119. +20 −14 src/server/stat_keeper.cpp
  120. +32 −13 src/server/stat_serv.cpp
  121. +30 −33 src/server/stat_serv.hpp
  122. +21 −1 src/server/test_util.hpp
  123. +32 −10 src/server/wscript
  124. +1 −1  src/stat/mixable_stat.cpp
  125. +1 −0  src/storage/storage_type.hpp
  126. +28 −0 src/storage/storage_type_test.cpp
  127. +1 −0  src/storage/wscript
  128. +14 −5 tools/generator/impl_generator.ml
  129. +15 −7 tools/generator/keeper_generator.ml
  130. +1 −1  tools/generator/server_generator.ml
  131. +4 −0 tools/packaging/allinone/.gitignore
  132. +445 −0 tools/packaging/allinone/jubapkg
  133. +20 −0 tools/packaging/allinone/jubapkg_version
  134. +1 −0  tools/packaging/allinone/metadata/debian/compat
  135. +15 −0 tools/packaging/allinone/metadata/debian/control
  136. +4 −0 tools/packaging/allinone/metadata/debian/jubatus.lintian-overrides
  137. +22 −0 tools/packaging/allinone/metadata/debian/rules
  138. +75 −0 tools/packaging/homebrew/jubatus.rb
  139. +25 −0 tools/packaging/homebrew/pficommon.rb
  140. BIN  unittest_gtest.py
  141. +5 −1 wscript
View
5 Makefile
@@ -1,4 +1,4 @@
-.PHONY: all
+.PHONY: all clean build check install
all:
./waf
@@ -11,3 +11,6 @@ build:
check:
./waf --checkall
+
+install:
+ ./waf install
View
19 README.rst
@@ -31,6 +31,25 @@ LGPL 2.1
Update history
--------------
+Release 0.3.3 2012/10/29
+~~~~~~~~~~~~~~~~~~~~~~~~
+
+Improvements
+ - deb package (#14)
+ - Jubatus loads plugin from default directory (#57)
+ - Add hash_max_size option to learn in fixed-size memory (#67)
+ - OS X Homebrew packaging (#116)
+ - GCC compilation version <= 4.2 when zookeeper enabled (#60)
+ - Experimental support for Clang compilation (#100)
+ - Make the timeout smaller in unittest
+ - libmecab_splitter works well in multi-thread environment, and now only support mecab ver. 0.99 or later
+ - word_splitter::split method is now constant
+ - global_id_generator(standalone-mode) for graph, that supports 32 bit environment (#135)
+ - Use (document_frequency + 1) to calculate IDF weight to avoid inifinity
+
+Bugfix
+ - #94, #104, #106, #108, #110, #111, #113, #114, #117, #118, #124, #126, #129, #133, #138, #139, #146, #148
+
Release 0.3.2 2012/9/21
~~~~~~~~~~~~~~~~~~~~~~~
View
4 configure
@@ -1,3 +1,3 @@
-#! /bin/sh
+#!/bin/sh
-./waf configure $*
+./waf configure "$@"
View
237 src/common/cached_zk.cpp
@@ -22,132 +22,145 @@ using pfi::concurrent::scoped_lock;
using std::vector;
using std::string;
-namespace jubatus{
-namespace common{
-
- cached_zk::cached_zk(const std::string& hosts, int timeout, const std::string& logfile):
- zk(hosts, timeout, logfile)
- {
- };
-
- cached_zk::~cached_zk(){
- };
-
- void cached_zk::list(const std::string& path, std::vector<std::string>& out){
- out.clear();
- scoped_lock lk(m_);
- std::map<std::string, std::set<std::string> >::const_iterator it = list_cache_.find(path);
- if(it == list_cache_.end()){
- //first time. get list to create cache and set the watcher at the same time.
- {DLOG(INFO) << "creating cache: " << path; }
- std::set<std::string> tmp_list;
- list_(path, tmp_list);
- list_cache_[path] = tmp_list;
-
- for(std::set<std::string>::const_iterator i=tmp_list.begin();
- i!=tmp_list.end(); ++i){
- out.push_back(*i);
- }
-
- }else{
- for(std::set<std::string>::const_iterator i=it->second.begin();
- i!=it->second.end(); ++i){
- out.push_back(*i);
- }
+namespace jubatus {
+namespace common {
+
+cached_zk::cached_zk(const string& hosts, int timeout, const string& logfile):
+ zk(hosts, timeout, logfile)
+{
+}
+
+cached_zk::~cached_zk()
+{
+}
+
+void cached_zk::list(const string& path, vector<string>& out)
+{
+ out.clear();
+ scoped_lock lk(m_);
+ std::map<string, std::set<string> >::const_iterator it = list_cache_.find(path);
+ if (it == list_cache_.end()) {
+ //first time. get list to create cache and set the watcher at the same time.
+ { DLOG(INFO) << "creating cache: " << path; }
+ std::set<string> tmp_list;
+ list_(path, tmp_list);
+ list_cache_[path] = tmp_list;
+
+ for (std::set<string>::const_iterator i=tmp_list.begin();
+ i!=tmp_list.end(); ++i) {
+ out.push_back(*i);
}
- }
- void cached_zk::list_(const std::string& path, std::set<std::string>& out){
- out.clear();
- struct String_vector s;
- int rc = zoo_wget_children(zh_, path.c_str(), cached_zk::update_cache, this, &s);
-
- if(rc == ZOK){
- std::set<std::string> cache;
- for(int i=0; i<s.count; ++i){
- out.insert(s.data[i]);
- }
- }else{
- LOG(ERROR) << zerror(rc) << " (" << path << ")";
+ } else {
+ for (std::set<string>::const_iterator i=it->second.begin();
+ i!=it->second.end(); ++i) {
+ out.push_back(*i);
}
}
-
- void cached_zk::hd_list(const std::string& path, std::string& out){
- out.clear();
- scoped_lock lk(m_);
- const std::set<std::string>& list(list_cache_[path]);
- if(!list.empty())
- out = *(list.begin());
- };
- const std::string cached_zk::type()const{
- return "cached_zk";
- }
+}
- void cached_zk::reload_cache(const std::string& path){
- scoped_lock lk(m_);
- list_(path, list_cache_[path]);
- }
- void cached_zk::clear_cache(const char* path){
- scoped_lock lk(m_);
- if(path == NULL){
- list_cache_.clear();
- znode_cache_.clear();
- }else{
- list_cache_.erase(std::string(path));
- znode_cache_.erase(std::string(path));
+void cached_zk::list_(const string& path, std::set<std::string>& out)
+{
+ out.clear();
+ struct String_vector s;
+ int rc = zoo_wget_children(zh_, path.c_str(), cached_zk::update_cache, this, &s);
+
+ if (rc == ZOK) {
+ std::set<string> cache;
+ for (int i=0; i<s.count; ++i) {
+ out.insert(s.data[i]);
}
+ } else {
+ LOG(ERROR) << zerror(rc) << " (" << path << ")";
}
+}
- void cached_zk::update_cache(zhandle_t* zh, int type, int state,
- const char* path, void* ctx){
- cached_zk* zk = reinterpret_cast<cached_zk*>(ctx);
-
- if( type == ZOO_CHILD_EVENT ){
- // update cache
-
- { LOG(INFO) << "ZOO_CHILD_EVENT: " << path; }
- zk->reload_cache(path);
-
- }else if( type == ZOO_DELETED_EVENT || // clear one cache
- type == ZOO_NOTWATCHING_EVENT || // clear one cache?
- type == ZOO_SESSION_EVENT ){ // path == NULL, clear all cache
- // clear cache
- { LOG(INFO) << "ZOO_(DELETED|NOTWATCHING|SESSION)_EVENT: " << path; }
- zk->clear_cache(path);
- }
- // ZOO_CHANGED_EVENT => ignore (FIXME, when read() cache going to be modified this needs fix)
- // ZOO_CREATED_EVENT => ignore
+void cached_zk::hd_list(const string& path, string& out)
+{
+ out.clear();
+ scoped_lock lk(m_);
+ const std::set<string>& list(list_cache_[path]);
+ if (!list.empty())
+ out = *(list.begin());
+}
+
+const string cached_zk::type() const
+{
+ return "cached_zk";
+}
+
+void cached_zk::reload_cache(const string& path)
+{
+ scoped_lock lk(m_);
+ list_(path, list_cache_[path]);
+}
+
+void cached_zk::clear_cache(const char* path)
+{
+ scoped_lock lk(m_);
+ if (path == NULL) {
+ list_cache_.clear();
+ znode_cache_.clear();
+ } else {
+ list_cache_.erase(string(path));
+ znode_cache_.erase(string(path));
}
+}
+void cached_zk::update_cache(zhandle_t* zh, int type, int state,
+ const char* path, void* ctx)
+{
+ cached_zk* zk = static_cast<cached_zk*>(ctx);
- bool cached_zk::read(const std::string& path, std::string& out){
- scoped_lock lk(m_);
- std::map<std::string, std::string>::const_iterator it = znode_cache_.find(path);
+ if (type == ZOO_CHILD_EVENT) {
+ // update cache
- if(it == znode_cache_.end()){
- {DLOG(INFO) << "creating cache: " << path; }
+ { LOG(INFO) << "ZOO_CHILD_EVENT: " << path; }
+ zk->reload_cache(path);
- if(read_(path, out))
- znode_cache_[path] = out;
+ } else if (type == ZOO_DELETED_EVENT || // clear one cache
+ type == ZOO_NOTWATCHING_EVENT || // clear one cache?
+ type == ZOO_SESSION_EVENT) { // path == NULL, clear all cache
+ // clear cache
+ { LOG(INFO) << "ZOO_(DELETED|NOTWATCHING|SESSION)_EVENT: " << path; }
+ zk->clear_cache(path);
+ }
+ // ZOO_CHANGED_EVENT => ignore (FIXME, when read() cache going to be modified this needs fix)
+ // ZOO_CREATED_EVENT => ignore
+}
- }else{
- out = it->second;
- }
- return true;
+
+bool cached_zk::read(const string& path, string& out)
+{
+ scoped_lock lk(m_);
+ std::map<string, string>::const_iterator it = znode_cache_.find(path);
+
+ if (it == znode_cache_.end()) {
+ { DLOG(INFO) << "creating cache: " << path; }
+
+ if (read_(path, out))
+ znode_cache_[path] = out;
+
+ } else {
+ out = it->second;
}
- bool zk::read_(const std::string& path, std::string& out){
- char buf[1024];
- int buflen = 1024;
- int rc = zoo_wget(zh_, path.c_str(), cached_zk::update_cache, this, buf, &buflen, NULL);
-
- if(rc == ZOK){
- out = string(buf, buflen);
- return buflen <= 1024;
- }else{
- LOG(ERROR) << zerror(rc);
- return false;
- }
- };
-
+ return true;
}
+
+bool zk::read_(const string& path, string& out)
+{
+ char buf[1024];
+ int buflen = 1024;
+ int rc = zoo_wget(zh_, path.c_str(), cached_zk::update_cache, this, buf, &buflen, NULL);
+
+ if (rc == ZOK) {
+ out = string(buf, buflen);
+ return buflen <= 1024;
+ } else {
+ LOG(ERROR) << zerror(rc);
+ return false;
+ }
}
+
+} // common
+} // jubatus
View
45 src/common/cached_zk.hpp
@@ -21,34 +21,35 @@
#include <set>
#include "zk.hpp"
-namespace jubatus{
+namespace jubatus {
namespace common {
// TODO: write zk mock and test them all?
- class cached_zk : zk {
- public:
- // timeout [ms]
- cached_zk(const std::string& hosts, int timeout = 10, const std::string& logfile = "");
- virtual ~cached_zk();
+class cached_zk : public zk {
+public:
+ // timeout [ms]
+ cached_zk(const std::string& hosts, int timeout = 10, const std::string& logfile = "");
+ virtual ~cached_zk();
- void list(const std::string& path, std::vector<std::string>& out);
- void hd_list(const std::string& path, std::string& out);
+ void list(const std::string& path, std::vector<std::string>& out);
+ void hd_list(const std::string& path, std::string& out);
- // reads data (should be smaller than 1024B)
- bool read(const std::string& path, std::string& out);
+ // reads data (should be smaller than 1024B)
+ bool read(const std::string& path, std::string& out);
- const std::string type() const;
+ const std::string type() const;
- void check_and_update(const std::string& path);
- void clear_cache(const char* path);
- static void update_cache(zhandle_t*, int, int, const char*, void*);
- void reload_cache(const std::string& path);
+ void check_and_update(const std::string& path);
+ void clear_cache(const char* path);
+ static void update_cache(zhandle_t*, int, int, const char*, void*);
+ void reload_cache(const std::string& path);
- private:
- void list_(const std::string& path, std::set<std::string>& out);
- std::map<std::string, std::set<std::string> > list_cache_;
- std::map<std::string, std::string> znode_cache_;
+private:
+ void list_(const std::string& path, std::set<std::string>& out);
+ std::map<std::string, std::set<std::string> > list_cache_;
+ std::map<std::string, std::string> znode_cache_;
- };
-}
-}
+};
+
+} // common
+} // jubatus
View
65 src/common/cached_zk_test.cpp
@@ -26,36 +26,41 @@ using namespace jubatus::common;
namespace {
- std::string path, path1;
- std::string name_, name1_;
-
- class czk_test : public ::testing::Test {
- protected:
- pfi::lang::shared_ptr<jubatus::common::lock_service> zk_;
-
- czk_test(){
- zk_ = pfi::lang::shared_ptr<jubatus::common::lock_service>
- (common::create_lock_service("zk", "localhost:2181", 1024, "test.log"));
-
- name_ = build_loc_str("localhost", 10000);
- build_actor_path(path, name_);
- name1_ = build_loc_str("localhost", 10001);
- build_actor_path(path1, name1_);
-
- zk_->create(JUBATUS_BASE_PATH, "");
- zk_->create(ACTOR_BASE_PATH, "");
-
- zk_->create(path, "hoge0", true);
- zk_->create(path1, "hoge1", true);
- };
-
- virtual ~czk_test(){
- zk_->remove(path);
- zk_->remove(path1);
- };
- virtual void restart_process(){
- };
- };
+std::string path, path1;
+std::string name_, name1_;
+
+class czk_test : public ::testing::Test {
+protected:
+ pfi::lang::shared_ptr<jubatus::common::lock_service> zk_;
+
+ czk_test() {
+ zk_ = pfi::lang::shared_ptr<jubatus::common::lock_service>
+ (common::create_lock_service("zk", "localhost:2181", 1024, "test.log"));
+
+ std::string engine_name, engine_root;
+ engine_name = "test";
+ engine_root = ACTOR_BASE_PATH + "/" + engine_name;
+
+ name_ = build_loc_str("localhost", 10000);
+ build_actor_path(path, engine_name, name_);
+ name1_ = build_loc_str("localhost", 10001);
+ build_actor_path(path1, engine_name, name1_);
+
+ zk_->create(JUBATUS_BASE_PATH, "");
+ zk_->create(ACTOR_BASE_PATH, "");
+ zk_->create(engine_root, "");
+
+ zk_->create(path, "hoge0", true);
+ zk_->create(path1, "hoge1", true);
+ }
+
+ virtual ~czk_test() {
+ zk_->remove(path);
+ zk_->remove(path1);
+ }
+ virtual void restart_process() {
+ }
+};
TEST(czk, cached_zk_trivial) {
pfi::lang::shared_ptr<jubatus::common::lock_service> czk_;
View
205 src/common/cht.cpp
@@ -24,119 +24,130 @@
#include <pficommon/data/digest/md5.h>
#include <sstream>
-namespace jubatus{
-namespace common{
-
- std::string make_hash(const std::string& key){
- std::stringstream ss;
- ss << pfi::data::digest::md5sum(key);
- return ss.str();
- };
-
- cht::cht(cshared_ptr<lock_service> z, const std::string& type, const std::string& name):
- type_(type), name_(name), lock_service_(z)
- {
- }
+namespace jubatus {
+namespace common {
+
+std::string make_hash(const std::string& key)
+{
+ std::stringstream ss;
+ ss << pfi::data::digest::md5sum(key);
+ return ss.str();
+}
- cht::~cht(){}
+cht::cht(cshared_ptr<lock_service> z, const std::string& type, const std::string& name):
+ type_(type), name_(name), lock_service_(z)
+{
+}
- // register_node :: node -> bool;
- // creates /jubatus/actors/<name>/cht/<hash(ip_port)> with contents ip_port
- void cht::register_node(const std::string& ip, int port){
- std::string path;
- build_actor_path(path, type_, name_);
- path += "/cht";
+cht::~cht()
+{
+}
- for(unsigned int i=0; i<NUM_VSERV; ++i){
- std::string hashpath = path+"/"+make_hash(build_loc_str(ip, port, i));
- lock_service_->create(hashpath, build_loc_str(ip,port), true);
- DLOG(INFO) << "created " << hashpath;
- }
- }
-
- bool cht::find(const std::string& host, int port, std::vector<std::pair<std::string,int> >& out, size_t s){
- return find(build_loc_str(host, port), out, s);
+// register_node :: node -> bool;
+// creates /jubatus/actors/<name>/cht/<hash(ip_port)> with contents ip_port
+void cht::register_node(const std::string& ip, int port)
+{
+ std::string path;
+ build_actor_path(path, type_, name_);
+ path += "/cht";
+
+ for (unsigned int i=0; i<NUM_VSERV; ++i) {
+ std::string hashpath = path+"/"+make_hash(build_loc_str(ip, port, i));
+ lock_service_->create(hashpath, build_loc_str(ip,port), true);
+ DLOG(INFO) << "created " << hashpath;
}
+}
- // return at most n nodes, if theres nodes less than n, return size is also less than n.
- // find(hash) :: lock_service -> key -> [node] where hash(node0) <= hash(key) < hash(node1)
- bool cht::find(const std::string& key, std::vector<std::pair<std::string,int> >& out, size_t n){
- out.clear();
- std::vector<std::string> hlist;
- if(! get_hashlist_(key, hlist)){
- throw JUBATUS_EXCEPTION(not_found(key));
- }
- std::string hash = make_hash(key);
- std::string path;
- build_actor_path(path, type_, name_);
- path += "/cht";
-
- std::vector<std::string>::iterator node0 = std::lower_bound(hlist.begin(), hlist.end(), hash);
- size_t idx = int(node0 - hlist.begin()) % hlist.size();
- std::string loc;
- for(size_t i=0; i<n; ++i){
- std::string ip;
- int port;
- if(lock_service_->read(path + "/" + hlist[idx], loc)){
- revert(loc, ip, port);
- out.push_back(make_pair(ip,port));
- }else{
- // TODO: output log
- throw JUBATUS_EXCEPTION(not_found(path));
- }
- idx++;
- idx %= hlist.size();
- }
- return !hlist.size();
- }
+bool cht::find(const std::string& host, int port, std::vector<std::pair<std::string,int> >& out, size_t s)
+{
+ return find(build_loc_str(host, port), out, s);
+}
- std::pair<std::string,int> cht::find_predecessor(const std::string& host, int port){
- return find_predecessor(build_loc_str(host, port));
+// return at most n nodes, if theres nodes less than n, return size is also less than n.
+// find(hash) :: lock_service -> key -> [node] where hash(node0) <= hash(key) < hash(node1)
+bool cht::find(const std::string& key, std::vector<std::pair<std::string,int> >& out, size_t n)
+{
+ out.clear();
+ std::vector<std::string> hlist;
+ if (!get_hashlist_(key, hlist)) {
+ throw JUBATUS_EXCEPTION(not_found(key));
}
- std::pair<std::string,int> cht::find_predecessor(const std::string& key){
- std::vector<std::string> hlist;
- get_hashlist_(key, hlist);
-
- std::string hash = make_hash(key);
- std::string path;
- build_actor_path(path, type_, name_);
- path += "/cht";
-
- std::vector<std::string>::iterator node0 = std::lower_bound(hlist.begin(), hlist.end(), hash);
- size_t idx = (int(node0 - hlist.begin())+ hlist.size() -1) % hlist.size();
-
+ std::string hash = make_hash(key);
+ std::string path;
+ build_actor_path(path, type_, name_);
+ path += "/cht";
+
+ std::vector<std::string>::iterator node0 = std::lower_bound(hlist.begin(), hlist.end(), hash);
+ size_t idx = int(node0 - hlist.begin()) % hlist.size();
+ std::string loc;
+ for (size_t i = 0; i < n; ++i) {
std::string ip;
int port;
- std::string loc;
- if(lock_service_->read(path + "/" + hlist[idx], loc)){
+ if (lock_service_->read(path + "/" + hlist[idx], loc)) {
revert(loc, ip, port);
- return make_pair(ip, port);
- }else{
+ out.push_back(make_pair(ip,port));
+ } else {
+ // TODO: output log
throw JUBATUS_EXCEPTION(not_found(path));
- // TODO: output log and throw exception
}
+ idx++;
+ idx %= hlist.size();
}
+ return !hlist.size();
+}
- void cht::setup_cht_dir(lock_service& ls, const std::string& type, const std::string& name){
- std::string path;
- build_actor_path(path, type, name);
- ls.create(path, "");
- path += "/cht";
- ls.create(path, "");
- }
+std::pair<std::string,int> cht::find_predecessor(const std::string& host, int port)
+{
+ return find_predecessor(build_loc_str(host, port));
+}
- bool cht::get_hashlist_(const std::string& key, std::vector<std::string>& hlist){
- hlist.clear();
- std::string path;
- build_actor_path(path, type_, name_);
- path += "/cht";
- std::vector<std::pair<std::string, int> > ret;
- lock_service_->list(path, hlist);
-
- if(hlist.empty()) return false;
- std::sort(hlist.begin(), hlist.end());
- return true;
+std::pair<std::string,int> cht::find_predecessor(const std::string& key)
+{
+ std::vector<std::string> hlist;
+ get_hashlist_(key, hlist);
+
+ std::string hash = make_hash(key);
+ std::string path;
+ build_actor_path(path, type_, name_);
+ path += "/cht";
+
+ std::vector<std::string>::iterator node0 = std::lower_bound(hlist.begin(), hlist.end(), hash);
+ size_t idx = (int(node0 - hlist.begin())+ hlist.size() -1) % hlist.size();
+
+ std::string ip;
+ int port;
+ std::string loc;
+ if (lock_service_->read(path + "/" + hlist[idx], loc)) {
+ revert(loc, ip, port);
+ return make_pair(ip, port);
+ } else {
+ throw JUBATUS_EXCEPTION(not_found(path));
+ // TODO: output log and throw exception
}
+}
+void cht::setup_cht_dir(lock_service& ls, const std::string& type, const std::string& name)
+{
+ std::string path;
+ build_actor_path(path, type, name);
+ ls.create(path, "");
+ path += "/cht";
+ ls.create(path, "");
}
+
+bool cht::get_hashlist_(const std::string& key, std::vector<std::string>& hlist)
+{
+ hlist.clear();
+ std::string path;
+ build_actor_path(path, type_, name_);
+ path += "/cht";
+ std::vector<std::pair<std::string, int> > ret;
+ lock_service_->list(path, hlist);
+
+ if (hlist.empty()) return false;
+ std::sort(hlist.begin(), hlist.end());
+ return true;
}
+
+} // common
+} // jubatus
View
87 src/common/cht.hpp
@@ -27,46 +27,47 @@
#include <pficommon/lang/cast.h>
-namespace jubatus{
-namespace common{
-
- static const unsigned int NUM_VSERV = 8;
-
- // this function does not seem pure, take care when calling from multiple threads
- std::string make_hash(const std::string& key);
-
- class cht{
- public:
- cht(cshared_ptr<lock_service>, const std::string& type, const std::string& name);
- ~cht();
-
- // node :: ip_port
- // register_node :: node -> bool;
- void register_node(const std::string&, int);
-
- template <typename T>
- bool find(const T& t, std::vector<std::pair<std::string,int> > & ret, size_t s){
- std::string k = pfi::lang::lexical_cast<std::string>(t);
- return find(k, ret, s);
- };
-
- // find(hash) :: key -> [node] where hash(node0) <= hash(key) < hash(node1) < hash(node2) < ...
- bool find(const std::string& host, int port, std::vector<std::pair<std::string,int> >&, size_t);
- bool find(const std::string&, std::vector<std::pair<std::string,int> >&, size_t);
-
- std::pair<std::string,int> find_predecessor(const std::string& host, int port);
- std::pair<std::string,int> find_predecessor(const std::string&);
-
- // run just once in starting up the process: creates <name>/cht directory.
- static void setup_cht_dir(lock_service&, const std::string&, const std::string&);
-
- private:
-
- bool get_hashlist_(const std::string& key, std::vector<std::string>&);
-
- const std::string type_;
- const std::string name_;
- cshared_ptr<lock_service> lock_service_;
- }; //cht
-}
-}
+namespace jubatus {
+namespace common {
+
+static const unsigned int NUM_VSERV = 8;
+
+// this function does not seem pure, take care when calling from multiple threads
+std::string make_hash(const std::string& key);
+
+class cht {
+public:
+ cht(cshared_ptr<lock_service>, const std::string& type, const std::string& name);
+ ~cht();
+
+ // node :: ip_port
+ // register_node :: node -> bool;
+ void register_node(const std::string&, int);
+
+ template <typename T>
+ bool find(const T& t, std::vector<std::pair<std::string,int> > & ret, size_t s) {
+ std::string k = pfi::lang::lexical_cast<std::string>(t);
+ return find(k, ret, s);
+ }
+
+ // find(hash) :: key -> [node] where hash(node0) <= hash(key) < hash(node1) < hash(node2) < ...
+ bool find(const std::string& host, int port, std::vector<std::pair<std::string,int> >&, size_t);
+ bool find(const std::string&, std::vector<std::pair<std::string,int> >&, size_t);
+
+ std::pair<std::string,int> find_predecessor(const std::string& host, int port);
+ std::pair<std::string,int> find_predecessor(const std::string&);
+
+ // run just once in starting up the process: creates <name>/cht directory.
+ static void setup_cht_dir(lock_service&, const std::string&, const std::string&);
+
+private:
+
+ bool get_hashlist_(const std::string& key, std::vector<std::string>&);
+
+ const std::string type_;
+ const std::string name_;
+ cshared_ptr<lock_service> lock_service_;
+}; //cht
+
+} // common
+} // jubatus
View
4 src/common/cht_test.cpp
@@ -31,5 +31,5 @@ TEST(cht, make_hash) {
ASSERT_NE(hash, hash3);
}
-}
-}
+} // common
+} // jubatus
View
21 src/common/global_id_generator.cpp
@@ -18,6 +18,10 @@
#include "global_id_generator.hpp"
#include <cassert>
+#ifndef ATOMIC_I8_SUPPORT
+#include <pficommon/concurrent/lock.h>
+#endif
+
namespace jubatus { namespace common {
@@ -36,7 +40,12 @@ global_id_generator::~global_id_generator()
uint64_t global_id_generator::generate()
{
if(is_standalone_){
+#ifdef ATOMIC_I8_SUPPORT
return __sync_fetch_and_add(&counter_, 1);
+#else
+ pfi::concurrent::scoped_lock lk(counter_mutex_);
+ return ++counter_;
+#endif
}else{
#ifdef HAVE_ZOOKEEPER_H
@@ -52,14 +61,16 @@ uint64_t global_id_generator::generate()
}
}
-#ifdef HAVE_ZOOKEEPER_H
void global_id_generator::set_ls(cshared_ptr<lock_service>& ls,
const std::string& path_prefix)
{
- path_ = path_prefix + "/id_generator";
- ls_ = ls;
- ls_->create(path_);
-}
+#ifdef HAVE_ZOOKEEPER_H
+ if (! is_standalone_) {
+ path_ = path_prefix + "/id_generator";
+ ls_ = ls;
+ ls_->create(path_);
+ }
#endif
+}
}}
View
11 src/common/global_id_generator.hpp
@@ -19,9 +19,11 @@
#include <stdint.h>
-#ifdef HAVE_ZOOKEEPER_H
#include "lock_service.hpp"
#include "shared_ptr.hpp"
+
+#ifndef ATOMIC_I8_SUPPORT
+#include <pficommon/concurrent/mutex.h>
#endif
namespace jubatus { namespace common {
@@ -35,20 +37,19 @@ class global_id_generator
uint64_t generate();
-#ifdef HAVE_ZOOKEEPER_H
void set_ls(cshared_ptr<lock_service>&, const std::string&);
-#endif
private:
global_id_generator();
bool is_standalone_;
uint64_t counter_;
-#ifdef HAVE_ZOOKEEPER_H
std::string path_;
cshared_ptr<lock_service> ls_;
-#endif
+#ifndef ATOMIC_I8_SUPPORT
+ pfi::concurrent::mutex counter_mutex_;
+#endif
};
}}
View
11 src/common/hash.hpp
@@ -20,13 +20,14 @@
#include <string>
#include <stdint.h>
-namespace jubatus{
+namespace jubatus {
-class hash_util{
+class hash_util {
public:
- static uint64_t calc_string_hash(const std::string& s){
+ static uint64_t calc_string_hash(const std::string& s) {
+ // FNV-1 hash function
uint64_t hash = 14695981039346656037LLU;
- for (size_t i = 0; i < s.size(); ++i){
+ for (size_t i = 0; i < s.size(); ++i) {
hash *= 1099511628211LLU;
hash ^= s[i];
}
@@ -34,4 +35,4 @@ class hash_util{
}
};
-}
+} // jubatus
View
43 src/common/lock_service.cpp
@@ -15,43 +15,50 @@
// License along with this library; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
-#include "exception.hpp"
#include "lock_service.hpp"
+#include "exception.hpp"
#include "zk.hpp"
#include "cached_zk.hpp"
-namespace jubatus{ namespace common{
+namespace jubatus {
+namespace common {
lock_service* create_lock_service(const std::string& name,
- const std::string& hosts, const int timeout, const std::string& log){
-
- if(name == "zk"){
- return reinterpret_cast<lock_service*>(new zk(hosts, timeout, log));
- }
- else if(name == "cached_zk"){
- return reinterpret_cast<lock_service*>(new cached_zk(hosts, timeout, log));
+ const std::string& hosts, const int timeout, const std::string& log)
+{
+ if (name == "zk") {
+ return new zk(hosts, timeout, log);
+ } else if (name == "cached_zk") {
+ return new cached_zk(hosts, timeout, log);
}
throw JUBATUS_EXCEPTION(jubatus::exception::runtime_error(std::string("unknown lock_service: ") + name));
}
lock_service_mutex::lock_service_mutex(lock_service& ls, const std::string& path):
- path_(path){
- if(ls.type() == "zk" or ls.type() == "cached_zk"){
- impl_ = reinterpret_cast<try_lockable*>(new zkmutex(ls, path));
- }else{
+ path_(path)
+{
+ if (ls.type() == "zk" || ls.type() == "cached_zk") {
+ impl_ = new zkmutex(ls, path);
+ } else {
{ LOG(ERROR) << "unknown lock_service: " << ls.type(); }
throw JUBATUS_EXCEPTION(jubatus::exception::runtime_error(std::string("unknown lock_service: ") + ls.type()));
}
-};
+}
-bool lock_service_mutex::lock(){
+bool lock_service_mutex::lock()
+{
return impl_->lock();
}
-bool lock_service_mutex::try_lock(){
+
+bool lock_service_mutex::try_lock()
+{
return impl_->try_lock();
}
-bool lock_service_mutex::unlock(){
+
+bool lock_service_mutex::unlock()
+{
return impl_->unlock();
}
-}}
+} // common
+} // jubatus
View
120 src/common/lock_service.hpp
@@ -25,63 +25,63 @@
#include <stdint.h>
-namespace jubatus{
-namespace common{
-
- // TODO: write lock_service mock and test them all?
- class lock_service{
- public:
- // timeout [ms]
- lock_service(){};
- virtual ~lock_service(){};
-
- virtual void force_close() = 0;
- virtual void create(const std::string& path, const std::string& payload = "", bool ephemeral = false) = 0;
- virtual void remove(const std::string& path) = 0;
- virtual bool exists(const std::string& path) = 0;
-
- virtual bool bind_watcher(const std::string& path, pfi::lang::function<void(int,int,std::string)>&) = 0;
-
- // ephemeral only
- virtual void create_seq(const std::string& path, std::string&) = 0;
- virtual uint64_t create_id(const std::string& path, uint32_t prefix = 0) = 0;
-
- virtual void list(const std::string& path, std::vector<std::string>& out) = 0;
- virtual void hd_list(const std::string& path, std::string& out) = 0;
-
- // reads data (should be smaller than 1024B)
- virtual bool read(const std::string& path, std::string& out) = 0;
-
- virtual void push_cleanup(pfi::lang::function<void()>& f) = 0;
- virtual void run_cleanup() = 0;
-
- virtual const std::string& get_hosts()const = 0;
- virtual const std::string type() const = 0;
- };
-
- class try_lockable : public pfi::concurrent::lockable{
- public:
- virtual bool try_lock() = 0;
- };
-
- class lock_service_mutex : public try_lockable {
- public:
- lock_service_mutex(lock_service& ls, const std::string& path);
- virtual ~lock_service_mutex(){
- delete impl_;
- }
-
- bool lock();
- bool try_lock();
- bool unlock();
-
- protected:
- try_lockable* impl_;
- std::string path_;
- };
-
- lock_service* create_lock_service(const std::string&,
- const std::string& hosts, const int timeout, const std::string& log = "/tmp/zklog");
- // void mywatcher(zhandle_t*, int, int, const char*, void*);
-}
-}
+namespace jubatus {
+namespace common {
+
+// TODO: write lock_service mock and test them all?
+class lock_service {
+public:
+ // timeout [ms]
+ lock_service() {};
+ virtual ~lock_service() {};
+
+ virtual void force_close() = 0;
+ virtual void create(const std::string& path, const std::string& payload = "", bool ephemeral = false) = 0;
+ virtual void remove(const std::string& path) = 0;
+ virtual bool exists(const std::string& path) = 0;
+
+ virtual bool bind_watcher(const std::string& path, pfi::lang::function<void(int,int,std::string)>&) = 0;
+
+ // ephemeral only
+ virtual void create_seq(const std::string& path, std::string&) = 0;
+ virtual uint64_t create_id(const std::string& path, uint32_t prefix = 0) = 0;
+
+ virtual void list(const std::string& path, std::vector<std::string>& out) = 0;
+ virtual void hd_list(const std::string& path, std::string& out) = 0;
+
+ // reads data (should be smaller than 1024B)
+ virtual bool read(const std::string& path, std::string& out) = 0;
+
+ virtual void push_cleanup(pfi::lang::function<void()>& f) = 0;
+ virtual void run_cleanup() = 0;
+
+ virtual const std::string& get_hosts() const = 0;
+ virtual const std::string type() const = 0;
+};
+
+class try_lockable : public pfi::concurrent::lockable {
+public:
+ virtual bool try_lock() = 0;
+};
+
+class lock_service_mutex : public try_lockable {
+public:
+ lock_service_mutex(lock_service& ls, const std::string& path);
+ virtual ~lock_service_mutex() {
+ delete impl_;
+ }
+
+ bool lock();
+ bool try_lock();
+ bool unlock();
+
+protected:
+ try_lockable* impl_;
+ std::string path_;
+};
+
+lock_service* create_lock_service(const std::string&,
+ const std::string& hosts, const int timeout, const std::string& log = "/tmp/zklog");
+
+} // common
+} // jubatus
View
206 src/common/membership.cpp
@@ -18,122 +18,132 @@
#include "membership.hpp"
#include <cstdlib>
-
#include <iostream>
-using namespace std;
-
#include <pficommon/lang/cast.h>
+
+using namespace std;
using namespace pfi::lang;
-namespace jubatus{
-namespace common{
-
- // "127.0.0.1" -> 9199 -> "127.0.0.1_9199"
- std::string build_loc_str(const std::string& ip, int port, unsigned int i){
- std::string ret = ip + "_" + lexical_cast<std::string,int>(port);;
- if(i>0){
- ret += "_";
- ret += lexical_cast<std::string,int>(i);
- }
- return ret;
- }
+namespace jubatus {
+namespace common {
- // /path/base -> 127.0.0.1 -> 9199 -> /path/base/127.0.0.1_9199
- void build_existence_path(const std::string& base, const std::string& ip, int port, std::string& out){
- out = base + "/" + ip + "_" + lexical_cast<std::string,int>(port);
- }
- void build_actor_path(std::string& path, const std::string& type, const std::string& name){
- path = ACTOR_BASE_PATH + "/" + type + "/" + name;
+// "127.0.0.1" -> 9199 -> "127.0.0.1_9199"
+string build_loc_str(const string& ip, int port, unsigned int i)
+{
+ string ret = ip + "_" + lexical_cast<string, int>(port);
+ if (i > 0) {
+ ret += "_";
+ ret += lexical_cast<string,int>(i);
}
+ return ret;
+}
- // 127.0.0.1_9199 -> (127.0.0.1, 9199)
- bool revert(const std::string& name, std::string& ip, int& port){
- ip = name.substr(0, name.find("_"));
- port = atoi(name.substr(name.find("_") + 1).c_str());
- return true;
- }
+// /path/base -> 127.0.0.1 -> 9199 -> /path/base/127.0.0.1_9199
+void build_existence_path(const string& base, const string& ip, int port, string& out)
+{
+ out = base + "/" + ip + "_" + lexical_cast<string, int>(port);
+}
- // zk -> name -> ip -> port -> bool
- bool register_actor(lock_service& z,
- const std::string& type, const std::string& name,
- const std::string& ip, int port){
+void build_actor_path(string& path, const string& type, const string& name)
+{
+ path = ACTOR_BASE_PATH + "/" + type + "/" + name;
+}
- std::string path;
- build_actor_path(path, type, name);
- z.create(path, "");
- z.create(path + "/master_lock", "");
- path += "/nodes";
- z.create(path , "");
- {
- std::string path1;
- build_existence_path(path, ip, port, path1);
- z.create(path1, "", true);
- }
-
- // set exit zlistener here
- pfi::lang::function <void()> f = &force_exit;
- z.push_cleanup(f);
-
- return true;
- }
+// 127.0.0.1_9199 -> (127.0.0.1, 9199)
+bool revert(const string& name, string& ip, int& port)
+{
+ ip = name.substr(0, name.find("_"));
+ port = atoi(name.substr(name.find("_") + 1).c_str());
+ return true;
+}
- bool register_keeper(lock_service& z, const std::string& type, const std::string& ip, int port){
- std::string path = JUBAKEEPER_BASE_PATH;
- z.create(path, "");
- path += "/" + type;
- z.create(path, "");
- {
- std::string path1;
- build_existence_path(path, ip, port, path1);
- z.create(path1, "", true);
- }
- // set exit zlistener here
- pfi::lang::function <void()> f = &force_exit;
- z.push_cleanup(f);
- return true;
+// zk -> name -> ip -> port -> bool
+bool register_actor(lock_service& z,
+ const string& type, const string& name,
+ const string& ip, int port)
+{
+ string path;
+ build_actor_path(path, type, name);
+ z.create(path, "");
+ z.create(path + "/master_lock", "");
+ path += "/nodes";
+ z.create(path , "");
+ {
+ string path1;
+ build_existence_path(path, ip, port, path1);
+ z.create(path1, "", true);
}
- // zk -> name -> list( (ip, rpc_port) )
- bool get_all_actors(lock_service& z,
- const std::string& type, const std::string& name,
- std::vector<std::pair<std::string, int> >& ret){
- ret.clear();
- std::string path;
- build_actor_path(path, type, name);
- path += "/nodes";
- std::vector<std::string> list;
- z.list(path, list);
- for(std::vector<std::string>::const_iterator it = list.begin();
- it != list.end(); ++it ){
- std::string ip;
- int port;
- revert(*it, ip, port);
- ret.push_back(make_pair(ip,port));
- }
- return true;
- }
+ // set exit zlistener here
+ pfi::lang::function <void()> f = &force_exit;
+ z.push_cleanup(f);
+
+ return true;
+}
- bool push_cleanup(lock_service& z, pfi::lang::function<void()>& f){
- z.push_cleanup(f);
- return true;
+bool register_keeper(lock_service& z, const string& type, const string& ip, int port)
+{
+ string path = JUBAKEEPER_BASE_PATH;
+ z.create(path, "");
+ path += "/" + type;
+ z.create(path, "");
+ {
+ string path1;
+ build_existence_path(path, ip, port, path1);
+ z.create(path1, "", true);
}
+ // set exit zlistener here
+ pfi::lang::function <void()> f = &force_exit;
+ z.push_cleanup(f);
+ return true;
+}
- void force_exit(){
- exit(-1);
+// zk -> name -> list( (ip, rpc_port) )
+bool get_all_actors(lock_service& z,
+ const string& type, const string& name,
+ std::vector<std::pair<string, int> >& ret)
+{
+ ret.clear();
+ string path;
+ build_actor_path(path, type, name);
+ path += "/nodes";
+ std::vector<string> list;
+ z.list(path, list);
+ for (std::vector<string>::const_iterator it = list.begin();
+ it != list.end(); ++it) {
+ string ip;
+ int port;
+ revert(*it, ip, port);
+ ret.push_back(make_pair(ip,port));
}
+ return true;
+}
- void prepare_jubatus(lock_service& ls, const std::string& type, const std::string& name){
- ls.create(JUBATUS_BASE_PATH);
- ls.create(JUBAVISOR_BASE_PATH);
- ls.create(JUBAKEEPER_BASE_PATH);
- ls.create(ACTOR_BASE_PATH);
+bool push_cleanup(lock_service& z, pfi::lang::function<void()>& f)
+{
+ z.push_cleanup(f);
+ return true;
+}
- std::string path = ACTOR_BASE_PATH + "/" + type;
+void force_exit()
+{
+ exit(-1);
+}
+
+void prepare_jubatus(lock_service& ls, const string& type, const string& name)
+{
+ ls.create(JUBATUS_BASE_PATH);
+ ls.create(JUBAVISOR_BASE_PATH);
+ ls.create(JUBAKEEPER_BASE_PATH);
+ ls.create(ACTOR_BASE_PATH);
+
+ string path = ACTOR_BASE_PATH + "/" + type;
+ ls.create(path);
+ if (name != "") {
+ build_actor_path(path, type, name);
ls.create(path);
- if(name != ""){
- build_actor_path(path, type, name);
- ls.create(path);
- }
}
}
-}
+
+} // common
+} // jubatus
View
55 src/common/membership.hpp
@@ -22,39 +22,40 @@
#include <vector>
#include <map>
-namespace jubatus{
-namespace common{
+namespace jubatus {
+namespace common {
- static const std::string JUBATUS_BASE_PATH = "/jubatus";
- static const std::string JUBAVISOR_BASE_PATH = "/jubatus/supervisors";
- static const std::string JUBAKEEPER_BASE_PATH = "/jubatus/jubakeepers";
- static const std::string ACTOR_BASE_PATH = "/jubatus/actors";
+static const std::string JUBATUS_BASE_PATH = "/jubatus";
+static const std::string JUBAVISOR_BASE_PATH = "/jubatus/supervisors";
+static const std::string JUBAKEEPER_BASE_PATH = "/jubatus/jubakeepers";
+static const std::string ACTOR_BASE_PATH = "/jubatus/actors";
- // "127.0.0.1" -> 9199 -> "127.0.0.1_9199"
- std::string build_loc_str(const std::string&, int, unsigned int = 0);
+// "127.0.0.1" -> 9199 -> "127.0.0.1_9199"
+std::string build_loc_str(const std::string&, int, unsigned int = 0);
- // /path/base -> 127.0.0.1 -> 9199 -> /path/base/127.0.0.1_9199
- void build_existence_path(const std::string&, const std::string&, int, std::string&);
+// /path/base -> 127.0.0.1 -> 9199 -> /path/base/127.0.0.1_9199
+void build_existence_path(const std::string&, const std::string&, int, std::string&);
- void build_actor_path(std::string&, const std::string& type, const std::string& name);
+void build_actor_path(std::string&, const std::string& type, const std::string& name);
- // 127.0.0.1_9199 -> (127.0.0.1, 9199)
- bool revert(const std::string&, std::string&, int&);
+// 127.0.0.1_9199 -> (127.0.0.1, 9199)
+bool revert(const std::string&, std::string&, int&);
- // zk -> name -> ip -> port -> bool
- bool register_actor(lock_service&, const std::string& type, const std::string& name,
- const std::string& ip, int port);
- // zk -> name -> ip -> port -> bool
- bool register_keeper(lock_service&, const std::string& type, const std::string& ip, int);
- // zk -> name -> list( (ip, rpc_port) )
- bool get_all_actors(lock_service&, const std::string& type, const std::string&,
- std::vector<std::pair<std::string, int> >&);
+// zk -> name -> ip -> port -> bool
+bool register_actor(lock_service&, const std::string& type, const std::string& name,
+ const std::string& ip, int port);
+// zk -> name -> ip -> port -> bool
+bool register_keeper(lock_service&, const std::string& type, const std::string& ip, int);
+// zk -> name -> list( (ip, rpc_port) )
+bool get_all_actors(lock_service&, const std::string& type, const std::string&,
+ std::vector<std::pair<std::string, int> >&);
- bool push_cleanup(lock_service&, pfi::lang::function<void()>&);
+bool push_cleanup(lock_service&, pfi::lang::function<void()>&);
- void force_exit();
+void force_exit();
- void prepare_jubatus(lock_service& ls,
- const std::string& type, const std::string& name = "");
-}
-}
+void prepare_jubatus(lock_service& ls,
+ const std::string& type, const std::string& name = "");
+
+} // common
+} // jubatus
View
77 src/common/mprpc/rpc_client.cpp
@@ -118,6 +118,83 @@ void rpc_mclient::send_all(const msgpack::sbuffer& buf)
event_base_dispatch(evbase_);
}
+rpc_result_object rpc_mclient::wait(const std::string& method)
+{
+ rpc_result_object result;
+
+ if (clients_.empty())
+ throw JUBATUS_EXCEPTION(rpc_no_client() << error_method(method));
+
+ register_fd_readable_();
+ event_base_dispatch(evbase_);
+
+ size_t count = 0;
+
+ for (client_list_t::iterator it = clients_.begin(), end = clients_.end();
+ it != end; ++it) {
+ shared_ptr<async_client> client = *it;
+ async_client::response_list_t& response_list = client->response();
+
+ try {
+ if (client->read_exception())
+ client->read_exception()->throw_exception();
+ if (client->write_exception())
+ client->write_exception()->throw_exception();
+ if (response_list.empty())
+ throw JUBATUS_EXCEPTION(rpc_no_result() << error_method(method));
+
+ // If implement RPC specification strictly, you must find response by msgid,
+ // but pfi::network::mprpc::rpc_server does not support RPC pipelining.
+ rpc_response_t res = response_list.front();
+ response_list.erase(response_list.begin());
+ if (res.has_error()) {
+ if (res.error().type == msgpack::type::POSITIVE_INTEGER) {
+ // error code defined in pficommon/mprpc/message.h
+ switch (static_cast<unsigned int>(res.error().via.u64)) {
+ case pfi::network::mprpc::METHOD_NOT_FOUND:
+ throw JUBATUS_EXCEPTION(rpc_method_not_found() << error_method(method));
+
+ case pfi::network::mprpc::TYPE_MISMATCH:
+ throw JUBATUS_EXCEPTION(rpc_type_error() << error_method(method));
+
+ default:
+ throw JUBATUS_EXCEPTION(rpc_call_error()
+ << error_method(method)
+ << jubatus::exception::error_message(std::string("rpc_server error: " + pfi::lang::lexical_cast<std::string>(res.error().via.u64))));
+ }
+ } else {
+ // MEMO: other error object returned
+ throw JUBATUS_EXCEPTION(rpc_call_error()
+ << error_method(method)
+ << jubatus::exception::error_message("error response: " + pfi::lang::lexical_cast<std::string>(res.error())));
+ }
+ }
+
+ result.response.push_back(res);
+
+ count++;
+
+ // continue process next result when exception thrown
+ } catch (...) {
+ // store exception_thrower to list of error
+ result.error.push_back(rpc_error(client->host(), client->port(), jubatus::exception::get_current_exception()));
+
+ // clear last exception set by libevent callback
+ client->context_->read_exception.reset();
+ client->context_->write_exception.reset();
+ }
+ }
+
+ if (!count) {
+ rpc_no_result e;
+ if (result.has_error())
+ e << error_multi_rpc(result.error);
+ throw JUBATUS_EXCEPTION(e << error_method(method));
+ }
+
+ return result;
+}
+
} // mprpc
} // common
} // jubatus
View
22 src/common/mprpc/rpc_client.hpp
@@ -115,6 +115,13 @@ struct rpc_result {
bool has_error() const { return !error.empty(); }
};
+struct rpc_result_object {
+ std::vector<rpc_response_t> response;
+ std::vector<rpc_error> error;
+
+ bool has_error() const { return !error.empty(); }
+};
+
class rpc_mclient : pfi::lang::noncopyable
{
public:
@@ -133,6 +140,9 @@ class rpc_mclient : pfi::lang::noncopyable
template <typename Res, typename A0, typename A1, typename A2, typename A3>
rpc_result<Res> call(const std::string&, const A0&, const A1&, const A2&, const A3&, const pfi::lang::function<Res(Res,Res)>& reducer);
+ template <typename A0>
+ rpc_result_object call(const std::string&, const A0& a0);
+
private:
static void readable_callback(int fd, short int events, void* arg);
static void writable_callback(int fd, short int events, void* arg);
@@ -145,13 +155,14 @@ class rpc_mclient : pfi::lang::noncopyable
void send_all(const msgpack::sbuffer& buf);
+ rpc_result_object wait(const std::string& m);
+
template <typename Arr>
void call_(const std::string&, const Arr& a);
template <typename Res>
rpc_result<Res> join_(const std::string&, const pfi::lang::function<Res(Res,Res)>& reducer);
-
std::vector<std::pair<std::string, uint16_t> > hosts_;
int timeout_sec_;
@@ -290,6 +301,15 @@ rpc_result<Res> rpc_mclient::call(const std::string& m, const A0& a0, const A1&
return join_(m, reducer);
}
+
+
+template <typename A0>
+rpc_result_object rpc_mclient::call(const std::string& m, const A0& a0)
+{
+ call_(m, msgpack::type::tuple<A0>(a0));
+ return wait(m);
+}
+
} // mprpc
} // common
} // jubatus
View
5 src/common/mprpc/rpc_client_test.cpp
@@ -17,6 +17,7 @@
#include "rpc_client.hpp"
#include "../../framework/aggregators.hpp"
+#include "../../server/test_util.hpp"
#include "gtest/gtest.h"
#include <pficommon/concurrent/thread.h>
#include <pficommon/network/mprpc.h>
@@ -181,9 +182,9 @@ TEST(rpc_mclient, small)
threads.back()->start();
clients.push_back(make_pair(string("localhost"), port));
+ wait_server(port);
}
const size_t kServerSize = clients.size();
- usleep(500000);
{
test_mrpc_client cli0("localhost", PORT0, 3.0);
test_mrpc_client cli1("localhost", PORT1, 3.0);
@@ -311,12 +312,12 @@ TEST(rpc_mclient, socket_disconnection)
server_ptr ser(new test_mrpc_server(3.0));
thread th(pfi::lang::bind(&server_thread, ser, kPortStart));
th.start();
+ wait_server(kPortStart);
vector<pair<string,uint16_t> > clients;
clients.push_back(make_pair(string("localhost"), kPortStart));
clients.push_back(make_pair(string("localhost"), kInvalidPort)); // connection refused
- usleep(500000);
{
test_mrpc_client cli0("localhost", kPortStart, 3.0);
test_mrpc_client cli1("localhost", kInvalidPort, 3.0);
View
4 src/common/mprpc/wscript
@@ -9,14 +9,14 @@ def build(bld):
bld.shlib(
source = src,
target = 'jubacommon_mprpc',
- use = 'PFICOMMON LIBGLOG ZOOKEEPER_MT LIBEVENT MSGPACK'
+ use = 'PFICOMMON LIBGLOG ZOOKEEPER_MT LIBEVENT MSGPACK jubacommon'
)
bld.program(
features = 'gtest',
source = 'rpc_client_test.cpp',
target = 'rpc_client_test',
- includes = '. ../framework',
+ includes = '.',
use = 'PFICOMMON MSGPACK jubacommon jubacommon_mprpc',
)
View
2  src/common/rpc_util.hpp
@@ -20,7 +20,7 @@
#include <pficommon/data/serialization.h>
#include <msgpack.hpp>
-namespace jubatus{
+namespace jubatus {
typedef std::pair<std::string, int> connection_info;
View
117 src/common/util.cpp
@@ -38,8 +38,6 @@
#include <netinet/in.h>
#include <arpa/inet.h>
-#include <unistd.h>
-
#ifdef __APPLE__
#include <libproc.h>
#endif
@@ -49,28 +47,37 @@
using std::string;
using namespace pfi::lang;
-namespace jubatus { namespace util{
+namespace jubatus {
+namespace util {
+
+void get_ip(const char* nic, string& out)
+{
+ int fd;
+ struct ifreq ifr;
-void get_ip(const char* nic, string& out){
- int fd;
- struct ifreq ifr;
-
- fd = socket(AF_INET, SOCK_DGRAM, 0);
- ifr.ifr_addr.sa_family = AF_INET;
- strncpy(ifr.ifr_name, nic, IFNAMSIZ-1);
- ioctl(fd, SIOCGIFADDR, &ifr);
- close(fd);
+ fd = socket(AF_INET, SOCK_DGRAM, 0);
+ ifr.ifr_addr.sa_family = AF_INET;
+ strncpy(ifr.ifr_name, nic, IFNAMSIZ-1);
+ ioctl(fd, SIOCGIFADDR, &ifr);
+ close(fd);
- struct sockaddr_in * sin = (struct sockaddr_in*)(&(ifr.ifr_addr));
- out = inet_ntoa((struct in_addr)(sin->sin_addr));
+ struct sockaddr_in* sin = (struct sockaddr_in*)(&(ifr.ifr_addr));
+ out = inet_ntoa((struct in_addr)(sin->sin_addr));
}
-string get_ip(const char* nic){
+string get_ip(const char* nic)
+{
string ret;
get_ip(nic, ret);
return ret;
}
+string base_name(const string& path)
+{
+ size_t found = path.rfind('/');
+ return found != string::npos ? path.substr(found + 1) : path;
+}
+
std::string get_program_name()
{
// WARNING: this code will only work on linux or OS X
@@ -97,11 +104,11 @@ std::string get_program_name()
}
// get basename
- const char* last = strrchr(path, '/');
- if (!last)
+ const string program_base_name = base_name(path);
+ if (program_base_name == path)
throw JUBATUS_EXCEPTION(jubatus::exception::runtime_error(string("Failed to get program name from path: ") + path)
<< jubatus::exception::error_file_name(path));
- return std::string(last + 1);
+ return program_base_name;
}
//local server list should be like:
@@ -111,7 +118,8 @@ std::string get_program_name()
// ...
// 192.168.1.23 2345
//and must include self IP got from "eth0"
-std::string load(const std::string& file, std::vector< std::pair<std::string, int> >& s){
+std::string load(const std::string& file, std::vector< std::pair<std::string, int> >& s)
+{
std::string tmp;
std::string self;
get_ip("eth0", self);
@@ -119,20 +127,20 @@ std::string load(const std::string& file, std::vector< std::pair<std::string, in
int port;
int line = 0;
std::ifstream ifs(file.c_str());
- if(!ifs){
+ if (!ifs) {
return self;
}
- while(ifs >> tmp){
- if(self==tmp)
+ while (ifs >> tmp) {
+ if (self==tmp)
self_included = true;
- if(!(ifs >> port)){
+ if (!(ifs >> port)) {
// TODO: replace jubatus exception
throw parse_error(file, line, tmp.size(), string("input port"));
}
s.push_back(std::pair<std::string,int>(tmp, port));
line++;
}
- if(!self_included){
+ if (!self_included) {
// TODO: replace jubatus exception
throw parse_error(file, s.size(), 0, //FIXME: 0
string("self IP(eth0) not included in list"));
@@ -140,50 +148,59 @@ std::string load(const std::string& file, std::vector< std::pair<std::string, in
return self;
}
-
-int daemonize(){
+int daemonize()
+{
return daemon(0, 0);
}
-void append_env_path(const string& e, const string& argv0){
- const char * env = getenv(e.c_str());
- // char cwd[PATH_MAX];
- // getcwd(cwd, PATH_MAX);
-
+void append_env_path(const string& e, const string& argv0)
+{
+ const char* env = getenv(e.c_str());
string new_path = string(env) + ":" + argv0;
setenv(e.c_str(), new_path.c_str(), new_path.size());
}
-void append_server_path(const string& argv0){
+void append_server_path(const string& argv0)
+{
const char * env = getenv("PATH");
char cwd[PATH_MAX];
if (!getcwd(cwd, PATH_MAX)) {
throw JUBATUS_EXCEPTION(jubatus::exception::runtime_error("Failed to getcwd"))
<< jubatus::exception::error_errno(errno);
- }
-
+ }
+
string p = argv0.substr(0, argv0.find_last_of('/'));
string new_path = string(env) + ":" + cwd + "/" + p + "/../server";
setenv("PATH", new_path.c_str(), new_path.size());
}
-void get_machine_status(std::map<std::string, std::string>& ret){
- pid_t pid = getpid();
-
+void get_machine_status(machine_status_t& status)
+{
// WARNING: this code will only work on linux
- std::ostringstream fname;
- fname << "/proc/" << pid << "/statm";
- std::ifstream statm(fname.str().c_str());
-
- uint64_t vm_virt; statm >> vm_virt;
- uint64_t vm_rss; statm >> vm_rss;
- uint64_t vm_shr; statm >> vm_shr;
-
- ret["VIRT"] = pfi::lang::lexical_cast<std::string>(vm_virt);
- ret["RSS"] = pfi::lang::lexical_cast<std::string>(vm_rss);
- ret["SHR"] = pfi::lang::lexical_cast<std::string>(vm_shr);
-
+ try {
+ // /proc/[pid]/statm shows using page size
+ char path[64];
+ snprintf(path, sizeof(path), "/proc/%d/statm", getpid());
+ std::ifstream statm(path);
+
+ const long page_size = sysconf(_SC_PAGESIZE);
+ uint64_t vm_virt, vm_rss , vm_shr;
+ statm >> vm_virt >> vm_rss >> vm_shr;
+ vm_virt = vm_virt * page_size / 1024;
+ vm_rss = vm_rss * page_size / 1024;
+ vm_shr = vm_shr * page_size / 1024;
+
+ // in KB
+ status.vm_size = vm_virt; // total program size(virtual memory)
+ status.vm_resident = vm_rss; // resident set size
+ status.vm_share = vm_shr; // shared
+ } catch (...) {
+ // store zero
+ status.vm_size = 0;
+ status.vm_resident = 0;
+ status.vm_share = 0;
+ }
}
namespace {
@@ -213,7 +230,7 @@ void set_exit_on_term()
void ignore_sigpipe()
{
// portable code for socket write(2) MSG_NOSIGNAL
- if(signal(SIGPIPE, SIG_IGN) == SIG_ERR)
+ if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
throw JUBATUS_EXCEPTION(jubatus::exception::runtime_error("can't ignore SIGPIPE")
<< jubatus::exception::error_api_func("signal")
<< jubatus::exception::error_errno(errno));
View
13 src/common/util.hpp
@@ -22,11 +22,18 @@
#include <stdint.h>
-namespace jubatus{
-namespace util{
+namespace jubatus {
+namespace util {
+
+struct machine_status_t {
+ uint64_t vm_size; // VIRT
+ uint64_t vm_resident; // RSS
+ uint64_t vm_share; // SHR
+};
void get_ip(const char* nic, std::string& out);
std::string get_ip(const char* nic);
+std::string base_name(const std::string&);
std::string get_program_name();
std::string load(const std::string& file, std::vector< std::pair<std::string, int> >& s);
@@ -36,7 +43,7 @@ int daemonize();
void append_env_path(const std::string& env_, const std::string& argv0);
void append_server_path(const std::string& argv0);
-void get_machine_status(std::map<std::string, std::string>&);
+void get_machine_status(machine_status_t& status);
void set_exit_on_term();
void ignore_sigpipe();
View
14 src/common/util_test.cpp
@@ -40,6 +40,12 @@ TEST(common,util2){
ASSERT_NE(env, env2);
}
+TEST(common, base_name){
+ EXPECT_EQ("test", jubatus::util::base_name("/path/to/test"));
+ EXPECT_EQ("basename", jubatus::util::base_name("basename"));
+ EXPECT_EQ("", jubatus::util::base_name("/path/to/"));
+}
+
TEST(common,util_get_program_name){
std::string path;
EXPECT_NO_THROW({
@@ -47,3 +53,11 @@ TEST(common,util_get_program_name){
});
EXPECT_EQ(std::string("util_test"), path);
}
+
+TEST(common, util_get_machine_status)
+{
+ jubatus::util::machine_status_t status;
+ EXPECT_NO_THROW({
+ jubatus::util::get_machine_status(status);
+ });
+}
View
18 src/common/wscript
@@ -7,6 +7,19 @@ def options(opt):
def configure(conf):
conf.check_cxx(header_name = 'sys/socket.h net/if.h sys/ioctl.h', mandatory = True)
conf.check_cxx(header_name = 'netinet/in.h arpa/inet.h', mandatory = True)
+
+ # Check compiler(GCC/Clang) support atomic builtin extension
+ conf.check_cxx(fragment='''
+#include <stdint.h>
+int main() {
+ uint64_t c = 0;
+ __sync_fetch_and_add(&c, 0);
+ return 0;
+}
+''',
+ msg = 'Checking for compiler atomic builtins',
+ define_name = 'ATOMIC_I8_SUPPORT', mandatory = False)
+
conf.recurse(subdirs)
def build(bld):
@@ -19,7 +32,7 @@ def build(bld):
source = src,
target = 'jubacommon',
includes = '.',
- use = 'PFICOMMON LIBGLOG ZOOKEEPER_MT jubacommon_mprpc'
+ use = 'PFICOMMON LIBGLOG ZOOKEEPER_MT'
)
test_src = [
@@ -42,7 +55,8 @@ def build(bld):
includes = '.',
use = 'jubacommon'
)
- map(make_test, test_src)
+ for s in test_src:
+ make_test(s)
bld.install_files('${PREFIX}/include/jubatus/common/', bld.path.ant_glob('*.hpp'))
bld.recurse(subdirs)
View
467 src/common/zk.cpp
@@ -28,250 +28,279 @@ using pfi::concurrent::scoped_lock;
using std::vector;
using std::string;
-namespace jubatus{
-namespace common{
-
- zk::zk(const std::string& hosts, int timeout, const std::string& logfile):
- zh_(NULL),
- hosts_(hosts),
- logfilep_(NULL)
- {
- if(logfile != ""){
- logfilep_ = fopen(logfile.c_str(), "a+");
- if(logfilep_ == NULL){
- LOG(ERROR) << "cannot init zk logfile:" << logfile;
- throw JUBATUS_EXCEPTION(jubatus::exception::runtime_error("cannot open zk logfile")
- << jubatus::exception::error_file_name(logfile.c_str())
- << jubatus::exception::error_errno(errno)
- << jubatus::exception::error_api_func("fopen"));
- }
- zoo_set_log_stream(logfilep_);
- }
-
- zh_ = zookeeper_init(hosts.c_str(), NULL, timeout * 1000, 0, NULL, 0);
- if(!zh_){
- perror("");
- throw JUBATUS_EXCEPTION(jubatus::exception::runtime_error("cannot init zk")
- << jubatus::exception::error_api_func("zookeeper_init")
- << jubatus::exception::error_errno(errno));
- }
+namespace jubatus {
+namespace common {
- // sleep the state got not ZOO_CONNECTING_STATE
- while((state_ = zoo_state(zh_)) == ZOO_CONNECTING_STATE){
- usleep(100);
+zk::zk(const string& hosts, int timeout, const string& logfile):
+ zh_(NULL),
+ hosts_(hosts),
+ logfilep_(NULL)
+{
+ if (logfile != "") {
+ logfilep_ = fopen(logfile.c_str(), "a+");
+ if (!logfilep_) {
+ LOG(ERROR) << "cannot init zk logfile:" << logfile;
+ throw JUBATUS_EXCEPTION(jubatus::exception::runtime_error("cannot open zk logfile")
+ << jubatus::exception::error_file_name(logfile.c_str())
+ << jubatus::exception::error_errno(errno)
+ << jubatus::exception::error_api_func("fopen"));
}
+ zoo_set_log_stream(logfilep_);
+ }
- if(is_unrecoverable(zh_) == ZINVALIDSTATE){
- throw JUBATUS_EXCEPTION(jubatus::exception::runtime_error("cannot connect zk")
- << jubatus::exception::error_api_func("is_unrecoverable")
- << jubatus::exception::error_message(zerror(errno)));
- }
+ zh_ = zookeeper_init(hosts.c_str(), NULL, timeout * 1000, 0, NULL, 0);
+ if (!zh_) {
+ perror("");
+ throw JUBATUS_EXCEPTION(jubatus::exception::runtime_error("cannot init zk")
+ << jubatus::exception::error_api_func("zookeeper_init")
+ << jubatus::exception::error_errno(errno));
+ }
- zoo_set_context(zh_, this);
- zoo_set_watcher(zh_, mywatcher);
- };
+ // sleep the state got not ZOO_CONNECTING_STATE
+ while ((state_ = zoo_state(zh_)) == ZOO_CONNECTING_STATE) {
+ usleep(100);
+ }
- zk::~zk(){
- force_close();
- if(logfilep_){
- fclose(logfilep_);
- }
- };
-
- void zk::force_close(){
- zookeeper_close(zh_);
- zh_ = NULL;
+ if (is_unrecoverable(zh_) == ZINVALIDSTATE) {
+ throw JUBATUS_EXCEPTION(jubatus::exception::runtime_error("cannot connect zk")
+ << jubatus::exception::error_api_func("is_unrecoverable")
+ << jubatus::exception::error_message(zerror(errno)));
}
- void zk::create(const std::string& path, const std::string& payload, bool ephemeral){
- scoped_lock lk(m_);
- int rc = zoo_create(zh_, path.c_str(), payload.c_str(), payload.length(),
- &ZOO_OPEN_ACL_UNSAFE,
- ((ephemeral)?ZOO_EPHEMERAL:0), // | ZOO_SEQUENCE
- NULL, 0);
- if(ephemeral){
- if(rc != ZOK){
- LOG(ERROR) << path << " failed in creation:" << zerror(rc);
- throw JUBATUS_EXCEPTION(jubatus::exception::runtime_error("failed to create zk empheral node")
- << jubatus::exception::error_message(std::string("zerror: ") + zerror(rc))
- << jubatus::exception::error_api_func("zoo_create")
- << jubatus::exception::error_file_name(path));
- }
- }else{
- if(rc != ZOK && rc != ZNODEEXISTS){
- LOG(ERROR) << path << " failed in creation " << rc << " " << zerror(rc);
- }
- }
- };
-
- // "/some/path" => "/some/path0000012"
- void zk::create_seq(const std::string& path, std::string& seqfile){
- scoped_lock lk(m_);
- char path_buffer[path.size()+16];
- int rc = zoo_create(zh_, path.c_str(), NULL, 0, &ZOO_OPEN_ACL_UNSAFE,
- ZOO_EPHEMERAL|ZOO_SEQUENCE, path_buffer, path.size()+16);
- seqfile = "";
- if(rc != ZOK){
- LOG(ERROR) << path << " failed in creation - " << zerror(rc);
-
- }else{
- seqfile = path_buffer;
- }
- };
-
- uint64_t zk::create_id(const std::string& path, uint32_t prefix){
- scoped_lock lk(m_);
- struct Stat st;
- int rc = zoo_set2(zh_, path.c_str(), "dummy", 6, -1, &st);
-
- if(rc != ZOK){
- LOG(ERROR) << path << " failed on zoo_set2 " << zerror(rc);
- throw JUBATUS_EXCEPTION(jubatus::exception::runtime_error("failed create id")
- << jubatus::exception::error_message(std::string("zerror: ") + zerror(rc))
- << jubatus::exception::error_api_func("zoo_set2"));
+
+ zoo_set_context(zh_, this);
+ zoo_set_watcher(zh_, mywatcher);
+}
+
+zk::~zk()
+{
+ force_close();
+ if (logfilep_) {
+ fclose(logfilep_);
+ }
+}
+
+void zk::force_close()
+{
+ zookeeper_close(zh_);
+ zh_ = NULL;
+}
+
+