Skip to content
This repository

Some mprpc fixes #49

Merged
merged 3 commits into from about 2 years ago

1 participant

Shuzo Kashihara
Shuzo Kashihara
Owner
suma commented
  • Fixed mprpc/rpc_server timeout(including rpc_timeout_error exception handling #30)
  • Fixed correct way socket closing to call shutdown before closing socket
  • Added 'running' and 'stop' member functions to rpc_server
  • Added timeout testing to mprpc_test, and refactored
Shuzo Kashihara suma merged commit 6ea510b into from
Shuzo Kashihara suma closed this
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
This page is out of date. Refresh to see the latest.
1  src/network/mprpc/object_stream.cpp
@@ -60,6 +60,7 @@ object_stream::object_stream(const std::string& host, uint16_t port) :
60 60
61 61 object_stream::~object_stream()
62 62 {
  63 + ::shutdown(iofd, SHUT_RDWR);
63 64 ::close(iofd);
64 65 }
65 66
29 src/network/mprpc/rpc_server.cpp
@@ -67,7 +67,9 @@ bool basic_server::create(uint16_t port, int backlog)
67 67
68 68
69 69 rpc_server::rpc_server(double timeout_sec) :
70   - timeout_sec(timeout_sec) { }
  70 + timeout_sec(timeout_sec),
  71 + serv_running(false)
  72 +{ }
71 73
72 74 rpc_server::~rpc_server() { }
73 75
@@ -80,6 +82,8 @@ bool rpc_server::serv(uint16_t port, int nthreads)
80 82 if (!basic_server::create(port))
81 83 return false;
82 84
  85 + serv_running = true;
  86 +
83 87 std::vector<shared_ptr<thread> > ths(nthreads);
84 88 for (int i=0; i<nthreads; i++) {
85 89 ths[i] = shared_ptr<thread>(new thread(
@@ -94,9 +98,20 @@ bool rpc_server::serv(uint16_t port, int nthreads)
94 98 return true;
95 99 }
96 100
  101 +bool rpc_server::running() const
  102 +{
  103 + return serv_running;
  104 +}
  105 +
  106 +void rpc_server::stop()
  107 +{
  108 + serv_running = false;
  109 + close();
  110 +}
  111 +
97 112 void rpc_server::process()
98 113 {
99   - while(true) {
  114 + while(serv_running) {
100 115 int s;
101 116 NO_INTR(s, ::accept(sock.get(), NULL, NULL));
102 117 if (FAILED(s)) { continue; }
@@ -104,7 +119,7 @@ void rpc_server::process()
104 119
105 120 ns.set_nodelay(true);
106 121 if(timeout_sec > 0) {
107   - if(!sock.set_timeout(timeout_sec)) {
  122 + if(!ns.set_timeout(timeout_sec)) {
108 123 continue;
109 124 }
110 125 }
@@ -112,9 +127,13 @@ void rpc_server::process()
112 127 pfi::lang::shared_ptr<rpc_stream> rs(new rpc_stream(ns.get(), timeout_sec));
113 128 ns.release();
114 129
115   - while(true) {
  130 + while(serv_running) {
116 131 rpc_message msg;
117   - if(!rs->receive(&msg)) {
  132 + try {
  133 + if(!rs->receive(&msg)) {
  134 + break;
  135 + }
  136 + } catch (const rpc_timeout_error&) {
118 137 break;
119 138 }
120 139
3  src/network/mprpc/rpc_server.h
@@ -67,6 +67,8 @@ class rpc_server : public basic_server {
67 67 ~rpc_server();
68 68
69 69 bool serv(uint16_t port, int nthreads);
  70 + bool running() const;
  71 + void stop();
70 72 void process();
71 73
72 74 template <class T>
@@ -74,6 +76,7 @@ class rpc_server : public basic_server {
74 76
75 77 private:
76 78 double timeout_sec;
  79 + volatile bool serv_running;
77 80
78 81 void add(const std::string &name,
79 82 pfi::lang::shared_ptr<invoker_base> invoker);
1  src/network/mprpc/socket.cpp
@@ -61,6 +61,7 @@ bool socket::close()
61 61 if(fd < 0) { return false; }
62 62
63 63 int ret;
  64 + ::shutdown(fd, SHUT_RDWR);
64 65 NO_INTR(ret,::close(fd));
65 66
66 67 if (FAILED(ret)){
226 src/network/mprpc_test.cpp
@@ -38,10 +38,12 @@
38 38
39 39 #include "../lang/bind.h"
40 40 #include "../concurrent/thread.h"
  41 +#include "../system/time_util.h"
41 42
42 43 using namespace std;
43 44 using namespace pfi::lang;
44 45 using namespace pfi::concurrent;
  46 +using namespace pfi::system::time;
45 47
46 48 MPRPC_PROC(test_str, string(string));
47 49 static string test_str(const string& v){ return v; }
@@ -56,82 +58,152 @@ static set<int> test_set(const set<int>& v){ return v; }
56 58
57 59 MPRPC_GEN(1, testrpc, test_str, test_pair, test_vec, test_map, test_set);
58 60
59   -static void server_thread()
  61 +namespace {
  62 +const string kLocalhost = "localhost";
  63 +const int kServThreads = 10;
  64 +const int kTestRPCPort = 31241;
  65 +const int kTestStructRPCPort = 31242;
  66 +const double kServerTimeout = 3.0;
  67 +const double kClientTimeout = 3.0;
  68 +const double kTestTimeout = 0.5;
  69 +}
  70 +
  71 +static void server_thread(testrpc_server *ser)
60 72 {
61   - testrpc_server ser(3.0);
62   - ser.set_test_str(&test_str);
63   - ser.set_test_pair(&test_pair);
64   - ser.set_test_vec(&test_vec);
65   - ser.set_test_map(&test_map);
66   - ser.set_test_set(&test_set);
67   - ser.serv(31241, 10);
  73 + ser->set_test_str(&test_str);
  74 + ser->set_test_pair(&test_pair);
  75 + ser->set_test_vec(&test_vec);
  76 + ser->set_test_map(&test_map);
  77 + ser->set_test_set(&test_set);
  78 + ser->serv(kTestRPCPort, kServThreads);
68 79 }
69 80
70 81 TEST(mprpc, mprpc_test)
71 82 {
72   - thread t(&server_thread);
  83 + testrpc_server ser(kServerTimeout);
  84 + thread t(pfi::lang::bind(&server_thread, &ser));
73 85 t.start();
74 86
75 87 sleep(1);
76 88
77 89 int times = 100;
78   - EXPECT_NO_THROW({ testrpc_client cln1("localhost", 31241, 3.0); });
79   - testrpc_client cln("localhost", 31241, 3.0);
80   - for (int t=0;t<times;t++) {
81   - {
82   - string v;
83   - for (int i=0;i<10;i++)
84   - v+='0'+(rand()%10);
85   - string r;
86   - EXPECT_NO_THROW({ r = cln.call_test_str(v); });
87   - EXPECT_EQ(r.size(), 10U);
88   - for (int i=0;i<10;i++)
89   - EXPECT_EQ(r[i], v[i]);
90   - }
91   - {
92   - string vs;
93   - for (int i=0;i<10;i++)
94   - vs+='0'+(rand()%10);
95   - pair<int, string> v = make_pair(rand(), vs);
96   - pair<int, string> r;
97   - EXPECT_NO_THROW({ r = cln.call_test_pair(v); });
98   - EXPECT_EQ(r.first, v.first);
99   - EXPECT_EQ(r.second, v.second);
100   - }
101   - {
102   - vector<int> v;
103   - for (int i=0;i<10;i++)
104   - v.push_back(rand());
105   - vector<int> r;
106   - EXPECT_NO_THROW({ r = cln.call_test_vec(v); });
107   - EXPECT_EQ(r.size(), 10U);
108   - for (int i=0;i<10;i++)
109   - EXPECT_EQ(r[i], v[i]);
110   - }
111   - {
112   - map<int, int> v;
113   - for (int i=0;i<10;i++)
114   - v[i]=rand();
115   - map<int, int> r;
116   - EXPECT_NO_THROW({ r = cln.call_test_map(v); });
117   - EXPECT_EQ(r.size(), 10U);
118   - for (int i=0;i<10;i++){
119   - EXPECT_EQ(r[i], v[i]);
  90 + EXPECT_NO_THROW({ testrpc_client cln1(kLocalhost, kTestRPCPort, kClientTimeout); });
  91 + {
  92 + testrpc_client cln(kLocalhost, kTestRPCPort, kClientTimeout);
  93 + for (int t=0;t<times;t++) {
  94 + {
  95 + string v;
  96 + for (int i=0;i<10;i++)
  97 + v+='0'+(rand()%10);
  98 + string r;
  99 + EXPECT_NO_THROW({ r = cln.call_test_str(v); });
  100 + EXPECT_EQ(r.size(), 10U);
  101 + for (int i=0;i<10;i++)
  102 + EXPECT_EQ(r[i], v[i]);
120 103 }
121   - }
122   - {
123   - set<int> v;
124   - for (int i=0;i<10;i++)
125   - v.insert(i*100);
126   - set<int> r;
127   - EXPECT_NO_THROW({ r = cln.call_test_set(v); });
128   - EXPECT_EQ(r.size(), 10U);
129   - int cnt = 0;
130   - for (set<int>::iterator it=v.begin();it!=v.end();++it){
131   - EXPECT_EQ(*it, cnt++ * 100);
  104 + {
  105 + string vs;
  106 + for (int i=0;i<10;i++)
  107 + vs+='0'+(rand()%10);
  108 + pair<int, string> v = make_pair(rand(), vs);
  109 + pair<int, string> r;
  110 + EXPECT_NO_THROW({ r = cln.call_test_pair(v); });
  111 + EXPECT_EQ(r.first, v.first);
  112 + EXPECT_EQ(r.second, v.second);
  113 + }
  114 + {
  115 + vector<int> v;
  116 + for (int i=0;i<10;i++)
  117 + v.push_back(rand());
  118 + vector<int> r;
  119 + EXPECT_NO_THROW({ r = cln.call_test_vec(v); });
  120 + EXPECT_EQ(r.size(), 10U);
  121 + for (int i=0;i<10;i++)
  122 + EXPECT_EQ(r[i], v[i]);
  123 + }
  124 + {
  125 + map<int, int> v;
  126 + for (int i=0;i<10;i++)
  127 + v[i]=rand();
  128 + map<int, int> r;
  129 + EXPECT_NO_THROW({ r = cln.call_test_map(v); });
  130 + EXPECT_EQ(r.size(), 10U);
  131 + for (int i=0;i<10;i++){
  132 + EXPECT_EQ(r[i], v[i]);
  133 + }
  134 + }
  135 + {
  136 + set<int> v;
  137 + for (int i=0;i<10;i++)
  138 + v.insert(i*100);
  139 + set<int> r;
  140 + EXPECT_NO_THROW({ r = cln.call_test_set(v); });
  141 + EXPECT_EQ(r.size(), 10U);
  142 + int cnt = 0;
  143 + for (set<int>::iterator it=v.begin();it!=v.end();++it){
  144 + EXPECT_EQ(*it, cnt++ * 100);
  145 + }
132 146 }
133 147 }
134 148 }
  149 +
  150 + ser.stop();
  151 + t.join();
  152 +}
  153 +
  154 +TEST(mprpc, mprpc_server_timeout_test)
  155 +{
  156 + testrpc_server ser(kTestTimeout);
  157 + thread t(pfi::lang::bind(&server_thread, &ser));
  158 + t.start();
  159 +
  160 + sleep(1);
  161 +
  162 + // connect server but server disconnect by timeout
  163 + pfi::network::mprpc::socket sock;
  164 + ASSERT_TRUE(sock.connect(kLocalhost, kTestRPCPort));
  165 +
  166 + clock_time start_time = get_clock_time();
  167 + int res;
  168 + EXPECT_EQ(0, ::read(sock.get(), &res, sizeof(res)));
  169 + EXPECT_GT((get_clock_time() - start_time), kTestTimeout);
  170 +
  171 + ser.stop();
  172 + t.join();
  173 +}
  174 +
  175 +namespace {
  176 +void timeout_server_thread(pfi::network::mprpc::socket *server_socket)
  177 +{
  178 + sleep(1);
  179 +
  180 + ::accept(server_socket->get(), NULL, NULL);
  181 + sleep(1 + kTestTimeout);
  182 +
  183 + // wait for socket shutdown listened socket
  184 + ::accept(server_socket->get(), NULL, NULL);
  185 +}
  186 +} // namespace
  187 +
  188 +TEST(mprpc, mprpc_client_timeout_test)
  189 +{
  190 + pfi::network::mprpc::socket server_socket;
  191 + server_socket.listen(kTestRPCPort);
  192 + thread t(pfi::lang::bind(&timeout_server_thread, &server_socket));
  193 + t.start();
  194 +
  195 + sleep(1);
  196 +
  197 + EXPECT_NO_THROW({ testrpc_client cln1(kLocalhost, kTestRPCPort, kTestTimeout); });
  198 +
  199 + { // connect server but client disconnect by timeout
  200 + testrpc_client cln(kLocalhost, kTestRPCPort, kTestTimeout);
  201 + string v, r;
  202 + EXPECT_THROW({ r = cln.call_test_str(v); }, pfi::network::mprpc::rpc_timeout_error);
  203 + }
  204 +
  205 + server_socket.close();
  206 + t.join();
135 207 }
136 208
137 209 // test for struct and empty vector
@@ -145,25 +217,31 @@ struct tstruct {
145 217 MPRPC_PROC(test_struct, tstruct(tstruct));
146 218 MPRPC_GEN(1, test_struct_rpc, test_struct);
147 219 static tstruct f_test_struct(tstruct t) { return t; }
148   -static void struct_server_thread()
  220 +static void struct_server_thread(test_struct_rpc_server *ser)
149 221 {
150   - test_struct_rpc_server ser(3.0);
151   - ser.set_test_struct(&f_test_struct);
152   - ser.serv(31242, 10);
  222 + ser->set_test_struct(&f_test_struct);
  223 + ser->serv(kTestStructRPCPort, kServThreads);
153 224 }
  225 +
154 226 TEST(mprpc, test_struct)
155 227 {
156   - thread t(&struct_server_thread);
  228 + test_struct_rpc_server ser(kServerTimeout);
  229 + thread t(pfi::lang::bind(&struct_server_thread, &ser));
157 230 t.start();
158 231
159 232 sleep(1);
160 233
161   - tstruct t1;
162   - t1.i = 9;
163   - EXPECT_NO_THROW({ test_struct_rpc_client cln1("localhost", 31242, 3.0); });
164   - test_struct_rpc_client cln("localhost", 31242, 3.0);
  234 + {
  235 + tstruct t1;
  236 + t1.i = 9;
  237 + EXPECT_NO_THROW({ test_struct_rpc_client cln1(kLocalhost, kTestStructRPCPort, kClientTimeout); });
  238 + test_struct_rpc_client cln(kLocalhost, kTestStructRPCPort, kClientTimeout);
  239 +
  240 + tstruct t2;
  241 + EXPECT_NO_THROW({ t2 = cln.call_test_struct(t1); });
  242 + EXPECT_EQ(t1.i, t2.i);
  243 + }
165 244
166   - tstruct t2;
167   - EXPECT_NO_THROW({ t2 = cln.call_test_struct(t1); });
168   - EXPECT_EQ(t1.i, t2.i);
  245 + ser.stop();
  246 + t.join();
169 247 }

Tip: You can add notes to lines in a file. Hover to the left of a line to make a note

Something went wrong with that request. Please try again.