-
Notifications
You must be signed in to change notification settings - Fork 0
/
client.thh
132 lines (121 loc) · 4.02 KB
/
client.thh
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
#include "mpfd.hh"
#include "rpc_msg.hh"
#include "log.hh"
#include "network.hh"
class Paxos_Client : public tamer::tamed_class {
private:
tamer::fd cfd_;
msgpack_fd mpfd_;
Json config_;
int request_timeout_;
int master_;
public:
Paxos_Client(Json config): config_(config),request_timeout_(2),master_(15800) {
}
~Paxos_Client() {
if (cfd_)
cfd_.close();
}
// client functions
tamed void client_init(const char* hostname, int port,
struct in_addr hostip,tamer::event<> done);
tamed void get_master(tamer::event<> ev);
tamed void make_request(Json msg, tamer::event<Json> ev);
tamed void wait_for_notifications(tamer::event<> ev);
};
tamed void Paxos_Client::client_init(const char* hostname, int port,
struct in_addr hostip,tamer::event<> done) {
tvars {
int s = 100;
}
// lookup hostname address
{
in_addr_t a = hostname ? inet_addr(hostname) : htonl(INADDR_LOOPBACK);
if (a != INADDR_NONE)
hostip.s_addr = a;
else {
struct hostent* hp = gethostbyname(hostname);
if (hp == NULL || hp->h_length != 4 || hp->h_addrtype != AF_INET) {
std::cout << "lookup " << hostname << ": " << hstrerror(h_errno) << std::endl;
return;
}
hostip = *((struct in_addr*) hp->h_addr);
}
}
if (cfd_)
cfd_.close();
twait { tamer::tcp_connect(hostip, port, make_event(cfd_)); }
while (!cfd_) {
INFO() << "delaying to connect to " << port << ": " << s;
twait { tamer::at_delay_msec(s,make_event()); }
twait { tamer::tcp_connect(hostip, port, make_event(cfd_)); }
if (s <= 10000)
s *= 2;
}
mpfd_.initialize(cfd_);
done();
}
tamed void Paxos_Client::get_master(tamer::event<> ev)
{
tvars {
RPC_Msg request_master, reply_master;
Json master_info;
struct in_addr hostip;
bool master_known=false;
int to,i;
}
INFO() << "get_master called";
do {
do {
INFO() << "Attempting to contact master " << master_;
mpfd_.clear();
twait { client_init("localhost",master_,hostip,make_event()); }
request_master = RPC_Msg(Json::array(String("get_master")));
twait { mpfd_.call(request_master,with_timeout(request_timeout_,make_event(reply_master.json()),to)); }
if (to != 0) {
INFO() << "retrying get_master";
// select another port to contact
for (i = 0; i < config_.size(); ++i)
if (config_[i][0].as_i() != master_)
master_ = config_[i][0].as_i();
}
} while(to != 0);
// FIXME: validate
INFO() << reply_master.content();
if( reply_master.content()[0].as_s()=="ACK" ) {
master_known = true;
// master_ data already set.
} else if( reply_master.content()[0].as_s()=="NACK" ) {
master_known = false;
if( reply_master.content()[1].as_s()=="NOT_MASTER"
&& reply_master.content()[3].as_i() > 0) { // be sure to check for an actual port being returned
master_ = reply_master.content()[3].as_i();
}
}
} while( !master_known );
INFO() << "Master identified";
ev();
}
tamed void Paxos_Client::make_request(Json msg, tamer::event<Json> ev) {
tvars {
RPC_Msg req,ret;
int done(0);
}
twait { get_master(make_event()); }
if (!mpfd_) {
std::cerr << "!mpfd_" << std::endl;
abort();
}
do {
INFO() << "making request to " << master_;
req = RPC_Msg(Json::array("request",msg));
twait { mpfd_.call(req,with_timeout(request_timeout_,make_event(ret.json()),done)); }
if (done != 0) { // request timed out
INFO () << "client request timed out";
twait { get_master(make_event()); }
}
} while(done != 0);
INFO() << "client finished request: " << ret.content();
swap(*ev.result_pointer(),ret.content());
ev.unblock();
}