Skip to content

Commit

Permalink
gossip: Relax failure detector update
Browse files Browse the repository at this point in the history
We currently only update the failure detector for a node when a higher
version of application state is received. Since gossip syn messages do
not contain application state, we do not update the failure detector
upon receiving a gossip syn message, even though receiving a message
from a peer node implies that the peer node is alive.

This patch relaxes the failure detector update rule to update the
failure detector for the sender of gossip messages directly.

Refs #8296

Closes #8476
  • Loading branch information
asias authored and psarna committed Apr 14, 2021
1 parent 320f6bf commit 9ea57df
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 8 deletions.
15 changes: 11 additions & 4 deletions gms/gossiper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,8 @@ future<> gossiper::handle_syn_msg(msg_addr from, gossip_digest_syn syn_msg) {
return make_ready_future<>();
}

fd().report(from.addr);

syn_msg_pending& p = _syn_handlers[from.addr];
if (p.pending) {
// The latest syn message from peer has the latest information, so
Expand Down Expand Up @@ -303,6 +305,8 @@ future<> gossiper::handle_ack_msg(msg_addr id, gossip_digest_ack ack_msg) {
return make_ready_future<>();
}

fd().report(id.addr);

msg_proc_guard mp(*this);

auto g_digest_list = ack_msg.get_gossip_digest_list();
Expand Down Expand Up @@ -394,12 +398,14 @@ future<> gossiper::do_send_ack2_msg(msg_addr from, utils::chunked_vector<gossip_
// - on_restart callbacks
// - on_join callbacks
// - on_alive callbacks
future<> gossiper::handle_ack2_msg(gossip_digest_ack2 msg) {
future<> gossiper::handle_ack2_msg(msg_addr from, gossip_digest_ack2 msg) {
logger.trace("handle_ack2_msg():msg={}", msg);
if (!is_enabled()) {
return make_ready_future<>();
}

fd().report(from.addr);

msg_proc_guard mp(*this);

auto& remote_ep_state_map = msg.get_endpoint_state_map();
Expand Down Expand Up @@ -496,10 +502,11 @@ future<> gossiper::init_messaging_service_handler(bind_messaging_port do_bind) {
});
return messaging_service::no_wait();
});
_messaging.register_gossip_digest_ack2([] (gossip_digest_ack2 msg) {
_messaging.register_gossip_digest_ack2([] (const rpc::client_info& cinfo, gossip_digest_ack2 msg) {
auto from = netw::messaging_service::get_source(cinfo);
// In a new fiber.
(void)smp::submit_to(0, [msg = std::move(msg)] () mutable {
return gms::get_local_gossiper().handle_ack2_msg(std::move(msg));
(void)smp::submit_to(0, [from, msg = std::move(msg)] () mutable {
return gms::get_local_gossiper().handle_ack2_msg(from, std::move(msg));
}).handle_exception([] (auto ep) {
logger.warn("Fail to handle GOSSIP_DIGEST_ACK2: {}", ep);
});
Expand Down
2 changes: 1 addition & 1 deletion gms/gossiper.hh
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ private:
future<> uninit_messaging_service_handler();
future<> handle_syn_msg(msg_addr from, gossip_digest_syn syn_msg);
future<> handle_ack_msg(msg_addr from, gossip_digest_ack ack_msg);
future<> handle_ack2_msg(gossip_digest_ack2 msg);
future<> handle_ack2_msg(msg_addr from, gossip_digest_ack2 msg);
future<> handle_echo_msg(inet_address from, std::optional<int64_t> generation_number_opt);
future<> handle_shutdown_msg(inet_address from);
future<> do_send_ack_msg(msg_addr from, gossip_digest_syn syn_msg);
Expand Down
2 changes: 1 addition & 1 deletion message/messaging_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1125,7 +1125,7 @@ future<> messaging_service::send_gossip_digest_ack(msg_addr id, gossip_digest_ac
}

// gossip ack2
void messaging_service::register_gossip_digest_ack2(std::function<rpc::no_wait_type (gossip_digest_ack2)>&& func) {
void messaging_service::register_gossip_digest_ack2(std::function<rpc::no_wait_type (const rpc::client_info& cinfo, gossip_digest_ack2)>&& func) {
register_handler(this, messaging_verb::GOSSIP_DIGEST_ACK2, std::move(func));
}
future<> messaging_service::unregister_gossip_digest_ack2() {
Expand Down
2 changes: 1 addition & 1 deletion message/messaging_service.hh
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ public:
future<> send_gossip_digest_ack(msg_addr id, gms::gossip_digest_ack msg);

// Wrapper for GOSSIP_DIGEST_ACK2
void register_gossip_digest_ack2(std::function<rpc::no_wait_type (gms::gossip_digest_ack2)>&& func);
void register_gossip_digest_ack2(std::function<rpc::no_wait_type (const rpc::client_info& cinfo, gms::gossip_digest_ack2)>&& func);
future<> unregister_gossip_digest_ack2();
future<> send_gossip_digest_ack2(msg_addr id, gms::gossip_digest_ack2 msg);

Expand Down
2 changes: 1 addition & 1 deletion test/manual/message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ class tester {
return messaging_service::no_wait();
});

ms.register_gossip_digest_ack2([] (gms::gossip_digest_ack2 msg) {
ms.register_gossip_digest_ack2([] (const rpc::client_info& cinfo, gms::gossip_digest_ack2 msg) {
fmt::print("Server got ack2 msg = {}\n", msg);
return messaging_service::no_wait();
});
Expand Down

0 comments on commit 9ea57df

Please sign in to comment.