diff --git a/main.cc b/main.cc index 61b6d5d0fd38..03bc606cdc49 100644 --- a/main.cc +++ b/main.cc @@ -1206,7 +1206,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl fd.stop().get(); }); - raft_gr.start(cfg->consistent_cluster_management(), + raft_gr.start(cfg->consistent_cluster_management(), raft::server_id{cfg->host_id.id}, std::ref(raft_address_map), std::ref(messaging), std::ref(gossiper), std::ref(fd)).get(); // group0 client exists only on shard 0. @@ -1317,10 +1317,9 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl startlog.error("Bad configuration: consistent_cluster_management requires schema commit log to be enabled"); throw bad_configuration_error(); } - auto my_raft_id = raft::server_id{cfg->host_id.uuid()}; supervisor::notify("starting Raft Group Registry service"); - raft_gr.invoke_on_all([my_raft_id] (service::raft_group_registry& raft_gr) { - return raft_gr.start(my_raft_id); + raft_gr.invoke_on_all([] (service::raft_group_registry& raft_gr) { + return raft_gr.start(); }).get(); } else { if (cfg->check_experimental(db::experimental_features_t::feature::BROADCAST_TABLES)) { diff --git a/service/raft/raft_group0.cc b/service/raft/raft_group0.cc index 9fb4f73c6355..b413fde24b96 100644 --- a/service/raft/raft_group0.cc +++ b/service/raft/raft_group0.cc @@ -112,7 +112,10 @@ class group0_rpc: public service::raft_rpc { _address_map.set_nonexpiring(addr.id); // Notify the direct failure detector that it should track // (or liveness of a specific raft server id. - _direct_fd.add_endpoint(addr.id.id); + if (addr != _my_id) { + // No need to ping self to know it's alive + _direct_fd.add_endpoint(addr.id.id); + } } for (const auto& addr: del) { // RPC 'send' may yield before resolving IP address, diff --git a/service/raft/raft_group_registry.cc b/service/raft/raft_group_registry.cc index a9d7f8f34f9f..574082ac2e95 100644 --- a/service/raft/raft_group_registry.cc +++ b/service/raft/raft_group_registry.cc @@ -27,8 +27,14 @@ logging::logger rslog("raft_group_registry"); class direct_fd_proxy : public raft::failure_detector, public direct_failure_detector::listener { std::unordered_set _alive_set; + raft::server_id _my_id; public: + direct_fd_proxy(raft::server_id my_id) + : _my_id(my_id) + { + } + future<> mark_alive(direct_failure_detector::pinger::endpoint_id id) override { static const auto msg = "marking Raft server {} as alive for raft groups"; @@ -63,7 +69,7 @@ class direct_fd_proxy : public raft::failure_detector, public direct_failure_det } bool is_alive(raft::server_id srv) override { - return _alive_set.contains(srv); + return srv == _my_id || _alive_set.contains(srv); } }; // }}} direct_fd_proxy @@ -135,7 +141,9 @@ class gossiper_state_change_subscriber_proxy: public gms::i_endpoint_state_chang // }}} gossiper_state_change_subscriber_proxy -raft_group_registry::raft_group_registry(bool is_enabled, raft_address_map& address_map, +raft_group_registry::raft_group_registry(bool is_enabled, + raft::server_id my_id, + raft_address_map& address_map, netw::messaging_service& ms, gms::gossiper& gossiper, direct_failure_detector::failure_detector& fd) : _is_enabled(is_enabled) , _ms(ms) @@ -143,7 +151,8 @@ raft_group_registry::raft_group_registry(bool is_enabled, raft_address_map& addr , _gossiper_proxy(make_shared(address_map)) , _address_map{address_map} , _direct_fd(fd) - , _direct_fd_proxy(make_shared()) + , _direct_fd_proxy(make_shared(my_id)) + , _my_id(my_id) { } @@ -152,13 +161,12 @@ void raft_group_registry::init_rpc_verbs() { const rpc::client_info& cinfo, const raft::group_id& gid, raft::server_id from, raft::server_id dst, auto handler) { constexpr bool is_one_way = std::is_void_v>; - const auto& my_id = get_my_raft_id(); - if (my_id != dst) { + if (_my_id != dst) { if constexpr (is_one_way) { - rslog.debug("Got message for server {}, but my id is {}", dst, my_id); + rslog.debug("Got message for server {}, but my id is {}", dst, _my_id); return make_ready_future(netw::messaging_service::no_wait()); } else { - throw raft_destination_id_not_correct{*_my_id, dst}; + throw raft_destination_id_not_correct{_my_id, dst}; } } @@ -270,11 +278,10 @@ void raft_group_registry::init_rpc_verbs() { [this] (const rpc::client_info&, raft::server_id dst) -> future { // XXX: update address map here as well? - const raft::server_id& my_id = get_my_raft_id(); - if (my_id != dst) { + if (_my_id != dst) { co_return direct_fd_ping_reply { .result = wrong_destination { - .reached_id = my_id, + .reached_id = _my_id, }, }; } @@ -321,11 +328,8 @@ future<> raft_group_registry::stop_servers() noexcept { co_await g.close(); } -seastar::future<> raft_group_registry::start(raft::server_id my_id) { +seastar::future<> raft_group_registry::start() { assert(_is_enabled); - assert(!_my_id); - - _my_id = my_id; _gossiper.register_(_gossiper_proxy); @@ -339,10 +343,7 @@ seastar::future<> raft_group_registry::start(raft::server_id my_id) { } const raft::server_id& raft_group_registry::get_my_raft_id() { - if (!_my_id) { - on_internal_error(rslog, "get_my_raft_id(): Raft ID not initialized"); - } - return *_my_id; + return _my_id; } seastar::future<> raft_group_registry::stop() { diff --git a/service/raft/raft_group_registry.hh b/service/raft/raft_group_registry.hh index b00a81459b6f..973e73096619 100644 --- a/service/raft/raft_group_registry.hh +++ b/service/raft/raft_group_registry.hh @@ -97,23 +97,17 @@ private: std::optional _group0_id; // My Raft ID. Shared between different Raft groups. - // Once set, must not be changed. - // - // FIXME: ideally we'd like this to be passed to the constructor. - // However storage_proxy/query_processor/system_keyspace are unavailable - // when we start raft_group_registry so we have to set it later, - // after system_keyspace is initialized. - std::optional _my_id; + raft::server_id _my_id; public: // `is_enabled` must be `true` iff the local RAFT feature is enabled. - raft_group_registry(bool is_enabled, raft_address_map&, + raft_group_registry(bool is_enabled, raft::server_id my_id, raft_address_map&, netw::messaging_service& ms, gms::gossiper& gs, direct_failure_detector::failure_detector& fd); ~raft_group_registry(); // If is_enabled(), - // Called manually at start on every shard, after system_keyspace is initialized. - seastar::future<> start(raft::server_id my_id); + // Called manually at start on every shard. + seastar::future<> start(); // Called by sharded<>::stop() seastar::future<> stop(); diff --git a/service/raft/raft_rpc.cc b/service/raft/raft_rpc.cc index 2dc9dfbd1497..52af090694b1 100644 --- a/service/raft/raft_rpc.cc +++ b/service/raft/raft_rpc.cc @@ -27,8 +27,8 @@ raft_ticker_type::time_point timeout() { } raft_rpc::raft_rpc(raft_state_machine& sm, netw::messaging_service& ms, - raft_address_map& address_map, raft::group_id gid, raft::server_id srv_id) - : _sm(sm), _group_id(std::move(gid)), _server_id(srv_id), _messaging(ms) + raft_address_map& address_map, raft::group_id gid, raft::server_id my_id) + : _sm(sm), _group_id(std::move(gid)), _my_id(my_id), _messaging(ms) , _address_map(address_map) {} @@ -43,7 +43,7 @@ raft_rpc::one_way_rpc(sloc loc, raft::server_id id, loc.file_name(), loc.line(), loc.function_name(), id); return make_ready_future<>(); } - return verb(&_messaging, netw::msg_addr(*ip_addr), timeout(), _group_id, _server_id, id, std::forward(msg)) + return verb(&_messaging, netw::msg_addr(*ip_addr), timeout(), _group_id, _my_id, id, std::forward(msg)) .handle_exception([loc = std::move(loc), id] (std::exception_ptr ex) { try { std::rethrow_exception(ex); @@ -61,13 +61,13 @@ auto raft_rpc::two_way_rpc(sloc loc, raft::server_id id, Verb&& verb, Args&&... args) { auto ip_addr = _address_map.find(id); - using Fut = decltype(verb(&_messaging, netw::msg_addr(*ip_addr), db::no_timeout, _group_id, _server_id, id, std::forward(args)...)); + using Fut = decltype(verb(&_messaging, netw::msg_addr(*ip_addr), db::no_timeout, _group_id, _my_id, id, std::forward(args)...)); using Ret = typename Fut::value_type; if (!ip_addr) { const auto msg = format("Failed to send {} {}: ip address not found", loc.function_name(), id); return make_exception_future(raft::transport_error(msg)); } - return verb(&_messaging, netw::msg_addr(*ip_addr), db::no_timeout, _group_id, _server_id, id, std::forward(args)...) + return verb(&_messaging, netw::msg_addr(*ip_addr), db::no_timeout, _group_id, _my_id, id, std::forward(args)...) .handle_exception_type([loc= std::move(loc), id] (const seastar::rpc::closed_error& e) {; const auto msg = format("Failed to execute {} on leader {}: {}", loc.function_name(), id, e); rlogger.trace(std::string_view(msg)); @@ -86,7 +86,7 @@ future<> raft_rpc::send_append_entries(raft::server_id id, const raft::append_re co_await coroutine::return_exception_ptr(std::make_exception_ptr(raft::transport_error(msg))); } co_return co_await ser::raft_rpc_verbs::send_raft_append_entries(&_messaging, netw::msg_addr(*ip_addr), - db::no_timeout, _group_id, _server_id, id, append_request); + db::no_timeout, _group_id, _my_id, id, append_request); } void raft_rpc::send_append_entries_reply(raft::server_id id, const raft::append_reply& reply) { diff --git a/service/raft/raft_rpc.hh b/service/raft/raft_rpc.hh index 9b8e8913bb2b..f84d50393e6d 100644 --- a/service/raft/raft_rpc.hh +++ b/service/raft/raft_rpc.hh @@ -25,13 +25,13 @@ class raft_rpc : public raft::rpc { protected: raft_state_machine& _sm; raft::group_id _group_id; - raft::server_id _server_id; + raft::server_id _my_id; // Raft server id of this node. netw::messaging_service& _messaging; raft_address_map& _address_map; seastar::gate _shutdown_gate; explicit raft_rpc(raft_state_machine& sm, netw::messaging_service& ms, - raft_address_map& address_map, raft::group_id gid, raft::server_id srv_id); + raft_address_map& address_map, raft::group_id gid, raft::server_id my_id); private: template void diff --git a/test/lib/cql_test_env.cc b/test/lib/cql_test_env.cc index 325a47beee6e..d982945070ff 100644 --- a/test/lib/cql_test_env.cc +++ b/test/lib/cql_test_env.cc @@ -792,7 +792,9 @@ class single_node_cql_env : public cql_test_env { }); raft_gr.start(cfg->consistent_cluster_management(), - std::ref(raft_address_map), std::ref(ms), std::ref(gossiper), std::ref(fd)).get(); + raft::server_id{cfg->host_id.id}, + std::ref(raft_address_map), + std::ref(ms), std::ref(gossiper), std::ref(fd)).get(); auto stop_raft_gr = deferred_stop(raft_gr); stream_manager.start(std::ref(*cfg), std::ref(db), std::ref(sys_dist_ks), std::ref(view_update_generator), std::ref(ms), std::ref(mm), std::ref(gossiper), scheduling_groups.streaming_scheduling_group).get(); @@ -865,9 +867,8 @@ class single_node_cql_env : public cql_test_env { }).get(); if (raft_gr.local().is_enabled()) { - auto my_raft_id = raft::server_id{cfg->host_id.uuid()}; - raft_gr.invoke_on_all([my_raft_id] (service::raft_group_registry& raft_gr) { - return raft_gr.start(my_raft_id); + raft_gr.invoke_on_all([] (service::raft_group_registry& raft_gr) { + return raft_gr.start(); }).get(); }