Merge 'raft: do not ping self in direct failure detector' from Konstantin Osipov

Avoid pinging self in the direct failure detector; it adds confusing noise and constant overhead.
Fixes #14388

Closes #14558

* github.com:scylladb/scylladb:
  direct_fd: do not ping self
  raft: initialize raft_group_registry with host id early
  raft: code cleanup
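The gist of the change, as a minimal standalone sketch (server_id, failure_detector and the `added` list below are simplified stand-ins for illustration, not Scylla's actual types): the local server is simply never registered with the pinger, so the failure detector has nothing of its own to probe.

    #include <cstdint>
    #include <iostream>
    #include <unordered_set>
    #include <vector>

    using server_id = std::uint64_t;  // stand-in for raft::server_id

    // Stand-in for the direct failure detector: it periodically pings
    // whatever endpoints have been registered with add_endpoint().
    struct failure_detector {
        std::unordered_set<server_id> endpoints;
        void add_endpoint(server_id id) { endpoints.insert(id); }
    };

    int main() {
        const server_id my_id = 1;
        failure_detector fd;
        std::vector<server_id> added = {1, 2, 3};  // a configuration change adds these servers

        for (server_id id : added) {
            if (id != my_id) {   // do not ping self: it is only noise and constant overhead
                fd.add_endpoint(id);
            }
        }

        std::cout << "endpoints to ping: " << fd.endpoints.size() << "\n";  // prints 2
    }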
kbr-scylla committed Jul 7, 2023
2 parents 799ae97 + ff41ea8 commit f9cfd7e
Showing 7 changed files with 43 additions and 45 deletions.
7 changes: 3 additions & 4 deletions main.cc
@@ -1206,7 +1206,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
fd.stop().get();
});

raft_gr.start(cfg->consistent_cluster_management(),
raft_gr.start(cfg->consistent_cluster_management(), raft::server_id{cfg->host_id.id},
std::ref(raft_address_map), std::ref(messaging), std::ref(gossiper), std::ref(fd)).get();

// group0 client exists only on shard 0.
@@ -1317,10 +1317,9 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
startlog.error("Bad configuration: consistent_cluster_management requires schema commit log to be enabled");
throw bad_configuration_error();
}
auto my_raft_id = raft::server_id{cfg->host_id.uuid()};
supervisor::notify("starting Raft Group Registry service");
raft_gr.invoke_on_all([my_raft_id] (service::raft_group_registry& raft_gr) {
return raft_gr.start(my_raft_id);
raft_gr.invoke_on_all([] (service::raft_group_registry& raft_gr) {
return raft_gr.start();
}).get();
} else {
if (cfg->check_experimental(db::experimental_features_t::feature::BROADCAST_TABLES)) {
5 changes: 4 additions & 1 deletion service/raft/raft_group0.cc
@@ -112,7 +112,10 @@ class group0_rpc: public service::raft_rpc {
_address_map.set_nonexpiring(addr.id);
// Notify the direct failure detector that it should track
// liveness of a specific raft server id.
_direct_fd.add_endpoint(addr.id.id);
if (addr != _my_id) {
// No need to ping self to know it's alive
_direct_fd.add_endpoint(addr.id.id);
}
}
for (const auto& addr: del) {
// RPC 'send' may yield before resolving IP address,
37 changes: 19 additions & 18 deletions service/raft/raft_group_registry.cc
@@ -27,8 +27,14 @@ logging::logger rslog("raft_group_registry");

class direct_fd_proxy : public raft::failure_detector, public direct_failure_detector::listener {
std::unordered_set<raft::server_id> _alive_set;
raft::server_id _my_id;

public:
direct_fd_proxy(raft::server_id my_id)
: _my_id(my_id)
{
}

future<> mark_alive(direct_failure_detector::pinger::endpoint_id id) override {
static const auto msg = "marking Raft server {} as alive for raft groups";

@@ -63,7 +69,7 @@ class direct_fd_proxy : public raft::failure_detector, public direct_failure_det
}

bool is_alive(raft::server_id srv) override {
return _alive_set.contains(srv);
return srv == _my_id || _alive_set.contains(srv);
}
};
// }}} direct_fd_proxy
@@ -135,15 +141,18 @@ class gossiper_state_change_subscriber_proxy: public gms::i_endpoint_state_chang

// }}} gossiper_state_change_subscriber_proxy

raft_group_registry::raft_group_registry(bool is_enabled, raft_address_map& address_map,
raft_group_registry::raft_group_registry(bool is_enabled,
raft::server_id my_id,
raft_address_map& address_map,
netw::messaging_service& ms, gms::gossiper& gossiper, direct_failure_detector::failure_detector& fd)
: _is_enabled(is_enabled)
, _ms(ms)
, _gossiper(gossiper)
, _gossiper_proxy(make_shared<gossiper_state_change_subscriber_proxy>(address_map))
, _address_map{address_map}
, _direct_fd(fd)
, _direct_fd_proxy(make_shared<direct_fd_proxy>())
, _direct_fd_proxy(make_shared<direct_fd_proxy>(my_id))
, _my_id(my_id)
{
}

@@ -152,13 +161,12 @@ void raft_group_registry::init_rpc_verbs() {
const rpc::client_info& cinfo,
const raft::group_id& gid, raft::server_id from, raft::server_id dst, auto handler) {
constexpr bool is_one_way = std::is_void_v<std::invoke_result_t<decltype(handler), raft_rpc&>>;
const auto& my_id = get_my_raft_id();
if (my_id != dst) {
if (_my_id != dst) {
if constexpr (is_one_way) {
rslog.debug("Got message for server {}, but my id is {}", dst, my_id);
rslog.debug("Got message for server {}, but my id is {}", dst, _my_id);
return make_ready_future<rpc::no_wait_type>(netw::messaging_service::no_wait());
} else {
throw raft_destination_id_not_correct{*_my_id, dst};
throw raft_destination_id_not_correct{_my_id, dst};
}
}

@@ -270,11 +278,10 @@ void raft_group_registry::init_rpc_verbs() {
[this] (const rpc::client_info&, raft::server_id dst) -> future<direct_fd_ping_reply> {
// XXX: update address map here as well?

const raft::server_id& my_id = get_my_raft_id();
if (my_id != dst) {
if (_my_id != dst) {
co_return direct_fd_ping_reply {
.result = wrong_destination {
.reached_id = my_id,
.reached_id = _my_id,
},
};
}
@@ -321,11 +328,8 @@ future<> raft_group_registry::stop_servers() noexcept {
co_await g.close();
}

seastar::future<> raft_group_registry::start(raft::server_id my_id) {
seastar::future<> raft_group_registry::start() {
assert(_is_enabled);
assert(!_my_id);

_my_id = my_id;

_gossiper.register_(_gossiper_proxy);

@@ -339,10 +343,7 @@ seastar::future<> raft_group_registry::start(raft::server_id my_id) {
}

const raft::server_id& raft_group_registry::get_my_raft_id() {
if (!_my_id) {
on_internal_error(rslog, "get_my_raft_id(): Raft ID not initialized");
}
return *_my_id;
return _my_id;
}

seastar::future<> raft_group_registry::stop() {
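Since the local id is never pinged, the Raft-facing liveness check has to answer for it directly, which is what the is_alive change above does. A minimal sketch of that behaviour (fd_proxy and server_id are simplified stand-ins, not the real direct_fd_proxy):

    #include <cassert>
    #include <cstdint>
    #include <unordered_set>

    using server_id = std::uint64_t;  // stand-in for raft::server_id

    // Stand-in for direct_fd_proxy: answers Raft's liveness queries.
    class fd_proxy {
        std::unordered_set<server_id> _alive_set;  // filled by mark_alive() callbacks from the pinger
        server_id _my_id;
    public:
        explicit fd_proxy(server_id my_id) : _my_id(my_id) {}
        void mark_alive(server_id id) { _alive_set.insert(id); }
        void mark_dead(server_id id) { _alive_set.erase(id); }
        // The local server counts as alive by definition, even though it is never pinged.
        bool is_alive(server_id srv) const { return srv == _my_id || _alive_set.contains(srv); }
    };

    int main() {
        fd_proxy fd(1);
        assert(fd.is_alive(1));   // self: alive without any mark_alive()
        assert(!fd.is_alive(2));  // peer: unknown until the pinger reports it
        fd.mark_alive(2);
        assert(fd.is_alive(2));
    }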
14 changes: 4 additions & 10 deletions service/raft/raft_group_registry.hh
@@ -97,23 +97,17 @@ private:
std::optional<raft::group_id> _group0_id;

// My Raft ID. Shared between different Raft groups.
// Once set, must not be changed.
//
// FIXME: ideally we'd like this to be passed to the constructor.
// However storage_proxy/query_processor/system_keyspace are unavailable
// when we start raft_group_registry so we have to set it later,
// after system_keyspace is initialized.
std::optional<raft::server_id> _my_id;
raft::server_id _my_id;

public:
// `is_enabled` must be `true` iff the local RAFT feature is enabled.
raft_group_registry(bool is_enabled, raft_address_map&,
raft_group_registry(bool is_enabled, raft::server_id my_id, raft_address_map&,
netw::messaging_service& ms, gms::gossiper& gs, direct_failure_detector::failure_detector& fd);
~raft_group_registry();

// If is_enabled(),
// Called manually at start on every shard, after system_keyspace is initialized.
seastar::future<> start(raft::server_id my_id);
// Called manually at start on every shard.
seastar::future<> start();
// Called by sharded<>::stop()
seastar::future<> stop();

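Passing the id to the constructor (as main.cc and cql_test_env.cc now do) is what lets the header drop the std::optional and the two-phase start(my_id). A minimal before/after sketch of that pattern, with hypothetical registry_before/registry_after names:

    #include <cassert>
    #include <cstdint>
    #include <optional>
    #include <stdexcept>

    using server_id = std::uint64_t;  // stand-in for raft::server_id

    // Before: the id is not known at construction, so it is optional and
    // every accessor has to guard against "not initialized yet".
    class registry_before {
        std::optional<server_id> _my_id;
    public:
        void start(server_id id) { _my_id = id; }
        server_id my_id() const {
            if (!_my_id) { throw std::logic_error("raft id not initialized"); }
            return *_my_id;
        }
    };

    // After: the id is a constructor argument, so it always exists and the
    // runtime check disappears.
    class registry_after {
        server_id _my_id;
    public:
        explicit registry_after(server_id id) : _my_id(id) {}
        server_id my_id() const { return _my_id; }
    };

    int main() {
        registry_after r(7);
        assert(r.my_id() == 7);
    }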
12 changes: 6 additions & 6 deletions service/raft/raft_rpc.cc
@@ -27,8 +27,8 @@ raft_ticker_type::time_point timeout() {
}

raft_rpc::raft_rpc(raft_state_machine& sm, netw::messaging_service& ms,
raft_address_map& address_map, raft::group_id gid, raft::server_id srv_id)
: _sm(sm), _group_id(std::move(gid)), _server_id(srv_id), _messaging(ms)
raft_address_map& address_map, raft::group_id gid, raft::server_id my_id)
: _sm(sm), _group_id(std::move(gid)), _my_id(my_id), _messaging(ms)
, _address_map(address_map)
{}

@@ -43,7 +43,7 @@ raft_rpc::one_way_rpc(sloc loc, raft::server_id id,
loc.file_name(), loc.line(), loc.function_name(), id);
return make_ready_future<>();
}
return verb(&_messaging, netw::msg_addr(*ip_addr), timeout(), _group_id, _server_id, id, std::forward<Msg>(msg))
return verb(&_messaging, netw::msg_addr(*ip_addr), timeout(), _group_id, _my_id, id, std::forward<Msg>(msg))
.handle_exception([loc = std::move(loc), id] (std::exception_ptr ex) {
try {
std::rethrow_exception(ex);
@@ -61,13 +61,13 @@ auto
raft_rpc::two_way_rpc(sloc loc, raft::server_id id,
Verb&& verb, Args&&... args) {
auto ip_addr = _address_map.find(id);
using Fut = decltype(verb(&_messaging, netw::msg_addr(*ip_addr), db::no_timeout, _group_id, _server_id, id, std::forward<Args>(args)...));
using Fut = decltype(verb(&_messaging, netw::msg_addr(*ip_addr), db::no_timeout, _group_id, _my_id, id, std::forward<Args>(args)...));
using Ret = typename Fut::value_type;
if (!ip_addr) {
const auto msg = format("Failed to send {} {}: ip address not found", loc.function_name(), id);
return make_exception_future<Ret>(raft::transport_error(msg));
}
return verb(&_messaging, netw::msg_addr(*ip_addr), db::no_timeout, _group_id, _server_id, id, std::forward<Args>(args)...)
return verb(&_messaging, netw::msg_addr(*ip_addr), db::no_timeout, _group_id, _my_id, id, std::forward<Args>(args)...)
.handle_exception_type([loc = std::move(loc), id] (const seastar::rpc::closed_error& e) {
const auto msg = format("Failed to execute {} on leader {}: {}", loc.function_name(), id, e);
rlogger.trace(std::string_view(msg));
@@ -86,7 +86,7 @@ future<> raft_rpc::send_append_entries(raft::server_id id, const raft::append_re
co_await coroutine::return_exception_ptr(std::make_exception_ptr(raft::transport_error(msg)));
}
co_return co_await ser::raft_rpc_verbs::send_raft_append_entries(&_messaging, netw::msg_addr(*ip_addr),
db::no_timeout, _group_id, _server_id, id, append_request);
db::no_timeout, _group_id, _my_id, id, append_request);
}

void raft_rpc::send_append_entries_reply(raft::server_id id, const raft::append_reply& reply) {
4 changes: 2 additions & 2 deletions service/raft/raft_rpc.hh
@@ -25,13 +25,13 @@ class raft_rpc : public raft::rpc {
protected:
raft_state_machine& _sm;
raft::group_id _group_id;
raft::server_id _server_id;
raft::server_id _my_id; // Raft server id of this node.
netw::messaging_service& _messaging;
raft_address_map& _address_map;
seastar::gate _shutdown_gate;

explicit raft_rpc(raft_state_machine& sm, netw::messaging_service& ms,
raft_address_map& address_map, raft::group_id gid, raft::server_id srv_id);
raft_address_map& address_map, raft::group_id gid, raft::server_id my_id);

private:
template <typename Verb, typename Msg> void
9 changes: 5 additions & 4 deletions test/lib/cql_test_env.cc
@@ -792,7 +792,9 @@ class single_node_cql_env : public cql_test_env {
});

raft_gr.start(cfg->consistent_cluster_management(),
std::ref(raft_address_map), std::ref(ms), std::ref(gossiper), std::ref(fd)).get();
raft::server_id{cfg->host_id.id},
std::ref(raft_address_map),
std::ref(ms), std::ref(gossiper), std::ref(fd)).get();
auto stop_raft_gr = deferred_stop(raft_gr);

stream_manager.start(std::ref(*cfg), std::ref(db), std::ref(sys_dist_ks), std::ref(view_update_generator), std::ref(ms), std::ref(mm), std::ref(gossiper), scheduling_groups.streaming_scheduling_group).get();
@@ -865,9 +867,8 @@ class single_node_cql_env : public cql_test_env {
}).get();

if (raft_gr.local().is_enabled()) {
auto my_raft_id = raft::server_id{cfg->host_id.uuid()};
raft_gr.invoke_on_all([my_raft_id] (service::raft_group_registry& raft_gr) {
return raft_gr.start(my_raft_id);
raft_gr.invoke_on_all([] (service::raft_group_registry& raft_gr) {
return raft_gr.start();
}).get();
}

