Skip to content
Browse files

Merge branch 'release/0.3.1'

  • Loading branch information...
2 parents e2a2397 + 78058d2 commit 9b24016db90ec67627371720e62a05824d8b044b @kuenishi kuenishi committed Jul 20, 2012
Showing with 1,386 additions and 1,033 deletions.
  1. +11 −0 README.rst
  2. +6 −0 src/classifier/wscript
  3. +13 −6 src/common/exception.hpp
  4. +14 −38 src/common/exception_info.hpp
  5. +50 −24 src/common/exception_test.cpp
  6. +238 −41 src/common/mprpc/async_client.cpp
  7. +114 −37 src/common/mprpc/async_client.hpp
  8. +65 −0 src/common/mprpc/exception.hpp
  9. +37 −159 src/common/mprpc/rpc_client.cpp
  10. +202 −103 src/common/mprpc/rpc_client.hpp
  11. +203 −45 src/common/mprpc/rpc_client_test.cpp
  12. +1 −1 src/common/mprpc/wscript
  13. +34 −0 src/common/util.cpp
  14. +3 −0 src/common/util.hpp
  15. +20 −12 src/common/zk.cpp
  16. +26 −21 src/framework/jubatus_serv.cpp
  17. +3 −1 src/framework/keeper.cpp
  18. +6 −11 src/framework/keeper.hpp
  19. +45 −36 src/framework/mixer.cpp
  20. +8 −8 src/framework/mixer.hpp
  21. +15 −25 src/framework/server_util.cpp
  22. +16 −10 src/framework/server_util.hpp
  23. +1 −1 src/framework/wscript
  24. +12 −7 src/jubavisor/jubavisor.cpp
  25. +1 −0 src/jubavisor/jubavisor.hpp
  26. +3 −1 src/jubavisor/main.cpp
  27. +1 −30 src/server/classifier.idl
  28. +6 −6 src/server/classifier_impl.cpp
  29. +5 −7 src/server/classifier_serv.cpp
  30. +30 −26 src/server/classifier_test.cpp
  31. +1 −50 src/server/classifier_types.hpp
  32. +13 −18 src/server/graph.idl
  33. +14 −14 src/server/graph_client.hpp
  34. +9 −9 src/server/graph_impl.cpp
  35. +6 −6 src/server/graph_keeper.cpp
  36. +9 −6 src/server/graph_serv.cpp
  37. +4 −0 src/server/graph_serv.hpp
  38. +7 −7 src/server/graph_server.hpp
  39. +8 −0 src/server/graph_test.cpp
  40. +6 −12 src/server/graph_types.hpp
  41. +3 −34 src/server/recommender.idl
  42. +12 −12 src/server/recommender_impl.cpp
  43. +2 −4 src/server/recommender_serv.cpp
  44. +11 −4 src/server/recommender_test.cpp
  45. +1 −52 src/server/recommender_types.hpp
  46. +1 −30 src/server/regression.idl
  47. +6 −6 src/server/regression_impl.cpp
  48. +4 −7 src/server/regression_serv.cpp
  49. +12 −8 src/server/regression_test.cpp
  50. +1 −50 src/server/regression_types.hpp
  51. +18 −18 src/server/stat.idl
  52. +13 −13 src/server/stat_impl.cpp
  53. +7 −7 src/server/stat_keeper.cpp
  54. +9 −0 src/server/test_util.hpp
  55. +8 −0 src/server/wscript
  56. +1 −1 src/stat/mixable_stat_test.cpp
  57. +1 −0 src/stat/stat.hpp
  58. +6 −2 src/storage/wscript
  59. +1 −4 tools/generator/OMakefile
  60. +2 −2 tools/generator/generator.ml
  61. +1 −1 wscript
View
11 README.rst
@@ -31,6 +31,17 @@ LGPL 2.1
Update history
--------------
+0.3.1 2012/7/20
+~~~~~~~~~~~~~~~
+
+Improvements
+ - RPC enhances to many exceptions and provide new error handling interface (#49)
+ - JSON interface for set_config APIs (#44)
+ - jubavisor close zk connection correctly (#74)
+
+Bugfix
+ - #73, #69, #66, #65
+
Release 0.3.0 2012/6/29
~~~~~~~~~~~~~~~~~~~~~~~
View
6 src/classifier/wscript
@@ -29,3 +29,9 @@ def build(bld):
target = "classifier_test",
includes = '.',
use = 'jubatus_classifier jubastorage')
+
+ bld.install_files('${PREFIX}/include/jubatus/classifier', [
+ 'classifier_base.hpp',
+ 'classifier_factory.hpp',
+ 'classifier_type.hpp',
+ ])
View
19 src/common/exception.hpp
@@ -38,6 +38,13 @@ typedef error_info<struct error_at_file_, char const *> error_at_file;
typedef error_info<struct error_at_func_, char const *> error_at_func;
typedef error_info<struct error_at_line_, int> error_at_line;
typedef error_info<struct error_errno_, int> error_errno;
+inline std::string to_string(const error_errno& info)
+{
+ std::string msg(strerror(info.value()));
+ msg += " (" + pfi::lang::lexical_cast<std::string>(info.value()) + ")";
+ return msg;
+}
+
typedef error_info<struct error_file_name_, std::string> error_file_name;
typedef error_info<struct error_api_func_, std::string> error_api_func;
typedef error_info<struct error_message_, std::string> error_message;
@@ -70,7 +77,7 @@ class jubatus_exception : public std::exception {
virtual exception_thrower_ptr thrower() const = 0;
template <class Exception>
- friend Exception const & add_info(Exception const & e, pfi::lang::shared_ptr<error_info_base> info);
+ friend const Exception& add_info(const Exception& e, pfi::lang::shared_ptr<error_info_base> info);
std::string name() const throw()
{
@@ -96,20 +103,20 @@ class jubatus_exception : public std::exception {
};
template <class Exception>
-inline Exception const & add_info(Exception const & e, pfi::lang::shared_ptr<error_info_base> info)
+inline const Exception& add_info(const Exception& e, pfi::lang::shared_ptr<error_info_base> info)
{
e.info_list_.push_back(info);
return e;
}
template <class Exception, class Tag, class V>
-inline Exception const & operator <<(Exception const & e, error_info<Tag, V> const & info)
+inline const Exception& operator <<(const Exception& e, const error_info<Tag, V>& info)
{
return add_info(e, pfi::lang::shared_ptr<error_info_base>(new error_info<Tag, V>(info)));
}
template <class Exception>
-inline Exception const & operator <<(Exception const & e, pfi::lang::shared_ptr<error_info_base> info)
+inline const Exception& operator <<(const Exception& e, pfi::lang::shared_ptr<error_info_base> info)
{
return add_info(e, info);
}
@@ -174,7 +181,7 @@ class jubaexception : public jubatus_exception {
};
template <class Exception>
-inline Exception const & operator <<(Exception const & e, exception_thrower_binder_type const &)
+inline const Exception& operator <<(const Exception& e, const exception_thrower_binder_type&)
{
e.bind_thrower(exception_thrower_ptr(new exception_thrower_impl<Exception>(e)));
return e;
@@ -205,7 +212,7 @@ class runtime_error : public jubaexception<runtime_error> {
namespace detail {
template <class Exception>
-exception_thrower_ptr current_std_exception(Exception const & e)
+exception_thrower_ptr current_std_exception(const Exception& e)
{
return exception_thrower_ptr(new exception_thrower_impl<Exception>(e));
}
View
52 src/common/exception_info.hpp
@@ -15,6 +15,8 @@
// License along with this library; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#pragma once
+
#include <cstring>
#include <typeinfo>
#include <pficommon/lang/shared_ptr.h>
@@ -29,7 +31,11 @@ std::string demangle_symbol(const char *symbol);
class error_info_base {
public:
- virtual bool splitter() const = 0;
+ virtual bool splitter() const
+ {
+ return false;
+ }
+
virtual std::string tag_typeid_name() const = 0;
virtual std::string as_string() const = 0;
@@ -40,6 +46,12 @@ class error_info_base {
template <class Tag, class V>
class error_info;
+template <class Tag, class V>
+inline std::string to_string(const error_info<Tag, V>& info)
+{
+ return pfi::lang::lexical_cast<std::string, V>(info.value());
+}
+
template<>
class error_info<struct error_splitter_, void> : public error_info_base {
public:
@@ -60,42 +72,13 @@ class error_info<struct error_splitter_, void> : public error_info_base {
}
};
-template<>
-class error_info<struct error_errno_, int> : public error_info_base {
-public:
- error_info(int err)
- : value_(err)
- {
- }
-
- bool splitter() const
- {
- return false;
- }
-
- std::string tag_typeid_name() const
- {
- return jubatus::exception::detail::demangle_symbol(typeid(struct error_errno_*).name());
- }
-
- std::string as_string() const
- {
- std::string msg(strerror(value_));
- msg += " (" + pfi::lang::lexical_cast<std::string>(value_) + ")";
- return msg;
- }
-private:
- int value_;
-};
-
template <class Tag, class V>
class error_info : public error_info_base {
public:
typedef V value_type;
error_info(value_type v);
~error_info() throw();
- bool splitter() const;
std::string tag_typeid_name() const;
std::string as_string() const;
@@ -120,12 +103,6 @@ inline error_info<Tag, V>::~error_info() throw()
}
template <class Tag, class V>
-inline bool error_info<Tag, V>::splitter() const
-{
- return false;
-}
-
-template <class Tag, class V>
inline std::string error_info<Tag, V>::tag_typeid_name() const
{
return jubatus::exception::detail::demangle_symbol(typeid(Tag*).name());
@@ -134,8 +111,7 @@ inline std::string error_info<Tag, V>::tag_typeid_name() const
template <class Tag, class V>
inline std::string error_info<Tag, V>::as_string() const
{
- // TODO: implement generic and user defined converter to std::string
- return pfi::lang::lexical_cast<std::string>(value_);
+ return to_string(*this);
}
} // exception
View
74 src/common/exception_test.cpp
@@ -31,6 +31,33 @@ using jubatus::exception::error_info_list_t;
using jubatus::exception::exception_thrower_ptr;
using jubatus::exception::jubaexception;
+namespace jubatus {
+namespace exception {
+typedef error_info<struct test_my_tag_, int> test_my_tag;
+inline string to_string(const test_my_tag& info)
+{
+ return pfi::lang::lexical_cast<string>(info.value() * 2);
+}
+} // exception
+} // jubatus
+
+TEST(error_info, defined_tag)
+{
+ jubatus::exception::test_my_tag tag(1);
+
+ EXPECT_FALSE(tag.splitter());
+ EXPECT_EQ(1, tag.value());
+}
+
+TEST(error_info, error_info_base)
+{
+ jubatus::exception::test_my_tag tag(1);
+ const jubatus::exception::error_info_base& base = tag;
+
+ EXPECT_FALSE(base.splitter());
+ EXPECT_EQ("2", base.as_string());
+}
+
namespace test_exception {
class ore_exception : public jubaexception<ore_exception> {
public:
@@ -56,7 +83,7 @@ class derived_exception : public jubatus::exception::runtime_error {
// multi-derived exception cannot get as it thrower
// because thrower() returns exception_thrower_impl<runtime_error>
};
-}
+} // test_exception
TEST(exception, custom_exception)
{
@@ -145,10 +172,10 @@ TEST(exception, exception_info_macro)
} catch (const jubatus_exception& e) {
error_info_list_t info_list = e.error_info();
EXPECT_EQ(4, info_list.size());
- EXPECT_EQ(false, info_list[0]->splitter());
- EXPECT_EQ(false, info_list[1]->splitter());
- EXPECT_EQ(false, info_list[2]->splitter());
- EXPECT_EQ(true, info_list[3]->splitter());
+ EXPECT_FALSE(info_list[0]->splitter());
+ EXPECT_FALSE(info_list[1]->splitter());
+ EXPECT_FALSE(info_list[2]->splitter());
+ EXPECT_TRUE(info_list[3]->splitter());
}
}
@@ -161,12 +188,12 @@ TEST(exception, exception_info_macro_additional)
} catch (const jubatus_exception& e) {
error_info_list_t info_list = e.error_info();
EXPECT_EQ(5, info_list.size());
- EXPECT_EQ(false, info_list[0]->splitter());
+ EXPECT_FALSE(info_list[0]->splitter());
EXPECT_EQ(string("message"), info_list[0]->as_string());
- EXPECT_EQ(false, info_list[1]->splitter());
- EXPECT_EQ(false, info_list[2]->splitter());
- EXPECT_EQ(false, info_list[3]->splitter());
- EXPECT_EQ(true, info_list[4]->splitter());
+ EXPECT_FALSE(info_list[1]->splitter());
+ EXPECT_FALSE(info_list[2]->splitter());
+ EXPECT_FALSE(info_list[3]->splitter());
+ EXPECT_TRUE(info_list[4]->splitter());
}
}
@@ -191,10 +218,10 @@ TEST(exception, exception_custom_error_info)
} catch (const jubatus_exception& e) {
error_info_list_t info_list = e.error_info();
EXPECT_EQ(4, info_list.size());
- EXPECT_EQ(false, info_list[0]->splitter());
- EXPECT_EQ(false, info_list[1]->splitter());
- EXPECT_EQ(false, info_list[2]->splitter());
- EXPECT_EQ(true, info_list[3]->splitter());
+ EXPECT_FALSE(info_list[0]->splitter());
+ EXPECT_FALSE(info_list[1]->splitter());
+ EXPECT_FALSE(info_list[2]->splitter());
+ EXPECT_TRUE(info_list[3]->splitter());
}
}
@@ -217,16 +244,16 @@ TEST(exception, exception_info_add_macro)
caught = true;
error_info_list_t info_list = e.error_info();
EXPECT_EQ(9, info_list.size());
- EXPECT_EQ(false, info_list[0]->splitter());
- EXPECT_EQ(false, info_list[1]->splitter());
- EXPECT_EQ(false, info_list[2]->splitter());
- EXPECT_EQ(true, info_list[3]->splitter());
- EXPECT_EQ(false, info_list[4]->splitter());
+ EXPECT_FALSE(info_list[0]->splitter());
+ EXPECT_FALSE(info_list[1]->splitter());
+ EXPECT_FALSE(info_list[2]->splitter());
+ EXPECT_TRUE(info_list[3]->splitter());
+ EXPECT_FALSE(info_list[4]->splitter());
EXPECT_EQ(string("added"), info_list[4]->as_string());
- EXPECT_EQ(false, info_list[5]->splitter());
- EXPECT_EQ(false, info_list[6]->splitter());
- EXPECT_EQ(false, info_list[7]->splitter());
- EXPECT_EQ(true, info_list[8]->splitter());
+ EXPECT_FALSE(info_list[5]->splitter());
+ EXPECT_FALSE(info_list[6]->splitter());
+ EXPECT_FALSE(info_list[7]->splitter());
+ EXPECT_TRUE(info_list[8]->splitter());
}
EXPECT_TRUE(caught);
@@ -288,4 +315,3 @@ TEST(exception, exception_class_name)
}
#endif
-
View
279 src/common/mprpc/async_client.cpp
@@ -22,19 +22,20 @@
#include <glog/logging.h>
-#include "../exception.hpp"
-
#include <fcntl.h>
#include <string.h>
#include <errno.h>
+#include "exception.hpp"
+
using pfi::lang::shared_ptr;
using pfi::system::time::clock_time;
using pfi::system::time::get_clock_time;
namespace jubatus { namespace common { namespace mprpc {
-bool set_socket_nonblock(int sock, bool on){
+bool set_socket_nonblock(int sock, bool on)
+{
int res;
if(on){
res = fcntl(sock, F_SETFL, O_NONBLOCK);
@@ -46,82 +47,278 @@ bool set_socket_nonblock(int sock, bool on){
}
async_sock::async_sock():
- //pfi::network::mprpc::socket(::socket(AF_INET, SOCK_STREAM|SOCK_NONBLOCK|SOCK_CLOEXEC, 0)),
pfi::network::mprpc::socket(::socket(AF_INET, SOCK_STREAM, 0)),
- state(CLOSED),
- progress(0)
+ state_(CLOSED),
+ progress_(0)
{
// FIXME: SOCK_NONBLOCK is linux only
- set_socket_nonblock(this->get(), true);
- unpacker.reserve_buffer(4096);
+ if (!set_socket_nonblock(this->get(), true))
+ throw JUBATUS_EXCEPTION(rpc_internal_error()
+ << jubatus::exception::error_api_func("fcntl")
+ << jubatus::exception::error_errno(errno));
+
+ unpacker_.reserve_buffer(4096);
}
-async_sock::~async_sock(){
+async_sock::~async_sock()
+{
this->close();
}
-bool async_sock::set_async(bool on){ return on; }
-
-int async_sock::send_async(const char* buf, size_t size){
- int r = ::write(this->get(), buf+progress, size-progress);
- if(r > 0){
- progress += r;
- }
+int async_sock::send_async(const char* buf, size_t size)
+{
+ int r = ::write(this->get(), buf + progress_, size - progress_);
+ if (r > 0)
+ progress_ += r;
return r;
}
int async_sock::recv_async()
{
- if (unpacker.buffer_capacity() == 0)
- unpacker.reserve_buffer(4096);
-
- int r = ::read(this->get(), unpacker.buffer(), unpacker.buffer_capacity());
- // if(r < 0){
- // char msg[1024];
- // strerror_r(errno, msg, 1024);
- // cout << "errno:"<< errno << msg << endl;
- // }
- if(r > 0){
- unpacker.buffer_consumed(r);
- }
+ if (unpacker_.buffer_capacity() == 0)
+ unpacker_.reserve_buffer(4096);
+
+ int r = ::read(this->get(), unpacker_.buffer(), unpacker_.buffer_capacity());
+ if (r > 0)
+ unpacker_.buffer_consumed(r);
+
return r;
}
-int async_sock::connect_async(const std::string& host, uint16_t port){
+int async_sock::connect_async(const std::string& host, uint16_t port)
+{
int res;
int sock = this->get();
-
+
std::vector<pfi::network::ipv4_address> ips = resolve(host, port);
- for (int i=0; i < (int)ips.size(); i++){
+ for (size_t i = 0, size = ips.size(); i < size; i++) {
sockaddr_in addrin={};
addrin.sin_family = PF_INET;
addrin.sin_addr.s_addr = inet_addr(ips[i].to_string().c_str());
addrin.sin_port = htons(port);
-
+
res = ::connect(sock,(sockaddr*)&addrin,sizeof(addrin));
- if (res == -1){
- if (errno==EINPROGRESS){
- state = CONNECTING;
+ if (res == -1) {
+ if (errno == EINPROGRESS) {
+ state_ = CONNECTING;
return 0;
- }else{
- DLOG(ERROR) << errno;
+ } else {
+ DLOG(ERROR) << __func__ << " failed to connect: " << errno;
}
- }
- else if(res == 0){
- state = SENDING;
+ } else if (res == 0) {
+ state_ = ONLINE;
return 0;
}
}
this->close();
return -1;
}
-bool async_sock::close(){
+bool async_sock::close()
+{
return pfi::network::mprpc::socket::close();
}
+async_client::async_client(const std::string& host, uint16_t port)
+ : host_(host)
+ , port_(port)
+ , sock_(new async_sock)
+ , context_(new client_context)
+ , evbase_(NULL)
+ , send_buffer_(NULL)
+ , clear_evbase_(false)
+{
+ context_->client = this;
+}
+async_client::~async_client()
+{
+ if (clear_evbase_ && evbase_)
+ event_base_free(evbase_);
}
+
+void async_client::connect_async()
+{
+ sock_->connect_async(host_, port_);
+}
+
+void async_client::prepare_event(event_base* evbase)
+{
+ context_->evbase = evbase_ = evbase;
+
+ // use `event_new' when only support libevent 2.x
+ event_set(&context_->ev_read, sock_->get(), EV_READ | EV_TIMEOUT | EV_PERSIST, &readable_callback_cb, context_.get());
+ event_set(&context_->ev_write, sock_->get(), EV_WRITE | EV_TIMEOUT | EV_PERSIST, &writable_callback_cb, context_.get());
+ event_base_set(evbase_, &context_->ev_read);
+ event_base_set(evbase_, &context_->ev_write);
}
+
+void async_client::prepare_event()
+{
+ if (!evbase_) {
+ clear_evbase_ = true;
+ evbase_ = event_base_new();
+ if (!evbase_)
+ throw std::bad_alloc();
+
+ prepare_event(evbase_);
+ }
}
+
+void async_client::set_send_buffer(const msgpack::sbuffer& s)
+{
+ send_buffer_ = &s;
+}
+
+void async_client::register_read(const timeval& timeout)
+{
+ event_add(&context_->ev_read, &timeout);
+}
+
+void async_client::register_write(const timeval& timeout)
+{
+ event_add(&context_->ev_write, &timeout);
+}
+
+void async_client::readable_callback_cb(int fd, short int events, void* arg) throw()
+{
+ client_context* context = static_cast<client_context*>(arg);
+ async_client* client = context->client;
+ context->read_exception.reset();
+
+ try {
+ client->readable_callback(fd, events);
+
+ } catch (jubatus::exception::jubatus_exception& e) {
+ event_del(&context->ev_read);
+ client->sock_->disconnected();
+ context->read_exception = e.thrower();
+
+ try {
+ throw;
+ } catch (rpc_internal_error& e) {
+ e << jubatus::exception::error_api_func("event_base_dispatch");
+ } catch (...) {}
+
+ } catch (...) {
+ // unknown error
+ event_del(&context->ev_read);
+ event_base_loopbreak(context->evbase);
+ context->read_exception = jubatus::exception::get_current_exception();
+ }
+}
+
+void async_client::writable_callback_cb(int fd, short int events, void* arg) throw()
+{
+ client_context* context = static_cast<client_context*>(arg);
+ async_client* client = context->client;
+ context->write_exception.reset();
+
+ try {
+ client->writable_callback(fd, events);
+
+ } catch (jubatus::exception::jubatus_exception& e) {
+ event_del(&context->ev_write);
+ client->sock_->disconnected();
+ context->write_exception = e.thrower();
+
+ try {
+ throw;
+ } catch (rpc_internal_error& e) {
+ e << jubatus::exception::error_api_func("event_base_dispatch");
+ } catch (...) {}
+
+ } catch (...) {
+ // unknown error
+ event_del(&context->ev_write);
+ event_base_loopbreak(context->evbase);
+ context->write_exception = jubatus::exception::get_current_exception();
+ }
+}
+
+void async_client::readable_callback(int fd, int events)
+{
+ if (events == EV_READ) {
+ int r = sock_->recv_async();
+ if (r < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
+ throw JUBATUS_EXCEPTION(rpc_io_error()
+ << jubatus::exception::error_api_func("read")
+ << jubatus::exception::error_errno(errno));
+
+ } else if (r == 0) {
+ // closed socket
+ sock_->disconnected();
+
+ } else {
+ rpc_response_t res;
+
+ if (sock_->salvage(res.response, res.zone)) {
+ //cout << __FILE__ << " " << __LINE__ << ":"<< endl;
+ //cout << "\ta0: "<< int(res.response.a0) << endl;
+ //cout << "\ta2: "<< res.response.a2.type << " " << res.response.a2.is_nil() << " " << res.response.a2 << endl;
+ //cout << "\ta3: "<< res.response.a3.type << " " << res.response.a3.is_nil() << " " << res.response.a3 << endl;;
+
+ // Response received
+ event_del(&context_->ev_read);
+
+ // RPC Response Message: [type = 1, msgid, error, result]
+ if (res.response.a0 == 1)
+ results_.push_back(res);
+ }
+ }
+
+ } else {
+ // EV_TIMEOUT or error occured
+ if (events == EV_TIMEOUT)
+ throw JUBATUS_EXCEPTION(rpc_timeout_error()
+ << jubatus::exception::error_api_func("read")
+ << jubatus::exception::error_errno(errno));
+ else
+ throw JUBATUS_EXCEPTION(rpc_internal_error()
+ << jubatus::exception::error_api_func("read")
+ << jubatus::exception::error_errno(errno));
+ }
+}
+
+void async_client::writable_callback(int fd, int events)
+{
+ if (events == EV_WRITE) {
+ if (sock_->is_connecting())
+ sock_->set_online();
+
+ int r = sock_->send_async(send_buffer_->data(), send_buffer_->size());
+ if (r < 0) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK)
+ return;
+
+ // write error
+ throw JUBATUS_EXCEPTION(rpc_io_error()
+ << jubatus::exception::error_api_func("write")
+ << jubatus::exception::error_errno(errno));
+
+ } else if (r == 0) {
+ // there is no effect
+ } else if (sock_->sent_size() == send_buffer_->size()){
+ // current packet sending finished
+ event_del(&context_->ev_write);
+ sock_->reset_progress();
+
+ }
+
+ } else {
+ // EV_TIMEOUT or error occured
+ if (events == EV_TIMEOUT)
+ throw JUBATUS_EXCEPTION(rpc_timeout_error()
+ << jubatus::exception::error_api_func("write")
+ << jubatus::exception::error_errno(errno));
+ else
+ throw JUBATUS_EXCEPTION(rpc_internal_error()
+ << jubatus::exception::error_api_func("write")
+ << jubatus::exception::error_errno(errno));
+ }
+}
+
+} // mprpc
+} // common
+} // jubatus
View
151 src/common/mprpc/async_client.hpp
@@ -22,81 +22,158 @@
#include <map>
#include <msgpack.hpp>
+#include <event.h>
#include <pficommon/lang/shared_ptr.h>
+#include <pficommon/lang/function.h>
#include <pficommon/system/time_util.h>
#include <pficommon/network/mprpc/socket.h>
+#include <pficommon/network/mprpc/exception.h>
+
+#include "../exception.hpp"
+#include "exception.hpp"
+
namespace jubatus { namespace common { namespace mprpc {
+struct rpc_response_t {
+ msgpack::type::tuple<uint8_t,uint32_t,msgpack::object,msgpack::object> response;
+ pfi::lang::shared_ptr<msgpack::zone> zone;
+
+ bool has_error() const { return !response.a2.is_nil(); }
+
+ uint32_t msgid() const { return response.a1; }
+ msgpack::object& error() { return response.a2; }
+ template<typename T> const T as() const { return response.a3.as<T>(); }
+
+};
+
class async_sock : public pfi::network::mprpc::socket {
public:
async_sock();
~async_sock();
- bool set_async(bool on);
int send_async(const char* buf, size_t size);
int recv_async();
-
- template <typename T> bool salvage(T&, pfi::lang::shared_ptr<msgpack::zone>&);
+
+ template <typename T>
+ bool salvage(T&, pfi::lang::shared_ptr<msgpack::zone>&);
int connect_async(const std::string& host, uint16_t port);
bool close();
- void set_sending(){ state = SENDING; }
- void set_recving(){ state = RECVING; }
- void disconnected(){ this->close(); state = CLOSED; }
- bool is_closed()const{ return state == CLOSED; }
- bool is_connecting()const{ return state == CONNECTING; }
- bool is_sending()const{ return state == SENDING; }
- bool is_recving()const{ return state == RECVING; }
- size_t received()const{ return progress; }
- void reset_received() { progress = 0; }
+ void set_online() { state_ = ONLINE; }
+ void disconnected() { this->close(); state_ = CLOSED; }
+ bool is_closed() const { return state_ == CLOSED; }
+ bool is_connecting() const{ return state_ == CONNECTING; }
+ bool is_online() const { return state_ == ONLINE; }
+ size_t sent_size() const { return progress_; }
+ void reset_progress() { progress_ = 0; }
+
private:
- enum { CLOSED, CONNECTING, SENDING, RECVING } state;
- size_t progress;
- msgpack::unpacker unpacker;
+ enum { CLOSED, CONNECTING, ONLINE } state_;
+ size_t progress_;
+ msgpack::unpacker unpacker_;
};
-template <typename T> bool async_sock::salvage(T& t, pfi::lang::shared_ptr<msgpack::zone>& ret_z)
+template <typename T>
+bool async_sock::salvage(T& t, pfi::lang::shared_ptr<msgpack::zone>& ret_z)
{
msgpack::unpacked msg;
- if(unpacker.next(&msg)){
- msgpack::object o = msg.get();
- std::auto_ptr<msgpack::zone> z = msg.zone();
- ret_z = pfi::lang::shared_ptr<msgpack::zone>(z.get());
- z.release();
- o.convert(&t);
- return true;
+ try {
+ if (unpacker_.next(&msg)) {
+ msgpack::object o = msg.get();
+ std::auto_ptr<msgpack::zone> z = msg.zone();
+ ret_z = pfi::lang::shared_ptr<msgpack::zone>(z.get());
+ z.release();
+ o.convert(&t);
+ return true;
+ }
+ } catch (const msgpack::type_error&) {
+ throw JUBATUS_EXCEPTION(rpc_internal_error()
+ << jubatus::exception::error_message("msgpack::type_error: cannot convert RPC response"));
+ } catch (const msgpack::unpack_error& e) {
+ throw JUBATUS_EXCEPTION(rpc_internal_error()
+ << jubatus::exception::error_message(std::string("msgpack::unpack_error: ") + e.what()));
}
return false;
}
+class rpc_mclient;
class async_client {
+ friend class rpc_mclient;
public:
- async_client(const std::string& host, uint16_t port, int timeout_sec);
+ typedef std::vector<rpc_response_t> response_list_t;
+
+ async_client(const std::string& host, uint16_t port);
~async_client();
-
+
+ void connect_async();
+
void send_async(const std::string& method, const msgpack::sbuffer& argv);
+ void send_async(const msgpack::sbuffer& argv);
void join(msgpack::object& o);
-
- void connect_async();
- void l();
-
+ void prepare_event(event_base* evbase);
+
+ void set_send_buffer(const msgpack::sbuffer&);
+
+ void register_read(const timeval&);
+ void register_write(const timeval&);
+
+ std::string host() const { return host_; }
+ uint16_t port() const { return port_; }
+ pfi::lang::shared_ptr<async_sock> sock() const { return sock_; }
+ event_base* evbase() const { return evbase_; }
+ event* read_ev() const { return &context_->ev_read; }
+ event* write_ev() const { return &context_->ev_write; }
+ jubatus::exception::exception_thrower_ptr read_exception() const { return context_->read_exception; }
+ jubatus::exception::exception_thrower_ptr write_exception() const { return context_->write_exception; }
+
+ response_list_t& response() { return results_; }
+
+ bool is_closed() const { return sock_->is_closed(); }
+
private:
- bool wait();
-
+
+ struct client_context {
+ event_base* evbase;
+ async_client* client;
+ event ev_read;
+ event ev_write;
+ jubatus::exception::exception_thrower_ptr read_exception;
+ jubatus::exception::exception_thrower_ptr write_exception;
+
+ client_context()
+ {
+ evbase = NULL;
+ client = NULL;
+ memset(&ev_read, 0, sizeof(event));
+ memset(&ev_write, 0, sizeof(event));
+ }
+ };
+
+ static void readable_callback_cb(int fd, short int events, void* arg) throw();
+ static void writable_callback_cb(int fd, short int events, void* arg) throw();
+
+ void readable_callback(int, int);
+ void writable_callback(int, int);
+
+ void prepare_event();
+
std::string host_;
uint16_t port_;
- int timeout_sec_;
+
pfi::lang::shared_ptr<async_sock> sock_;
- pfi::system::time::clock_time start_;
+ response_list_t results_;
- int epfd_;
+ pfi::lang::shared_ptr<client_context> context_;
+ event_base* evbase_;
+ const mutable msgpack::sbuffer* send_buffer_; // rpc sending buffer
+ bool clear_evbase_;
};
-}
-}
-}
+} // mprpc
+} // common
+} // jubatus
View
65 src/common/mprpc/exception.hpp
@@ -0,0 +1,65 @@
+// Jubatus: Online machine learning framework for distributed environment
+// Copyright (C) 2012 Preferred Infrastructure and Nippon Telegraph and Telephone Corporation.
+//
+// This library is free software; you can redistribute it and/or
+// modify it under the terms of the GNU Lesser General Public
+// License as published by the Free Software Foundation; either
+// version 2.1 of the License, or (at your option) any later version.
+//
+// This library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+// Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public
+// License along with this library; if not, write to the Free Software
+// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+
+#pragma once
+
+#include <pficommon/network/mprpc/exception.h>
+#include "../exception.hpp"
+
+namespace jubatus {
+namespace common {
+namespace mprpc {
+
+typedef jubatus::exception::error_info<struct error_method_, std::string> error_method;
+
+class rpc_no_client : public jubatus::exception::jubaexception<rpc_no_client> {
+public:
+ const char* what() const throw() { return "no client"; }
+};
+
+class rpc_no_result : public jubatus::exception::jubaexception<rpc_no_result> {
+public:
+ const char* what() const throw() { return "no result"; }
+};
+
+
+class rpc_io_error : public jubatus::exception::jubaexception<rpc_io_error> {
+};
+
+class rpc_timeout_error : public jubatus::exception::jubaexception<rpc_timeout_error> {
+};
+
+// rpc_server error
+class rpc_type_error : public jubatus::exception::jubaexception<rpc_type_error> {
+};
+
+// rpc_server error
+class rpc_method_not_found : public jubatus::exception::jubaexception<rpc_method_not_found> {
+};
+
+// rpc_server error
+class rpc_call_error : public jubatus::exception::jubaexception<rpc_call_error> {
+};
+
+class rpc_internal_error : public jubatus::exception::jubaexception<rpc_internal_error> {
+};
+
+
+} // mprpc
+} // common
+} // jubatus
+
View
196 src/common/mprpc/rpc_client.cpp
@@ -20,173 +20,66 @@
#include <glog/logging.h>
+using std::string;
+using std::vector;
using pfi::lang::shared_ptr;
using pfi::system::time::clock_time;
using pfi::system::time::get_clock_time;
namespace jubatus { namespace common { namespace mprpc {
-rpc_mclient::rpc_mclient(const std::vector<std::pair<std::string, uint16_t> >& hosts,
+rpc_mclient::rpc_mclient(const vector<std::pair<string, uint16_t> >& hosts,
int timeout_sec):
hosts_(hosts),
timeout_sec_(timeout_sec),
start_(get_clock_time()),
evbase_(::event_base_new())
{
+ if (!evbase_)
+ throw std::bad_alloc();
+
connect_async_();
create_fd_event_();
}
-rpc_mclient::rpc_mclient(const std::vector<std::pair<std::string, int> >& hosts,
+rpc_mclient::rpc_mclient(const vector<std::pair<string, int> >& hosts,
int timeout_sec):
timeout_sec_(timeout_sec),
start_(get_clock_time()),
evbase_(::event_base_new())
{
+ if (!evbase_)
+ throw std::bad_alloc();
+
for(size_t i=0; i<hosts.size(); ++i){
hosts_.push_back(hosts[i]);
}
connect_async_();
create_fd_event_();
}
-rpc_mclient::~rpc_mclient(){
- ::event_base_free(evbase_);
-}
-
-void rpc_mclient::call_async(const std::string& m)
+rpc_mclient::~rpc_mclient()
{
- call_async_(m, std::vector<int>());
+ ::event_base_free(evbase_);
}
void rpc_mclient::connect_async_()
{
clients_.clear();
- for(size_t i=0; i<hosts_.size(); ++i){
- shared_ptr<async_sock> p(new async_sock);
- p->connect_async(hosts_[i].first, hosts_[i].second);
- clients_.insert(std::make_pair(p->get(), socket_context(p)));
- }
-}
-
-static void readable_callback(int fd, short int events, void* arg)
-{
- async_context* ctx = static_cast<async_context*>(arg);
- try {
- ctx->rest -= ctx->c->readable_callback(fd, events, ctx);
- } catch (...) {
- event_base_loopbreak(ctx->evbase);
- }
-}
+ vector<std::pair<string, uint16_t> >::const_iterator it = hosts_.begin(), end = hosts_.end();
+ for (; it != end; ++it) {
+ shared_ptr<async_client> client(new async_client(it->first, it->second));
+ client->connect_async();
-int rpc_mclient::readable_callback(int fd, int events, async_context* ctx){
- int done = 0;
- pfi::lang::shared_ptr<async_sock> client = clients_[fd].socket;
-
- if (events == EV_READ) {
- int r = client->recv_async();
- if (r <= 0) {
- if (errno != EAGAIN || errno != EWOULDBLOCK) {
- client->disconnected();
- done++;
- }
-
- } else {
- rpc_response_t res;
-
- if(client->salvage(res.response, res.zone)){
- //cout << __FILE__ << " " << __LINE__ << ":"<< endl;
- //cout << "\ta0: "<< int(res.response.a0) << endl;
- //cout << "\ta2: "<< res.response.a2.type << " " << res.response.a2.is_nil() << " " << res.response.a2 << endl;
- //cout << "\ta3: "<< res.response.a3.type << " " << res.response.a3.is_nil() << " " << res.response.a3 << endl;;
-
- done++;
- if(res.response.a0 == 1){
- if(res.response.a2.is_nil()){
- ctx->ret.push_back(res);
- }
- }
- }
- }
-
- } else {
- // EV_TIMEOUT or error occured
- if (events == EV_TIMEOUT)
- ;// TODO: push timeout exception
-
- client->disconnected();
- done++;
+ clients_.push_back(client);
}
-
- if (done)
- event_del(&clients_[fd].ev_read);
-
- if (client->is_closed())
- clients_.erase(fd);
-
- return done;
-}
-
-static void writable_callback(int fd, short int events, void* arg)
-{
- async_context* ctx = static_cast<async_context*>(arg);
- try {
- ctx->rest -= ctx->c->writable_callback(fd, events, ctx);
- } catch (...) {
- event_base_loopbreak(ctx->evbase);
- }
-}
-
-int rpc_mclient::writable_callback(int fd, int events, async_context* ctx){
- int done = 0;
- pfi::lang::shared_ptr<async_sock> client = clients_[fd].socket;
-
- if (events == EV_WRITE) {
- if(client->is_connecting()){
- client->set_sending();
- }
-
- int r = client->send_async(ctx->buf->data(), ctx->buf->size());
- if (r <= 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
- client->disconnected();
- done++;
-
- } else if (client->received() == ctx->buf->size()){
- done++;
- client->reset_received();
- client->set_recving();
-
- }
-
- } else {
- // EV_TIMEOUT or error occured
- if (events == EV_TIMEOUT)
- ;// TODO: push timeout exception
-
- clients_[fd].socket->disconnected();
- done++;
- }
-
- if (done)
- event_del(&clients_[fd].ev_write);
-
- if (client->is_closed())
- clients_.erase(fd);
-
- return done;
}
void rpc_mclient::create_fd_event_()
{
- socket_info_map_t::iterator it, end;
- for (it = clients_.begin(), end = clients_.end(); it != end; ++it) {
- socket_context& c = it->second;
-
- // use `event_new' when only support libevent 2.x
- event_set(&c.ev_read, c.socket->get(), EV_READ | EV_TIMEOUT | EV_PERSIST, &mprpc::readable_callback, &context_);
- event_set(&c.ev_write, c.socket->get(), EV_WRITE | EV_TIMEOUT | EV_PERSIST, &mprpc::writable_callback, &context_);
- event_base_set(evbase_, &c.ev_read);
- event_base_set(evbase_, &c.ev_write);
+ for (client_list_t::iterator it = clients_.begin(), end = clients_.end();
+ it != end; ++it) {
+ (*it)->prepare_event(evbase_);
}
}
@@ -196,50 +89,35 @@ void rpc_mclient::register_fd_readable_()
timeout.tv_sec = timeout_sec_;
timeout.tv_usec = 0;
- socket_info_map_t::iterator it, end;
- for (it = clients_.begin(), end = clients_.end(); it != end; ++it) {
- event_add(&it->second.ev_read, &timeout);
+ for (client_list_t::iterator it = clients_.begin(), end = clients_.end();
+ it != end; ++it) {
+ if ((*it)->is_closed())
+ continue;
+ (*it)->register_read(timeout);
}
}
-void rpc_mclient::register_fd_writable_()
+void rpc_mclient::register_fd_writable_(const msgpack::sbuffer& sbuf)
{
struct timeval timeout;
timeout.tv_sec = timeout_sec_;
timeout.tv_usec = 0;
- socket_info_map_t::iterator it, end;
- for (it = clients_.begin(), end = clients_.end(); it != end; ++it) {
- event_add(&it->second.ev_write, &timeout);
+ for (client_list_t::iterator it = clients_.begin(), end = clients_.end();
+ it != end; ++it) {
+ if ((*it)->is_closed())
+ continue;
+ (*it)->set_send_buffer(sbuf);
+ (*it)->register_write(timeout);
}
}
-void rpc_mclient::send_async(const msgpack::sbuffer& buf)
+void rpc_mclient::send_all(const msgpack::sbuffer& buf)
{
- context_ = async_context();
- context_.c = this;
- context_.evbase = evbase_;
- context_.rest = clients_.size();
- context_.buf = &buf;
-
- register_fd_writable_();
-
+ register_fd_writable_(buf);
event_base_dispatch(evbase_);
- // FIXME: It is desirable to event_del all write event for preparing failure.
- // TODO: check timeout or connection error
-}
-
-
-void rpc_mclient::join_some_()
-{
- context_.ret.clear();
-
- event_base_dispatch(evbase_);
- // FIXME: It is desirable to event_del all read event for preparing failure.
- // TODO: check timeout or connection error
-}
-
-}
-}
}
+} // mprpc
+} // common
+} // jubatus
View
305 src/common/mprpc/rpc_client.hpp
@@ -22,54 +22,97 @@
#include <map>
#include <msgpack.hpp>
+#include <event.h>
#include <pficommon/lang/shared_ptr.h>
#include <pficommon/lang/function.h>
-#include <pficommon/system/time_util.h>
-#include "async_client.hpp"
#include <glog/logging.h>
#include <pficommon/data/unordered_map.h>
+#include <pficommon/data/string/utility.h>
#include <pficommon/lang/noncopyable.h>
-#include <pficommon/network/mprpc/exception.h>
+#include <pficommon/network/mprpc/message.h>
-#include <event.h>
+#include "../exception.hpp"
+#include "exception.hpp"
+#include "async_client.hpp"
namespace jubatus { namespace common { namespace mprpc {
class rpc_mclient;
-struct rpc_response_t {
- msgpack::type::tuple<uint8_t,uint32_t,msgpack::object,msgpack::object> response;
- pfi::lang::shared_ptr<msgpack::zone> zone;
+class rpc_error {
+public:
+ rpc_error(const std::string& host, uint16_t port)
+ : host_(host), port_(port)
+ {}
- template<typename T> const T as() const { return response.a3.as<T>(); }
-};
+ rpc_error(const std::string& host, uint16_t port, jubatus::exception::exception_thrower_ptr thrower)
+ : host_(host), port_(port), exception_(thrower)
+ {}
+
+ std::string host() const { return host_; }
+ uint16_t port() const { return port_; }
-struct async_context {
- rpc_mclient* c;
- event_base* evbase;
- const msgpack::sbuffer* buf;
- size_t rest;
- std::vector<rpc_response_t> ret;
+ bool has_exception() const { return exception_; }
+ void throw_exception() const { exception_->throw_exception(); }
+ jubatus::exception::exception_thrower_ptr exception() const { return exception_; }
+
+private:
+ std::string host_;
+ uint16_t port_;
+ jubatus::exception::exception_thrower_ptr exception_;
};
-struct socket_context {
- event ev_read;
- event ev_write;
- pfi::lang::shared_ptr<async_sock> socket;
+typedef jubatus::exception::error_info<struct error_multi_rpc_, std::vector<rpc_error> > error_multi_rpc;
+inline std::string to_string(const error_multi_rpc& info)
+{
+ std::ostringstream result;
- socket_context(pfi::lang::shared_ptr<async_sock> s)
- : socket(s)
- {
- memset(&ev_read, 0, sizeof(event));
- memset(&ev_write, 0, sizeof(event));
+ size_t host_size = info.value().size();
+ if (host_size == 1) {
+ result << "rpc_error with 1 server" << '\n';
+ } else if (host_size > 1) {
+ result << "rpc_error with " << host_size << " servers" << '\n';
}
- socket_context()
- {
- memset(&ev_read, 0, sizeof(event));
- memset(&ev_write, 0, sizeof(event));
+ std::vector<rpc_error> errors = info.value();
+ for (std::vector<rpc_error>::const_iterator it = errors.begin(), end = errors.end();
+ it != end; ++it) {
+ result << " host: " << it->host() << ", port: " << it->port() << '\n';
+ std::ostringstream tmp;
+ if (it->has_exception()) {
+ try {
+ it->throw_exception();
+ } catch (const jubatus::exception::jubatus_exception& e) {
+ tmp << e.diagnostic_information(false);
+ } catch (const std::exception& e) {
+ tmp << e.what();
+ } catch (...) {
+ tmp << "...";
+ }
+
+ // Indent each line
+ std::vector<std::string> lines = pfi::data::string::split(tmp.str(), '\n');
+ std::ostringstream msg;
+ for (std::vector<std::string>::iterator it = lines.begin(), end = lines.end();
+ it != end; ++it) {
+ if (it->empty()) continue;
+ msg << " " << *it << '\n';
+ }
+ result << msg.str();
+ }
}
+
+ return result.str();
+}
+
+template <class Res>
+struct rpc_result {
+ pfi::lang::shared_ptr<Res> value;
+ std::vector<rpc_error> error;
+
+ Res& operator*() const { return *value; }
+ bool has_error() const { return !error.empty(); }
};
class rpc_mclient : pfi::lang::noncopyable
@@ -81,116 +124,172 @@ class rpc_mclient : pfi::lang::noncopyable
int timeout_sec);
~rpc_mclient();
- template <typename Res, typename Argv>
- Res call(const std::string& m, const Argv& a,
- const pfi::lang::function<Res(Res,Res)>& reducer){
- call_async(m, a);
- return join_all(reducer);
- }
-
- void send_async(const msgpack::sbuffer& buf);
-
- void call_async(const std::string&);
-
- template <typename A0>
- void call_async(const std::string&, const A0& a0);
- template <typename A0, typename A1>
- void call_async(const std::string&, const A0& a0, const A1& a1);
- template <typename A0, typename A1, typename A2>
- void call_async(const std::string&, const A0& a0, const A1& a1, const A2& a2);
- template <typename A0, typename A1, typename A2, typename A3>
- void call_async(const std::string&, const A0&, const A1&, const A2&, const A3&);
+ template <typename Res, typename A0>
+ rpc_result<Res> call(const std::string&, const A0& a0, const pfi::lang::function<Res(Res,Res)>& reducer);
+ template <typename Res, typename A0, typename A1>
+ rpc_result<Res> call(const std::string&, const A0& a0, const A1& a1, const pfi::lang::function<Res(Res,Res)>& reducer);
+ template <typename Res, typename A0, typename A1, typename A2>
+ rpc_result<Res> call(const std::string&, const A0& a0, const A1& a1, const A2& a2, const pfi::lang::function<Res(Res,Res)>& reducer);
+ template <typename Res, typename A0, typename A1, typename A2, typename A3>
+ rpc_result<Res> call(const std::string&, const A0&, const A1&, const A2&, const A3&, const pfi::lang::function<Res(Res,Res)>& reducer);
- template <typename Res>
- Res join_all(const pfi::lang::function<Res(Res,Res)>& reducer);
-
- int readable_callback(int, int, async_context*);
- int writable_callback(int, int, async_context*);
+private:
+ static void readable_callback(int fd, short int events, void* arg);
+ static void writable_callback(int fd, short int events, void* arg);
+ void connect_async_();
-private:
void create_fd_event_();
void register_fd_readable_();
- void register_fd_writable_();
+ void register_fd_writable_(const msgpack::sbuffer&);
+
+ void send_all(const msgpack::sbuffer& buf);
template <typename Arr>
- void call_async_(const std::string&, const Arr& a);
+ void call_(const std::string&, const Arr& a);
+
+ template <typename Res>
+ rpc_result<Res> join_(const std::string&, const pfi::lang::function<Res(Res,Res)>& reducer);
- void connect_async_();
- void join_some_();
std::vector<std::pair<std::string, uint16_t> > hosts_;
int timeout_sec_;
- typedef pfi::data::unordered_map<int, socket_context> socket_info_map_t;
- socket_info_map_t clients_;
- pfi::system::time::clock_time start_;
+ typedef std::vector<pfi::lang::shared_ptr<async_client> > client_list_t;
+ client_list_t clients_;
- // WARN: Don't move context_ address which is referenced
- // by readable_callback and writable_callback.
- async_context context_;
+ pfi::system::time::clock_time start_;
event_base* evbase_;
};
template <typename Arr>
-void rpc_mclient::call_async_(const std::string& m, const Arr& argv)
+void rpc_mclient::call_(const std::string& m, const Arr& argv)
{
msgpack::sbuffer sbuf;
msgpack::type::tuple<uint8_t,uint32_t,std::string,Arr> rpc_request(0, 0xDEADBEEF, m, argv);
msgpack::pack(&sbuf, rpc_request);
- send_async(sbuf);
+ send_all(sbuf);
}
-template <typename A0>
-void rpc_mclient::call_async(const std::string& m, const A0& a0)
-{
- call_async_(m, msgpack::type::tuple<A0>(a0));
-}
-
-template <typename A0, typename A1>
-void rpc_mclient::call_async(const std::string& m, const A0& a0, const A1& a1)
-{
- call_async_(m, msgpack::type::tuple<A0, A1>(a0, a1));
-}
-template <typename A0, typename A1, typename A2>
-void rpc_mclient::call_async(const std::string& m, const A0& a0, const A1& a1, const A2& a2)
-{
- call_async_(m, msgpack::type::tuple<A0, A1, A2>(a0, a1, a2));
-}
-template <typename A0, typename A1, typename A2, typename A3>
-void rpc_mclient::call_async(const std::string& m, const A0& a0, const A1& a1, const A2& a2, const A3& a3)
-{
- call_async_(m, msgpack::type::tuple<A0, A1, A2, A3>(a0, a1, a2, a3));
-}
-
-
template <typename Res>
-Res rpc_mclient::join_all(const pfi::lang::function<Res(Res,Res)>& reducer)
+rpc_result<Res> rpc_mclient::join_(const std::string& method, const pfi::lang::function<Res(Res,Res)>& reducer)
{
- if (clients_.empty())
- throw pfi::network::mprpc::rpc_error("no clients.");
+ using pfi::lang::shared_ptr;
+ rpc_result<Res> result;
- context_ = async_context();
- context_.c = this;
- context_.evbase = evbase_;
- context_.rest = clients_.size();
- context_.buf = NULL;
+ if (clients_.empty())
+ throw JUBATUS_EXCEPTION(rpc_no_client() << error_method(method));
register_fd_readable_();
+ event_base_dispatch(evbase_);
+
+ size_t count = 0;
+
+ for (client_list_t::iterator it = clients_.begin(), end = clients_.end();
+ it != end; ++it) {
+ shared_ptr<async_client> client = *it;
+ async_client::response_list_t& response_list = client->response();
+
+ try {
+ if (client->read_exception())
+ client->read_exception()->throw_exception();
+ if (client->write_exception())
+ client->write_exception()->throw_exception();
+ if (response_list.empty())
+ throw JUBATUS_EXCEPTION(rpc_no_result() << error_method(method));
+
+ // If implement RPC specification strictly, you must find response by msgid,
+ // but pfi::network::mprpc::rpc_server does not support RPC pipelining.
+ rpc_response_t res = response_list.front();
+ response_list.erase(response_list.begin());
+ if (res.has_error()) {
+ if (res.error().type == msgpack::type::POSITIVE_INTEGER) {
+ // error code defined in pficommon/mprpc/message.h
+ switch (static_cast<unsigned int>(res.error().via.u64)) {
+ case pfi::network::mprpc::METHOD_NOT_FOUND:
+ throw JUBATUS_EXCEPTION(rpc_method_not_found() << error_method(method));
+
+ case pfi::network::mprpc::TYPE_MISMATCH:
+ throw JUBATUS_EXCEPTION(rpc_type_error() << error_method(method));
+
+ default:
+ throw JUBATUS_EXCEPTION(rpc_call_error()
+ << error_method(method)
+ << jubatus::exception::error_message(std::string("rpc_server error: " + pfi::lang::lexical_cast<std::string>(res.error().via.u64))));
+ }
+ } else {
+ // MEMO: other error object returned
+ throw JUBATUS_EXCEPTION(rpc_call_error()
+ << error_method(method)
+ << jubatus::exception::error_message("error response: " + pfi::lang::lexical_cast<std::string>(res.error())));
+ }
+ }
+
+ try {
+ // MEMO: msgpack can be thrown an exception
+ if (result.value) {
+ *result.value = reducer(*result, res.as<Res>());
+ } else {
+ result.value = shared_ptr<Res>(new Res(res.as<Res>()));
+ }
+ } catch (const msgpack::type_error&) {
+ throw JUBATUS_EXCEPTION(rpc_type_error()
+ << error_method(method)
+ << jubatus::exception::error_message("recv object cannot convert"));
+ }
+
+ count++;
+
+ // continue process next result when exception thrown
+ } catch (...) {
+ // store exception_thrower to list of error
+ result.error.push_back(rpc_error(client->host(), client->port(), jubatus::exception::get_current_exception()));
+
+ // clear last exception set by libevent callback
+ client->context_->read_exception.reset();
+ client->context_->write_exception.reset();
+ }
+ }
- join_some_();
- if (context_.ret.empty())
- throw pfi::network::mprpc::rpc_error("no results.");
-
- Res result = context_.ret[0].as<Res>();
- for(size_t i=1;i<context_.ret.size();++i){
- result = reducer(result, context_.ret[i].as<Res>());
+ if (!count) {
+ rpc_no_result e;
+ if (result.has_error())
+ e << error_multi_rpc(result.error);
+ throw JUBATUS_EXCEPTION(e << error_method(method));
}
return result;
}
+template <typename Res, typename A0>
+rpc_result<Res> rpc_mclient::call(const std::string& m, const A0& a0, const pfi::lang::function<Res(Res,Res)>& reducer)
+{
+ call_(m, msgpack::type::tuple<A0>(a0));
+ return join_(m, reducer);
}
+
+template <typename Res, typename A0, typename A1>
+rpc_result<Res> rpc_mclient::call(const std::string& m, const A0& a0, const A1& a1, const pfi::lang::function<Res(Res,Res)>& reducer)
+{
+ call_(m, msgpack::type::tuple<A0, A1>(a0, a1));
+ return join_(m, reducer);
}
+
+template <typename Res, typename A0, typename A1, typename A2>
+rpc_result<Res> rpc_mclient::call(const std::string& m, const A0& a0, const A1& a1, const A2& a2, const pfi::lang::function<Res(Res,Res)>& reducer)
+{
+ call_(m, msgpack::type::tuple<A0, A1, A2>(a0, a1, a2));
+ return join_(m, reducer);
+}
+
+template <typename Res, typename A0, typename A1, typename A2, typename A3>
+rpc_result<Res> rpc_mclient::call(const std::string& m, const A0& a0, const A1& a1, const A2& a2, const A3& a3, const pfi::lang::function<Res(Res,Res)>& reducer)
+{
+ call_(m, msgpack::type::tuple<A0, A1, A2, A3>(a0, a1, a2, a3));
+ return join_(m, reducer);
}
+
+} // mprpc
+} // common
+} // jubatus
View
248 src/common/mprpc/rpc_client_test.cpp
@@ -31,6 +31,12 @@
using namespace std;
using pfi::lang::function;
+using pfi::lang::shared_ptr;
+using pfi::concurrent::thread;
+using jubatus::common::mprpc::rpc_result;
+using jubatus::common::mprpc::rpc_error;
+using jubatus::common::mprpc::error_multi_rpc;
+using jubatus::exception::error_info_list_t;
struct strw{
string key;
@@ -72,34 +78,108 @@ static vector<string> concat_vector(const vector<string> lhs, const vector<strin
MPRPC_GEN(1, test_mrpc, test_bool, test_twice, add_all, various, sum, vec);
-static void server_thread(unsigned u){
- test_mrpc_server srv(3.0);
- srv.set_test_bool(&test_bool);
- srv.set_test_twice(&test_twice);
- srv.set_add_all(&add_all);
- srv.set_various(&various);
- srv.set_sum(&sum);
- srv.set_vec(&vec);
- srv.serv(u, 10);
-}
+typedef shared_ptr<test_mrpc_server> server_ptr;
+typedef vector<server_ptr> server_list;
+typedef vector<shared_ptr<thread> > thread_list;
-static void fork_server(unsigned u){
- pfi::concurrent::thread th(pfi::lang::bind(&server_thread, u));
- th.start();
- th.detach();
+static void server_thread(server_ptr srv, unsigned u)
+{
+ srv->set_test_bool(&test_bool);
+ srv->set_test_twice(&test_twice);
+ srv->set_add_all(&add_all);
+ srv->set_various(&various);
+ srv->set_sum(&sum);
+ srv->set_vec(&vec);
+ srv->serv(u, 10);
}
static const uint16_t PORT0 = 60023;
static const uint16_t PORT1 = 60024;
static const uint16_t kPortStart = 60023;
static const uint16_t kPortEnd = kPortStart + 10;
+TEST(rpc_mclient, no_client)
+{
+ vector<pair<string,uint16_t> > hosts;
+ jubatus::common::mprpc::rpc_mclient cli(hosts, 3.0);
+
+ // MUST USE with some hosts
+ ASSERT_THROW(cli.call("test", 1234, function<bool(bool,bool)>(&jubatus::framework::all_and)),
+ jubatus::common::mprpc::rpc_no_client);
+}
+
+TEST(rpc_mclient, no_result)
+{
+ vector<pair<string,uint16_t> > hosts;
+ hosts.push_back(make_pair(string("localhost"), kPortStart));
+ hosts.push_back(make_pair(string("localhost"), kPortStart + 1));
+
+ jubatus::common::mprpc::rpc_mclient cli(hosts, 3.0);
+ ASSERT_THROW(cli.call("test", 1234, function<bool(bool,bool)>(&jubatus::framework::all_and)),
+ jubatus::common::mprpc::rpc_no_result);
+}
+
+namespace {
+void timeout_server(pfi::network::mprpc::socket* server_socket)
+{
+ ::accept(server_socket->get(), NULL, NULL);
+
+ // wait socket shutdown
+ ::accept(server_socket->get(), NULL, NULL);
+}
+}
+
+TEST(rpc_mclient, error_multi_rpc)
+{
+ pfi::network::mprpc::socket server_socket;
+ server_socket.listen(kPortStart);
+ thread t(pfi::lang::bind(&timeout_server, &server_socket));
+ t.start();
+
+ vector<pair<string,uint16_t> > hosts;
+ hosts.push_back(make_pair(string("localhost"), kPortStart));
+ jubatus::common::mprpc::rpc_mclient cli(hosts, 1.0);
+
+ // error_multi_rpc
+ try {
+ cli.call("test", 1234, function<bool(bool,bool)>(&jubatus::framework::all_and));
+
+ } catch (jubatus::common::mprpc::rpc_no_result& e) {
+ const error_info_list_t& list = e.error_info();
+ bool has_error_multi_rpc = false;
+ for (error_info_list_t::const_iterator it = list.begin(), end = list.end();
+ it != end; ++it) {
+ if (error_multi_rpc* multi_error = dynamic_cast<error_multi_rpc*>(it->get())) {
+ has_error_multi_rpc = true;
+ std::vector<rpc_error> error_list = multi_error->value();
+ EXPECT_EQ(1, error_list.size());
+
+ EXPECT_EQ(string("localhost"), error_list[0].host());
+ EXPECT_EQ(kPortStart, error_list[0].port());
+
+ EXPECT_THROW(error_list[0].throw_exception(), jubatus::common::mprpc::rpc_timeout_error);
+ }
+ }
+
+ EXPECT_TRUE(has_error_multi_rpc);
+ }
+
+ server_socket.close();
+}
+
+
TEST(rpc_mclient, small)
{
vector<pair<string,uint16_t> > clients;
+ server_list servers;
+ thread_list threads;
for (uint16_t port = kPortStart; port <= kPortEnd; port++) {
- fork_server(port);
+ server_ptr ser = server_ptr(new test_mrpc_server(3.0));
+ servers.push_back(ser);
+ threads.push_back(shared_ptr<thread>(new thread(pfi::lang::bind(&server_thread, ser, port))));
+ threads.back()->start();
+
clients.push_back(make_pair(string("localhost"), port));
}
const size_t kServerSize = clients.size();
@@ -112,20 +192,19 @@ TEST(rpc_mclient, small)
}
jubatus::common::mprpc::rpc_mclient cli(clients, 3.0);
{
- cli.call_async("test_bool", 73684);
- EXPECT_FALSE(cli.join_all(function<bool(bool,bool)>(&jubatus::framework::all_and)));
+ rpc_result<bool> r = cli.call("test_bool", 73684, function<bool(bool,bool)>(&jubatus::framework::all_and));
+ EXPECT_FALSE(*r);
}
{
int ans = 73684*2 * kServerSize;
- cli.call_async("test_twice", 73684);
- EXPECT_EQ(ans,
- cli.join_all(function<int(int,int)>(&jubatus::framework::add<int>)));
+ rpc_result<int> r = cli.call("test_twice", 73684, function<int(int,int)>(&jubatus::framework::add<int>));
+
+ EXPECT_EQ(ans, *r);
}
{
int ans = kServerSize*(23+21-234);
- cli.call_async("add_all", 23,21,-234);
- EXPECT_EQ(ans,
- cli.join_all(function<int(int,int)>(&jubatus::framework::add<int>)));
+ rpc_result<int> r = cli.call("add_all", 23,21,-234, function<int(int,int)>(&jubatus::framework::add<int>));
+ EXPECT_EQ(ans, *r);
}
{
int i = 234;
@@ -138,50 +217,129 @@ TEST(rpc_mclient, small)
for (size_t c = 0; c < kServerSize; c++) {
ans = concat(ans, various(i,f,d,s));
}
- cli.call_async("various", i,f,d,s);
- EXPECT_EQ(ans, cli.join_all(function<string(string,string)>(&concat)));
+ rpc_result<string> r = cli.call("various", i, f, d, s, function<string(string,string)>(&concat));
+ EXPECT_EQ(ans, *r);
}
{
const int payload_count = 1024 * 1024;
vector<int> hoge(payload_count, 10);
- cli.call_async("sum", hoge);
int ans = 10 * payload_count * kServerSize;
- EXPECT_EQ(ans,
- cli.join_all(function<int(int,int)>(&jubatus::framework::add<int>)));
+ rpc_result<int> r = cli.call("sum", hoge, function<int(int,int)>(&jubatus::framework::add<int>));
+ EXPECT_EQ(ans, *r);
}
{
- cli.call_async("vec", string("a"), 200);
pfi::lang::function<std::vector<std::string>
(const std::vector<std::string>, const std::vector<std::string>)> f = &concat_vector;
- vector<string> x = cli.join_all<vector<string> > (f);
- EXPECT_EQ(200 * kServerSize, x.size());
+
+ rpc_result<vector<string> > r = cli.call("vec", string("a"), 200, f);
+ EXPECT_EQ(200 * kServerSize, r.value->size());
+ }
+
+ { // server_error: method_not_found
+ //ASSERT_THROW(cli.call("undefined_method", 1, function<int(int,int)>(&jubatus::framework::add<int>)),
+ // jubatus::common::mprpc::rpc_no_result);
+
+ try {
+ rpc_result<int> r = cli.call("undefined_method", 1, function<int(int,int)>(&jubatus::framework::add<int>));
+
+ } catch (jubatus::common::mprpc::rpc_no_result& e) {
+ const error_info_list_t& list = e.error_info();
+ bool has_error_multi_rpc = false;
+ // FIXME: support `error_multi_rpc* multi_error = jubatus_exception::find_first<error_multi_rpc>();' like format
+ for (error_info_list_t::const_iterator it = list.begin(), end = list.end();
+ it != end; ++it) {
+ if (error_multi_rpc* multi_error = dynamic_cast<error_multi_rpc*>(it->get())) {
+ has_error_multi_rpc = true;
+ std::vector<rpc_error> error_list = multi_error->value();
+
+ for (size_t i = 0; i < error_list.size(); i++) {
+ EXPECT_EQ(string("localhost"), error_list[i].host());
+ EXPECT_EQ(kPortStart + i, error_list[i].port());
+
+ ASSERT_THROW(error_list[i].throw_exception(), jubatus::common::mprpc::rpc_method_not_found);
+ // TODO: check exception error_info has jubatus::common::error_method("undefined_method")
+ // but I checked using jubatus_exception::diagnostic_information manually.
+ }
+ }
+ }
+ EXPECT_TRUE(has_error_multi_rpc);
+ }
}
+
+ { // server_error: rpc_type_error
+ ASSERT_THROW(cli.call("sum", string("test"), function<int(int,int)>(&jubatus::framework::add<int>)),
+ jubatus::common::mprpc::rpc_no_result);
+
+ try {
+ cli.call("sum", string("test"), function<int(int,int)>(&jubatus::framework::add<int>));
+ } catch (jubatus::common::mprpc::rpc_no_result& e) {
+ cout << "rpc_no_result" << e.diagnostic_information(true) << endl;
+
+ const error_info_list_t& list = e.error_info();
+ bool has_error_multi_rpc = false;
+ // FIXME: support `error_multi_rpc* multi_error = jubatus_exception::find_first<error_multi_rpc>();' like format
+ for (error_info_list_t::const_iterator it = list.begin(), end = list.end();
+ it != end; ++it) {
+ if (error_multi_rpc* multi_error = dynamic_cast<error_multi_rpc*>(it->get())) {
+ has_error_multi_rpc = true;
+ std::vector<rpc_error> error_list = multi_error->value();
+
+ for (size_t i = 0; i < error_list.size(); i++) {
+ EXPECT_EQ(string("localhost"), error_list[i].host());
+ EXPECT_EQ(kPortStart + i, error_list[i].port());
+
+ ASSERT_THROW(error_list[i].throw_exception(), jubatus::common::mprpc::rpc_type_error);
+ // TODO: check exception error_info has jubatus::common::error_method("sum")
+ // but I checked using jubatus_exception::diagnostic_information manually.
+ }
+ }
+ }
+ EXPECT_TRUE(has_error_multi_rpc);
+ }
+ }
+
+
+ for (size_t i = 0; i < servers.size(); i++)
+ servers[i]->stop();
}
TEST(rpc_mclient, socket_disconnection)
{
- vector<pair<string,uint16_t> > clients;
+ const int kInvalidPort = kPortStart + 1000;
- for (uint16_t port = kPortStart; port <= kPortStart + 2; port++) {
- fork_server(port);
- if (port == kPortStart)
- clients.push_back(make_pair(string("localhost"), port + 1000)); // connection refused
- else
- clients.push_back(make_pair(string("localhost"), port));
- }
+ server_ptr ser(new test_mrpc_server(3.0));
+ thread th(pfi::lang::bind(&server_thread, ser, kPortStart));
+ th.start();
+
+ vector<pair<string,uint16_t> > clients;
+ clients.push_back(make_pair(string("localhost"), kPortStart));
+ clients.push_back(make_pair(string("localhost"), kInvalidPort)); // connection refused
usleep(500000);
{
- test_mrpc_client cli0("localhost", PORT0, 3.0);
- test_mrpc_client cli1("localhost", PORT1, 3.0);
+ test_mrpc_client cli0("localhost", kPortStart, 3.0);
+ test_mrpc_client cli1("localhost", kInvalidPort, 3.0);
EXPECT_EQ(true, cli0.call_test_bool(23));
- EXPECT_EQ(24, cli1.call_test_twice(12));
+ EXPECT_THROW(cli1.call_test_twice(12), pfi::network::mprpc::rpc_error);
}
- jubatus::common::mprpc::rpc_mclient cli(clients, 3.0);
+ jubatus::common::mprpc::rpc_mclient cli(clients, 1.0);
{
- cli.call_async("test_bool", 73684);
- EXPECT_FALSE(cli.join_all(function<bool(bool,bool)>(&jubatus::framework::all_and)));
+ rpc_result<bool> r = cli.call("test_bool", 73684, function<bool(bool,bool)>(&jubatus::framework::all_and));
+ EXPECT_FALSE(*r);
+
+ EXPECT_TRUE(r.has_error());
+ ASSERT_EQ(1, r.error.size());
+
+ rpc_error& error = r.error.front();
+ EXPECT_EQ(string("localhost"), error.host());
+ EXPECT_EQ(kInvalidPort, error.port());
+
+ EXPECT_TRUE(error.has_exception());
+ EXPECT_THROW(error.throw_exception(), jubatus::common::mprpc::rpc_io_error);
}
+
+ ser->stop();
}
+
View
2 src/common/mprpc/wscript
@@ -17,7 +17,7 @@ def build(bld):
source = 'rpc_client_test.cpp',
target = 'rpc_client_test',
includes = '. ../framework',
- use = 'PFICOMMON MSGPACK jubacommon_mprpc',
+ use = 'PFICOMMON MSGPACK jubacommon jubacommon_mprpc',
)
bld.install_files('${PREFIX}/include/jubatus/common/mprpc', bld.path.ant_glob('*.hpp'))
View
34 src/common/util.cpp
@@ -20,6 +20,7 @@
#include <cstring>
#include <cstdio>
#include <cstdlib>
+#include <csignal>
#include <errno.h>
#include <string.h>
@@ -185,6 +186,39 @@ void get_machine_status(std::map<std::string, std::string>& ret){
}
+namespace {
+void exit_on_term(int)
+{
+ exit(0);
+}
+}
+
+void set_exit_on_term()
+{
+ struct sigaction sigact;
+ sigact.sa_handler = exit_on_term;
+ sigact.sa_flags = SA_RESTART;
+
+ if (sigaction(SIGTERM, &sigact, NULL) != 0)
+ throw JUBATUS_EXCEPTION(jubatus::exception::runtime_error("can't set SIGTERM handler")
+ << jubatus::exception::error_api_func("sigaction")
+ << jubatus::exception::error_errno(errno));
+
+ if (sigaction(SIGINT, &sigact, NULL) != 0)
+ throw JUBATUS_EXCEPTION(jubatus::exception::runtime_error("can't set SIGINT handler")
+ << jubatus::exception::error_api_func("sigaction")
+ << jubatus::exception::error_errno(errno));
+}
+
+void ignore_sigpipe()
+{
+ // portable code for socket write(2) MSG_NOSIGNAL
+ if(signal(SIGPIPE, SIG_IGN) == SIG_ERR)
+ throw JUBATUS_EXCEPTION(jubatus::exception::runtime_error("can't ignore SIGPIPE")
+ << jubatus::exception::error_api_func("signal")
+ << jubatus::exception::error_errno(errno));
+}
+
} //util
} //jubatus
View
3 src/common/util.hpp
@@ -38,5 +38,8 @@ void append_server_path(const std::string& argv0);
void get_machine_status(std::map<std::string, std::string>&);
+void set_exit_on_term();
+void ignore_sigpipe();
+
} //util
} //namespace jubatus
View
32 src/common/zk.cpp
@@ -18,6 +18,7 @@
#include "zk.hpp"
#include <assert.h>
+#include <unistd.h>
#include <pficommon/concurrent/lock.h>
#include <pficommon/lang/bind.h>
@@ -47,18 +48,24 @@ namespace common{
zoo_set_log_stream(logfilep_);
}
- do{
- // FIXME?: this loop will call zookeeper_init many times till
- // the state got ZOO_CONNECTING_STAT
- // timeout is supposed to be ms??
- zh_ = zookeeper_init(hosts.c_str(), NULL, timeout * 1000, 0, NULL, 0);
- if(zh_ == NULL){
- perror("");
- throw JUBATUS_EXCEPTION(jubatus::exception::runtime_error("cannot init zk")
- << jubatus::exception::error_api_func("zookeeper_init"));
- }
- state_ = zoo_state(zh_);
- }while(state_ == ZOO_CONNECTING_STATE);
+ zh_ = zookeeper_init(hosts.c_str(), NULL, timeout * 1000, 0, NULL, 0);
+ if(!zh_){
+ perror("");
+ throw JUBATUS_EXCEPTION(jubatus::exception::runtime_error("cannot init zk")
+ << jubatus::exception::error_api_func("zookeeper_init")
+ << jubatus::exception::error_errno(errno));
+ }
+
+ // sleep the state got not ZOO_CONNECTING_STATE
+ while((state_ = zoo_state(zh_)) == ZOO_CONNECTING_STATE){
+ usleep(100);
+ }
+
+ if(is_unrecoverable(zh_) == ZINVALIDSTATE){
+ throw JUBATUS_EXCEPTION(jubatus::exception::runtime_error("cannot connect zk")
+ << jubatus::exception::error_api_func("is_unrecoverable")
+ << jubatus::exception::error_message(zerror(errno)));
+ }
zoo_set_context(zh_, this);
zoo_set_watcher(zh_, mywatcher);
@@ -73,6 +80,7 @@ namespace common{
void zk::force_close(){
zookeeper_close(zh_);
+ zh_ = NULL;
}
void zk::create(const std::string& path, const std::string& payload, bool ephemeral){
scoped_lock lk(m_);
View
47 src/framework/jubatus_serv.cpp
@@ -37,6 +37,8 @@ using pfi::lang::function;
using pfi::system::time::clock_time;
using pfi::system::time::get_clock_time;
+using jubatus::common::mprpc::rpc_result;
+
namespace jubatus { namespace framework {
jubatus_serv::jubatus_serv(const server_argv& a, const std::string& base_path):
@@ -51,7 +53,7 @@ jubatus_serv::jubatus_serv(const server_argv& a, const std::string& base_path):
base_path_(a_.tmpdir)
{
-};
+}
int jubatus_serv::start(pfi::network::mprpc::rpc_server& serv){
@@ -110,13 +112,13 @@ void jubatus_serv::register_mixable(mixable0* m){
#endif
mixables_.push_back(m);
-};
+}
void jubatus_serv::use_cht(){
#ifdef HAVE_ZOOKEEPER_H
use_cht_ = true;
#endif
-};
+}
std::map<std::string, std::map<std::string,std::string> > jubatus_serv::get_status() const {
std::map<std::string, std::string> data;
@@ -142,15 +144,15 @@ std::map<std::string, std::map<std::string,std::string> > jubatus_serv::get_stat
std::map<std::string, std::map<std::string,std::string> > ret;
ret[get_server_identifier()] = data;
return ret;
-};
+}
std::string jubatus_serv::get_server_identifier()const{
std::stringstream ss;
ss << a_.eth;
ss << "_";
ss << a_.port;
return ss.str();
-};
+}
//here
#ifdef HAVE_ZOOKEEPER_H
@@ -190,30 +192,33 @@ void jubatus_serv::join_to_cluster(common::cshared_ptr<jubatus::common::lock_ser
mixables_[i]->load(ss);
}
DLOG(INFO) << "all data successfully loaded to " << mixables_.size() << " mixables.";
-};
+}
std::string jubatus_serv::get_storage(){
std::stringstream ss;
for(size_t i=0; i<mixables_.size(); ++i){
+ if(mixables_[i] == NULL){
+ LOG(ERROR) << i << "th mixable is null";
+ throw JUBATUS_EXCEPTION(config_not_set());
+ }
mixables_[i]->save(ss);
}
LOG(INFO) << "new server has come. Sending back " << ss.str().size() << " bytes.";
return ss.str();
}
std::vector<std::string> jubatus_serv::get_diff_impl(int){
- // if(mixables_.empty()){
- // //throw config_not_set(); nothing to mix
- // }
std::vector<std::string> o;
- {
- scoped_lock lk(rlock(m_));
- for(size_t i=0; i<mixables_.size(); ++i){
- o.push_back(mixables_[i]->get_diff());
- }
+
+ scoped_lock lk(rlock(m_));
+ if(mixables_.empty()){
+ throw JUBATUS_EXCEPTION(config_not_set()); // nothing to mix
+ }
+ for(size_t i=0; i<mixables_.size(); ++i){
+ o.push_back(mixables_[i]->get_diff());
}
return o;
-};
+}
int jubatus_serv::put_diff_impl(std::vector<std::string> unpacked){
scoped_lock lk(wlock(m_));
@@ -226,7 +231,7 @@ int jubatus_serv::put_diff_impl(std::vector<std::string> unpacked){
}
mixer_->clear();
return 0;
-};
+}
std::vector<std::string> jubatus_serv::mix_agg(const std::vector<std::string>& lhs,
const std::vector<std::string>& rhs){
@@ -240,7 +245,7 @@ std::vector<std::string> jubatus_serv::mix_agg(const std::vector<std::string>& l
ret.push_back(tmp);
}