Skip to content

Commit

Permalink
Super basic put to master works!
Browse files Browse the repository at this point in the history
  • Loading branch information
irbowen committed Apr 16, 2017
1 parent cb84eca commit e169d95
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 17 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ DEBUG = -g
RELEASE = -O3
VERSION = $(RELEASE)

CXXFLAGS = -std=c++14 $(VERSION) -pedantic
CXXFLAGS = -std=c++14 $(VERSION) -pedantic -pthread

BOTH_LIB := objs/network.o objs/message.o
REPLICA_LIB := objs/paxos_main.o objs/replica.o objs/acceptor.o objs/learner.o objs/proposer.o objs/kv_store.o $(BOTH_LIB)
Expand Down
6 changes: 3 additions & 3 deletions headers/shard.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ class Shard {
void run();
/* These functions assume that this shard has the key that you want */
void register_msg(Message* message);
Message* handle_get(std::string key);
Message* handle_put(std::string key, std::string value);
Message* handle_delete(std::string key);
Message* handle_get(std::string key, node sender);
Message* handle_put(std::string key, std::string value, node sender);
Message* handle_delete(std::string key, node sender);
};

#endif
11 changes: 8 additions & 3 deletions src/client_lib.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ string client_lib::get(string key) {
msg.receivers.push_back(node(master_port_, master_host_));
while (true) {
net_.sendto(&msg);
cout << "Sent: " << msg.serialize() << endl;
Message *reply = net_.recv_from();
cout << "Recv: " << reply->serialize() << endl;
if (reply != nullptr && reply->msg_type == MessageType::MASTER_ACK) {
string val = reply->value;
delete(reply);
Expand All @@ -39,10 +41,12 @@ void client_lib::put(string key, string value) {
msg.receivers.push_back(node(master_port_, master_host_));
while (true) {
net_.sendto(&msg);
cout << "Sent: " << msg.serialize() << endl;
Message *reply = net_.recv_from();
cout << "Recv: " << reply->serialize() << endl;
if (reply != nullptr && reply->msg_type == MessageType::MASTER_ACK) {
string val = reply->value;
delete(reply);
return;
}
}
}
Expand All @@ -55,10 +59,11 @@ void client_lib::delete_key(std::string key) {
msg.receivers.push_back(node(master_port_, master_host_));
while (true) {
net_.sendto(&msg);
cout << "Sent: " << msg.serialize() << endl;
Message *reply = net_.recv_from();
cout << "Recv: " << reply->serialize() << endl;
if (reply != nullptr && reply->msg_type == MessageType::MASTER_ACK) {
string val = reply->value;
delete(reply);
return;
}
}
}
3 changes: 2 additions & 1 deletion src/kv_client_inter_1.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ int main(){
client_lib c_lib(7000, "127.0.0.1", 5000, "127.0.0.1");

string str;
cout << "put, get, or delete?\n";
while (getline(cin, str)) {
cout << "put, get, or delete?\n";
if (str == "put") {
string key, value;
cout << "Key?:" << endl;
Expand All @@ -35,6 +35,7 @@ int main(){
else if (str == "break") {
break;
}
cout << "put, get, or delete?\n";
}

return 0;
Expand Down
2 changes: 2 additions & 0 deletions src/master.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ Master::Master(int port, string host, string master_config_file)
string file_name;
while (config_fs >> file_name) {
Shard* s = new Shard(port_ + counter, host_, file_name);
thread t(&Shard::run, s);
t.detach();
shards_.push_back(s);
counter++;
}
Expand Down
32 changes: 23 additions & 9 deletions src/shard.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@
using namespace std;

Shard::Shard(int port, std::string host, std::string config_filename)
: config_filename_(config_filename), net_(port, host)
: port_(port), host_(host), config_filename_(config_filename), net_(port, host)
{
string h, p, rep_id;
ifstream config_fs(config_filename);
cout << "Shard is online with these replicas: ";
cout << "Shard(" << port << ", " << host << ") is online with these replicas: ";
while (config_fs >> h >> p >> rep_id) {
replicas_.push_back(node(stoi(p), h));
cout << " " << host << ":" << p << ", ";
Expand All @@ -30,19 +30,26 @@ void Shard::run() {
case MessageType::NO_ACTION:
break;
case MessageType::GET: {
reply = handle_get(next_msg->key);
reply = handle_get(next_msg->key, next_msg->sender);
break;
}
case MessageType::PUT: {
reply = handle_put(next_msg->key, next_msg->value);
reply = handle_put(next_msg->key, next_msg->value, next_msg->sender);
break;
}
case MessageType::DELETE: {
reply = handle_delete(next_msg->key);
reply = handle_delete(next_msg->key, next_msg->sender);
break;
}
break;
}
if (reply != nullptr && reply->msg_type != MessageType::NO_ACTION) {
reply->sender = node(port_, host_);
cout << "[SHARD] - Sending reply: " << reply->serialize() << endl;
net_.sendto(reply);
}
delete(next_msg);
delete(reply);
}
}

Expand All @@ -52,7 +59,7 @@ void Shard::register_msg(Message* message) {
cv.notify_one();
}

Message* Shard::handle_get(string key) {
Message* Shard::handle_get(string key, node sender) {
Message msg;
msg.seq_num = client_seq_num_;
msg.msg_type = MessageType::GET;
Expand All @@ -71,13 +78,16 @@ Message* Shard::handle_get(string key) {
cout << "THIS IS A GET ACK{{" << val << "}}" << endl;
client_seq_num_++;
// Send the same message we just got back to the client
reply->msg_type = MessageType::MASTER_ACK;
reply->receivers.clear();
reply->receivers.push_back(sender);
return reply;
}
cur_view_num_ += 1;
}
}

Message* Shard::handle_put(string key, string value) {
Message* Shard::handle_put(string key, string value, node sender) {
Message msg;
msg.seq_num = client_seq_num_;
msg.msg_type = MessageType::PUT;
Expand All @@ -99,13 +109,17 @@ Message* Shard::handle_put(string key, string value) {
<< " " << reply->get_value() << "}}" << endl;
client_seq_num_++;
// Send the same message we just got back to the client
reply->msg_type = MessageType::MASTER_ACK;
reply->receivers.clear();
reply->receivers.push_back(sender);
cout << "RETURNING THIS: " << reply->serialize() << endl;
return reply;
}
cur_view_num_ += 1;
}
}

Message* Shard::handle_delete(string key) {
return handle_put(key, "ERROR: Key not found");
Message* Shard::handle_delete(string key, node sender) {
return handle_put(key, "ERROR: Key not found", sender);
}

0 comments on commit e169d95

Please sign in to comment.