Skip to content

Commit

Permalink
Fix race condition in condition variable notify/wait.
Browse files Browse the repository at this point in the history
  • Loading branch information
kishorenc committed Feb 15, 2021
1 parent b59b01a commit a1be5a2
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 13 deletions.
12 changes: 6 additions & 6 deletions include/http_data.h
Expand Up @@ -45,10 +45,11 @@ class await_t {
// Starts in the not-ready state; notify() is what sets `ready` to true.
await_t(): ready(false) {}

/**
 * Marks this object as ready and wakes every thread waiting on `cv`.
 *
 * The flag write and the notification are performed inside ONE critical
 * section on `mcv`; the mutex is never released in between.
 */
void notify() {
    // Normally a condition variable can be notified without holding the
    // mutex. Here the lock must stay held across notify_all(): once the
    // mutex is released with `ready == true`, a waiter may proceed and the
    // parent object (which owns `cv`) could be deleted before notify_all()
    // gets to run on it.
    std::lock_guard<std::mutex> lk(mcv);
    ready = true;
    cv.notify_all();
}

Expand All @@ -63,7 +64,7 @@ struct http_res {
uint32_t status_code;
std::string content_type_header;
std::string body;
bool final;
std::atomic<bool> final;

// fulfilled by an async response handler to pass control back for further writes
// use `mark_proceed` and `wait_proceed` instead of accessing this directly
Expand Down Expand Up @@ -429,5 +430,4 @@ struct http_message_dispatcher {
// Argument bundle allocated by the replication code and handed to the
// message dispatcher via send_message(REPLICATION_MSG, ...).
struct AsyncIndexArg {
// request being forwarded/applied
http_req* req;
// response associated with `req`
http_res* res;
// NOTE(review): call sites in this listing either pass nullptr for this
// member or omit it entirely — confirm whether it is still needed.
await_t* await;
};
2 changes: 1 addition & 1 deletion include/http_server.h
Expand Up @@ -58,7 +58,7 @@ class HttpServer {

ReplicationState* replication_state;

bool exit_loop = false;
std::atomic<bool> exit_loop;

std::string version;

Expand Down
2 changes: 1 addition & 1 deletion src/auth_manager.cpp
Expand Up @@ -163,7 +163,7 @@ bool AuthManager::authenticate(const std::string& req_api_key, const std::string
return true;
}

LOG(INFO) << "api_keys.size() = " << api_keys.size();
//LOG(INFO) << "api_keys.size() = " << api_keys.size();

if(api_keys.count(req_api_key) == 0) {
return false;
Expand Down
7 changes: 5 additions & 2 deletions src/http_server.cpp
Expand Up @@ -572,9 +572,10 @@ void HttpServer::defer_processing(http_req& req, http_res& res, size_t timeout_m
auto deferred_req_res = new deferred_req_res_t{&req, &res, this};
req.defer_timer.data = deferred_req_res;
h2o_timer_init(&req.defer_timer.timer, on_deferred_process_request);
} else {
h2o_timer_unlink(&req.defer_timer.timer);
}

h2o_timer_unlink(&req.defer_timer.timer);
h2o_timer_link(ctx.loop, timeout_ms, &req.defer_timer.timer);

//LOG(INFO) << "defer_processing, exit_loop: " << exit_loop << ", res.await: " << res.await;
Expand Down Expand Up @@ -629,8 +630,10 @@ void HttpServer::response_abort(h2o_generator_t *generator, h2o_req_t *req) {

// returns control back to caller (raft replication or follower forward)
LOG(INFO) << "response_abort: fulfilling req & res proceed.";
custom_generator->response->await.notify();

// order is important!
custom_generator->request->await.notify();
custom_generator->response->await.notify();
}

void HttpServer::response_proceed(h2o_generator_t *generator, h2o_req_t *req) {
Expand Down
19 changes: 16 additions & 3 deletions src/raft_server.cpp
Expand Up @@ -158,7 +158,7 @@ void ReplicationState::write_to_leader(http_req *request, http_res *response) co
}

response->set_500("Could not find a leader.");
auto replication_arg = new AsyncIndexArg{request, response, nullptr};
auto replication_arg = new AsyncIndexArg{request, response};
replication_arg->req->route_hash = static_cast<uint64_t>(ROUTE_CODES::ALREADY_HANDLED);
return message_dispatcher->send_message(REPLICATION_MSG, replication_arg);
}
Expand Down Expand Up @@ -233,7 +233,7 @@ void ReplicationState::write_to_leader(http_req *request, http_res *response) co
response->set_500(err);
}

auto replication_arg = new AsyncIndexArg{request, response, nullptr};
auto replication_arg = new AsyncIndexArg{request, response};
replication_arg->req->route_hash = static_cast<uint64_t>(ROUTE_CODES::ALREADY_HANDLED);
message_dispatcher->send_message(REPLICATION_MSG, replication_arg);
});
Expand Down Expand Up @@ -291,7 +291,7 @@ void ReplicationState::on_apply(braft::Iterator& iter) {
// Call http server thread for write and response back to client (if `response` is NOT null)
// We use a future to block current thread until the async flow finishes
response->auto_dispose = false;
auto replication_arg = new AsyncIndexArg{request, response, nullptr};
auto replication_arg = new AsyncIndexArg{request, response};
message_dispatcher->send_message(REPLICATION_MSG, replication_arg);

//LOG(INFO) << "Raft write waiting to proceed";
Expand Down Expand Up @@ -424,6 +424,19 @@ void ReplicationState::refresh_nodes(const std::string & nodes) {
braft::Configuration new_conf;
new_conf.parse_from(nodes);

braft::NodeStatus nodeStatus;
node->get_status(&nodeStatus);

LOG(INFO) << "Term: " << nodeStatus.term
<< ", last_index index: " << nodeStatus.last_index
<< ", committed_index: " << nodeStatus.committed_index
<< ", known_applied_index: " << nodeStatus.known_applied_index
<< ", applying_index: " << nodeStatus.applying_index
<< ", pending_index: " << nodeStatus.pending_index
<< ", disk_index: " << nodeStatus.disk_index
<< ", pending_queue_size: " << nodeStatus.pending_queue_size;


if(node->is_leader()) {
RefreshNodesClosure* refresh_nodes_done = new RefreshNodesClosure;
node->change_peers(new_conf, refresh_nodes_done);
Expand Down

0 comments on commit a1be5a2

Please sign in to comment.