diff --git a/api/storage_service.cc b/api/storage_service.cc index 6f3e1c0a3963..f5328be3713a 100644 --- a/api/storage_service.cc +++ b/api/storage_service.cc @@ -103,7 +103,7 @@ static bool any_of_keyspaces_use_tablets(const http_context& ctx) { locator::host_id validate_host_id(const sstring& param) { auto hoep = locator::host_id_or_endpoint(param, locator::host_id_or_endpoint::param_type::host_id); - return hoep.id; + return hoep.id(); } bool validate_bool(const sstring& param) { diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc index 1d77fcf47f99..4b30cff35df9 100644 --- a/db/system_keyspace.cc +++ b/db/system_keyspace.cc @@ -1511,25 +1511,6 @@ future<> system_keyspace::peers_table_read_fixup() { } } -future> system_keyspace::load_dc_rack_info() { - co_await peers_table_read_fixup(); - - const auto msg = co_await execute_cql(format("SELECT peer, data_center, rack from system.{}", PEERS)); - - std::unordered_map ret; - for (const auto& row : *msg) { - if (!row.has("data_center") || !row.has("rack")) { - continue; - } - ret.emplace(row.get_as("peer"), locator::endpoint_dc_rack { - row.get_as("data_center"), - row.get_as("rack") - }); - } - - co_return ret; -} - future<> system_keyspace::build_bootstrap_info() { sstring req = format("SELECT bootstrapped FROM system.{} WHERE key = ? ", LOCAL); return execute_cql(req, sstring(LOCAL)).then([this] (auto msg) { @@ -1810,6 +1791,50 @@ future> system_keyspace: co_return ret; } +future> system_keyspace::load_endpoint_state() { + co_await peers_table_read_fixup(); + + const auto msg = co_await execute_cql(format("SELECT peer, host_id, tokens, data_center, rack from system.{}", PEERS)); + + std::unordered_map ret; + for (const auto& row : *msg) { + gms::loaded_endpoint_state st; + auto ep = row.get_as("peer"); + if (!row.has("host_id")) { + // Must never happen after `peers_table_read_fixup` call above + on_internal_error_noexcept(slogger, format("load_endpoint_state: node {} has no host_id in system.{}", ep, PEERS)); + } + auto host_id = locator::host_id(row.get_as("host_id")); + if (row.has("tokens")) { + st.tokens = decode_tokens(deserialize_set_column(*peers(), row, "tokens")); + if (st.tokens.empty()) { + slogger.error("load_endpoint_state: node {}/{} has tokens column present but tokens are empty", host_id, ep); + continue; + } + } else { + slogger.warn("Endpoint {} has no tokens in system.{}", ep, PEERS); + } + if (row.has("data_center") && row.has("rack")) { + st.opt_dc_rack.emplace(locator::endpoint_dc_rack { + row.get_as("data_center"), + row.get_as("rack") + }); + if (st.opt_dc_rack->dc.empty() || st.opt_dc_rack->rack.empty()) { + slogger.error("load_endpoint_state: node {}/{} has empty dc={} or rack={}", host_id, ep, st.opt_dc_rack->dc, st.opt_dc_rack->rack); + continue; + } + } else { + slogger.warn("Endpoint {} has no {} in system.{}", ep, + !row.has("data_center") && !row.has("rack") ? "data_center nor rack" : !row.has("data_center") ? "data_center" : "rack", + PEERS); + } + st.endpoint = ep; + ret.emplace(host_id, std::move(st)); + } + + co_return ret; +} + future> system_keyspace::load_peers() { co_await peers_table_read_fixup(); diff --git a/db/system_keyspace.hh b/db/system_keyspace.hh index d397ab8a735a..0bca62aaf7e9 100644 --- a/db/system_keyspace.hh +++ b/db/system_keyspace.hh @@ -15,6 +15,7 @@ #include #include #include "db/system_auth_keyspace.hh" +#include "gms/gossiper.hh" #include "schema/schema_fwd.hh" #include "utils/UUID.hh" #include "query-result-set.hh" @@ -352,9 +353,9 @@ public: /** - * Return a map of IP addresses containing a map of dc and rack info + * Return a map of nodes and their loaded_endpoint_state */ - future> load_dc_rack_info(); + future> load_endpoint_state(); enum class bootstrap_state { NEEDS_BOOTSTRAP, diff --git a/gms/endpoint_state.cc b/gms/endpoint_state.cc index 1ec29546d35c..db87e633ef8e 100644 --- a/gms/endpoint_state.cc +++ b/gms/endpoint_state.cc @@ -12,9 +12,12 @@ #include "gms/i_endpoint_state_change_subscriber.hh" #include #include +#include "log.hh" namespace gms { +logging::logger logger("endpoint_state"); + static_assert(std::is_default_constructible_v); static_assert(std::is_nothrow_copy_constructible_v); static_assert(std::is_nothrow_move_constructible_v); @@ -51,10 +54,40 @@ bool endpoint_state::is_cql_ready() const noexcept { } locator::host_id endpoint_state::get_host_id() const noexcept { + locator::host_id host_id; if (auto app_state = get_application_state_ptr(application_state::HOST_ID)) { - return locator::host_id(utils::UUID(app_state->value())); + host_id = locator::host_id(utils::UUID(app_state->value())); + if (!host_id) { + on_internal_error_noexcept(logger, format("Node has null host_id")); + } + } + return host_id; +} + +std::optional endpoint_state::get_dc_rack() const { + if (auto app_state = get_application_state_ptr(application_state::DC)) { + std::optional ret; + ret->dc = app_state->value(); + if ((app_state = get_application_state_ptr(application_state::RACK))) { + ret->rack = app_state->value(); + if (ret->dc.empty() || ret->rack.empty()) { + on_internal_error_noexcept(logger, format("Node {} has empty dc={} or rack={}", get_host_id(), ret->dc, ret->rack)); + } + return ret; + } + } + return std::nullopt; +} + +std::unordered_set endpoint_state::get_tokens() const { + std::unordered_set ret; + if (auto app_state = get_application_state_ptr(application_state::TOKENS)) { + ret = versioned_value::tokens_from_string(app_state->value()); + if (ret.empty()) { + on_internal_error_noexcept(logger, format("Node {} has empty tokens state", get_host_id())); + } } - return locator::host_id::create_null_id(); + return ret; } future<> i_endpoint_state_change_subscriber::on_application_state_change(inet_address endpoint, diff --git a/gms/endpoint_state.hh b/gms/endpoint_state.hh index 9ea2bf71ac3e..10b545b42661 100644 --- a/gms/endpoint_state.hh +++ b/gms/endpoint_state.hh @@ -14,6 +14,7 @@ #include "gms/application_state.hh" #include "gms/versioned_value.hh" #include "locator/host_id.hh" +#include "locator/types.hh" namespace gms { @@ -152,6 +153,12 @@ public: // or a null host_id if the application state is not found. locator::host_id get_host_id() const noexcept; + std::optional get_dc_rack() const; + + // Return the value of the TOKENS application state + // or an empty set if the application state is not found. + std::unordered_set get_tokens() const; + friend fmt::formatter; }; diff --git a/gms/gossiper.cc b/gms/gossiper.cc index e8c08e91ecbd..d2b57b2a4df3 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -2020,11 +2020,9 @@ future<> gossiper::start_gossiping(gms::generation_type generation_nbr, applicat local_state.add_application_state(entry.first, entry.second); } - auto generation = local_state.get_heart_beat_state().get_generation(); + co_await replicate(get_broadcast_address(), local_state, permit.id()); - co_await replicate(get_broadcast_address(), std::move(local_state), permit.id()); - - logger.trace("gossip started with generation {}", generation); + logger.info("Gossip started with local state: {}", local_state); _enabled = true; _nr_run = 0; _scheduled_gossip_task.arm(INTERVAL); @@ -2130,11 +2128,21 @@ void gossiper::build_seeds_list() { } } -future<> gossiper::add_saved_endpoint(inet_address ep, permit_id pid) { - if (ep == get_broadcast_address()) { +future<> gossiper::add_saved_endpoint(locator::host_id host_id, gms::loaded_endpoint_state st, permit_id pid) { + if (host_id == my_host_id()) { logger.debug("Attempt to add self as saved endpoint"); co_return; } + const auto& ep = st.endpoint; + if (!host_id) { + on_internal_error(logger, format("Attempt to add {} with null host_id as saved endpoint", ep)); + } + if (ep == inet_address{}) { + on_internal_error(logger, format("Attempt to add {} with null inet_address as saved endpoint", host_id)); + } + if (ep == get_broadcast_address()) { + on_internal_error(logger, format("Attempt to add {} with broadcast_address {} as saved endpoint", host_id, ep)); + } auto permit = co_await lock_endpoint(ep, pid); @@ -2155,14 +2163,18 @@ future<> gossiper::add_saved_endpoint(inet_address ep, permit_id pid) { // It will get updated as a whole by handle_major_state_change // via do_apply_state_locally when (remote_generation > local_generation) const auto tmptr = get_token_metadata_ptr(); - auto host_id = tmptr->get_host_id_if_known(ep); - if (host_id) { - ep_state.add_application_state(gms::application_state::HOST_ID, versioned_value::host_id(host_id.value())); - auto tokens = tmptr->get_tokens(*host_id); - if (!tokens.empty()) { - std::unordered_set tokens_set(tokens.begin(), tokens.end()); - ep_state.add_application_state(gms::application_state::TOKENS, versioned_value::tokens(tokens_set)); - } + ep_state.add_application_state(gms::application_state::HOST_ID, versioned_value::host_id(host_id)); + auto tokens = tmptr->get_tokens(host_id); + if (!tokens.empty()) { + std::unordered_set tokens_set(tokens.begin(), tokens.end()); + ep_state.add_application_state(gms::application_state::TOKENS, versioned_value::tokens(tokens_set)); + } + if (st.opt_dc_rack) { + ep_state.add_application_state(gms::application_state::DC, gms::versioned_value::datacenter(st.opt_dc_rack->dc)); + ep_state.add_application_state(gms::application_state::RACK, gms::versioned_value::datacenter(st.opt_dc_rack->rack)); + } + if (st.opt_status) { + ep_state.add_application_state(gms::application_state::STATUS, std::move(*st.opt_status)); } auto generation = ep_state.get_heart_beat_state().get_generation(); co_await replicate(ep, std::move(ep_state), permit.id()); @@ -2683,4 +2695,16 @@ locator::token_metadata_ptr gossiper::get_token_metadata_ptr() const noexcept { return _shared_token_metadata.get(); } +std::ostream& operator<<(std::ostream& os, const loaded_endpoint_state& st) { + fmt::print(os, "{}", st); + return os; +} + } // namespace gms + +auto fmt::formatter::format(const gms::loaded_endpoint_state& st, fmt::format_context& ctx) const -> decltype(ctx.out()) { + return fmt::format_to(ctx.out(), "{{ endpoint={} dc={} rack={} tokens={} }}", st.endpoint, + st.opt_dc_rack ? st.opt_dc_rack->dc : "", + st.opt_dc_rack ? st.opt_dc_rack->rack : "", + st.tokens); +} diff --git a/gms/gossiper.hh b/gms/gossiper.hh index 92bb24877078..b7425499a2fa 100644 --- a/gms/gossiper.hh +++ b/gms/gossiper.hh @@ -36,6 +36,7 @@ #include #include #include "locator/token_metadata.hh" +#include "locator/types.hh" namespace db { class config; @@ -76,6 +77,15 @@ struct gossip_config { uint32_t skip_wait_for_gossip_to_settle = -1; }; +struct loaded_endpoint_state { + gms::inet_address endpoint; + std::unordered_set tokens; + std::optional opt_dc_rack; + std::optional opt_status; +}; + +std::ostream& operator<<(std::ostream& os, const loaded_endpoint_state& st); + /** * This module is responsible for Gossiping information for the local endpoint. This abstraction * maintains the list of live and dead endpoints. Periodically i.e. every 1 second this module @@ -139,6 +149,9 @@ public: return _gcfg.partitioner; } + locator::host_id my_host_id() const noexcept { + return get_token_metadata_ptr()->get_topology().my_host_id(); + } inet_address get_broadcast_address() const noexcept { return get_token_metadata_ptr()->get_topology().my_address(); } @@ -608,7 +621,7 @@ public: /** * Add an endpoint we knew about previously, but whose state is unknown */ - future<> add_saved_endpoint(inet_address ep, permit_id); + future<> add_saved_endpoint(locator::host_id host_id, loaded_endpoint_state st, permit_id); future<> add_local_application_state(application_state state, versioned_value value); @@ -704,3 +717,9 @@ struct gossip_get_endpoint_states_response { }; } // namespace gms + +template <> +struct fmt::formatter { + constexpr auto parse(format_parse_context& ctx) { return ctx.begin(); } + auto format(const gms::loaded_endpoint_state&, fmt::format_context& ctx) const -> decltype(ctx.out()); +}; diff --git a/locator/token_metadata.cc b/locator/token_metadata.cc index 34d7fbf26822..14fd0dfb200a 100644 --- a/locator/token_metadata.cc +++ b/locator/token_metadata.cc @@ -975,12 +975,6 @@ token_metadata::get_endpoint_for_host_id(host_id host_id) const { return _impl->get_endpoint_for_host_id(host_id); } -host_id_or_endpoint token_metadata::parse_host_id_and_endpoint(const sstring& host_id_string) const { - auto res = host_id_or_endpoint(host_id_string); - res.resolve(*this); - return res; -} - std::unordered_map token_metadata::get_endpoint_to_host_id_map_for_reading() const { return _impl->get_endpoint_to_host_id_map_for_reading(); @@ -1250,24 +1244,24 @@ host_id_or_endpoint::host_id_or_endpoint(const sstring& s, param_type restrict) switch (restrict) { case param_type::host_id: try { - id = host_id(utils::UUID(s)); + _value = host_id(utils::UUID(s)); } catch (const marshal_exception& e) { throw std::invalid_argument(format("Invalid host_id {}: {}", s, e.what())); } break; case param_type::endpoint: try { - endpoint = gms::inet_address(s); + _value = gms::inet_address(s); } catch (std::invalid_argument& e) { throw std::invalid_argument(format("Invalid inet_address {}: {}", s, e.what())); } break; case param_type::auto_detect: try { - id = host_id(utils::UUID(s)); + _value = host_id(utils::UUID(s)); } catch (const marshal_exception& e) { try { - endpoint = gms::inet_address(s); + _value = gms::inet_address(s); } catch (std::invalid_argument& e) { throw std::invalid_argument(format("Invalid host_id or inet_address {}", s)); } @@ -1275,20 +1269,26 @@ host_id_or_endpoint::host_id_or_endpoint(const sstring& s, param_type restrict) } } -void host_id_or_endpoint::resolve(const token_metadata& tm) { - if (id) { - auto endpoint_opt = tm.get_endpoint_for_host_id_if_known(id); - if (!endpoint_opt) { - throw std::runtime_error(format("Host ID {} not found in the cluster", id)); - } - endpoint = *endpoint_opt; - } else { - auto opt_id = tm.get_host_id_if_known(endpoint); - if (!opt_id) { - throw std::runtime_error(format("Host inet address {} not found in the cluster", endpoint)); - } - id = *opt_id; +host_id host_id_or_endpoint::resolve_id(const token_metadata& tm) const { + if (has_host_id()) { + return id(); + } + auto opt_id = tm.get_host_id_if_known(endpoint()); + if (!opt_id) { + throw std::runtime_error(format("Host inet address {} not found in the cluster", endpoint())); + } + return *opt_id; +} + +gms::inet_address host_id_or_endpoint::resolve_endpoint(const token_metadata& tm) const { + if (has_endpoint()) { + return endpoint(); + } + auto endpoint_opt = tm.get_endpoint_for_host_id_if_known(id()); + if (!endpoint_opt) { + throw std::runtime_error(format("Host ID {} not found in the cluster", id())); } + return *endpoint_opt; } } // namespace locator diff --git a/locator/token_metadata.hh b/locator/token_metadata.hh index 3c1dc11f0aa1..a04519851446 100644 --- a/locator/token_metadata.hh +++ b/locator/token_metadata.hh @@ -45,8 +45,7 @@ class token_metadata; class tablet_metadata; struct host_id_or_endpoint { - host_id id; - gms::inet_address endpoint; + std::variant _value; enum class param_type { host_id, @@ -57,16 +56,25 @@ struct host_id_or_endpoint { host_id_or_endpoint(const sstring& s, param_type restrict = param_type::auto_detect); bool has_host_id() const noexcept { - return bool(id); + return _value.index() == 0; } bool has_endpoint() const noexcept { - return endpoint != gms::inet_address(); + return _value.index() == 1; } - // Map the host_id to endpoint based on whichever of them is set, - // using the token_metadata - void resolve(const token_metadata& tm); + host_id id() const { + return std::get(_value); + }; + + gms::inet_address endpoint() const { + return std::get(_value); + }; + + // Map the host_id to endpoint or vice verse, using the token_metadata. + // Throws runtime error if failed to resolve. + host_id resolve_id(const token_metadata&) const; + gms::inet_address resolve_endpoint(const token_metadata&) const; }; class token_metadata_impl; @@ -221,10 +229,6 @@ public: /** Return the end-point for a unique host ID */ inet_address get_endpoint_for_host_id(locator::host_id host_id) const; - /// Parses the \c host_id_string either as a host uuid or as an ip address and returns the mapping. - /// Throws std::invalid_argument on parse error or std::runtime_error if the host_id wasn't found. - host_id_or_endpoint parse_host_id_and_endpoint(const sstring& host_id_string) const; - /** @return a copy of the endpoint-to-id map for read-only operations */ std::unordered_map get_endpoint_to_host_id_map_for_reading() const; diff --git a/service/storage_service.cc b/service/storage_service.cc index 5170774041fd..8a9c80d11b4d 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -17,6 +17,7 @@ #include "service/qos/raft_service_level_distributed_data_accessor.hh" #include "service/qos/service_level_controller.hh" #include "service/qos/standard_service_level_distributed_data_accessor.hh" +#include "locator/token_metadata.hh" #include "service/topology_guard.hh" #include "service/session.hh" #include "dht/boot_strapper.hh" @@ -32,6 +33,7 @@ #include "db/consistency_level.hh" #include "seastar/core/when_all.hh" #include "service/tablet_allocator.hh" +#include "locator/types.hh" #include "locator/tablets.hh" #include "locator/tablet_metadata_guard.hh" #include "replica/tablet_mutation_builder.hh" @@ -699,7 +701,13 @@ future<> storage_service::topology_state_load() { if (is_me(e)) { continue; } - const auto ep = tmptr->get_endpoint_for_host_id(e); + const auto& topo = tmptr->get_topology(); + const auto* node = topo.find_node(e); + // node must exist in topology if it's in tmptr->get_all_endpoints + if (!node) { + on_internal_error(slogger, format("Found no node for {} in topology", e)); + } + const auto& ep = node->endpoint(); if (ep == inet_address{}) { continue; } @@ -708,7 +716,16 @@ future<> storage_service::topology_state_load() { // since it is not loaded in join_cluster in the // raft_topology_change_enabled() case. if (!_gossiper.get_endpoint_state_ptr(ep)) { - co_await _gossiper.add_saved_endpoint(ep, permit.id()); + gms::loaded_endpoint_state st; + st.endpoint = ep; + st.tokens = boost::copy_range>(tmptr->get_tokens(e)); + st.opt_dc_rack = node->dc_rack(); + // Save tokens, not needed for raft topology management, but needed by legacy + // Also ip -> id mapping is needed for address map recreation on reboot + if (node->is_this_node() && !st.tokens.empty()) { + st.opt_status = gms::versioned_value::normal(st.tokens); + } + co_await _gossiper.add_saved_endpoint(e, std::move(st), permit.id()); } } @@ -1058,11 +1075,11 @@ std::unordered_set storage_service::find_raft_nodes_from_hoeps( for (const auto& hoep : hoeps) { std::optional id; if (hoep.has_host_id()) { - id = raft::server_id{hoep.id.uuid()}; + id = raft::server_id{hoep.id().uuid()}; } else { - id = _group0->address_map().find_by_addr(hoep.endpoint); + id = _group0->address_map().find_by_addr(hoep.endpoint()); if (!id) { - throw std::runtime_error(::format("Cannot find a mapping to IP {}", hoep.endpoint)); + throw std::runtime_error(::format("Cannot find a mapping to IP {}", hoep.endpoint())); } } if (!_topology_state_machine._topology.find(*id)) { @@ -1396,7 +1413,7 @@ future<> storage_service::join_token_ring(sharded& proxy, sharded& gossiper, std::unordered_set initial_contact_nodes, - std::unordered_set loaded_endpoints, + std::unordered_map loaded_endpoints, std::unordered_map loaded_peer_features, std::chrono::milliseconds delay, start_hint_manager start_hm, @@ -1450,6 +1467,20 @@ future<> storage_service::join_token_ring(shardedupdate_host_id(ri->host_id, *replace_address); replaced_host_id = ri->host_id; + + // With gossip, after a full cluster restart, the ignored nodes + // state is loaded from system.peers with no STATUS state, + // therefore we need to "inject" their state here after we + // learn about them in the shadow round initiated in `prepare_replacement_info`. + for (const auto& [host_id, st] : ri->ignore_nodes) { + tmptr->update_host_id(host_id, st.endpoint); + if (st.opt_dc_rack) { + tmptr->update_topology(host_id, st.opt_dc_rack); + } + if (!st.tokens.empty()) { + co_await tmptr->update_normal_tokens(st.tokens, host_id); + } + } } } else if (should_bootstrap()) { co_await check_for_endpoint_collision(initial_contact_nodes, loaded_peer_features); @@ -1469,10 +1500,10 @@ future<> storage_service::join_token_ring(sharded storage_service::join_token_ring(sharded{}; std::vector sync_nodes; tm.get_topology().for_each_node([&] (const locator::node* np) { auto ep = np->endpoint(); - if (!ignore_nodes.contains(ep) && (!ri || ep != ri->address)) { + const auto& host_id = np->host_id(); + if (!ri || (host_id != ri->host_id && !ri->ignore_nodes.contains(host_id))) { sync_nodes.push_back(ep); } }); @@ -1976,13 +2004,12 @@ future<> storage_service::track_upgrade_progress_to_topology_coordinator(sharded } } -std::unordered_set storage_service::parse_node_list(sstring comma_separated_list, const token_metadata& tm) { +std::list storage_service::parse_node_list(sstring comma_separated_list) { std::vector ignore_nodes_strs = utils::split_comma_separated_list(std::move(comma_separated_list)); - std::unordered_set ignore_nodes; + std::list ignore_nodes; for (const sstring& n : ignore_nodes_strs) { try { - auto ep_and_id = tm.parse_host_id_and_endpoint(n); - ignore_nodes.insert(ep_and_id.endpoint); + ignore_nodes.push_back(locator::host_id_or_endpoint(n)); } catch (...) { throw std::runtime_error(::format("Failed to parse node list: {}: invalid node={}: {}", ignore_nodes_strs, n, std::current_exception())); } @@ -2742,52 +2769,36 @@ future<> storage_service::join_cluster(sharded& set_mode(mode::STARTING); - std::unordered_set loaded_endpoints; + std::unordered_map loaded_endpoints; if (_db.local().get_config().load_ring_state() && !raft_topology_change_enabled()) { slogger.info("Loading persisted ring state"); - auto loaded_tokens = co_await _sys_ks.local().load_tokens(); - auto loaded_host_ids = co_await _sys_ks.local().load_host_ids(); - auto loaded_dc_rack = co_await _sys_ks.local().load_dc_rack_info(); - - auto get_dc_rack = [&loaded_dc_rack] (inet_address ep) { - if (loaded_dc_rack.contains(ep)) { - return loaded_dc_rack[ep]; - } else { - return locator::endpoint_dc_rack::default_location; - } - }; - - if (slogger.is_enabled(logging::log_level::debug)) { - for (auto& x : loaded_tokens) { - slogger.debug("Loaded tokens: endpoint={}, tokens={}", x.first, x.second); - } - - for (auto& x : loaded_host_ids) { - slogger.debug("Loaded host_id: endpoint={}, uuid={}", x.first, x.second); - } - } + loaded_endpoints = co_await _sys_ks.local().load_endpoint_state(); auto tmlock = co_await get_token_metadata_lock(); auto tmptr = co_await get_mutable_token_metadata_ptr(); - for (auto x : loaded_tokens) { - auto ep = x.first; - auto tokens = x.second; - if (ep == get_broadcast_address()) { + for (auto& [host_id, st] : loaded_endpoints) { + if (st.endpoint == get_broadcast_address()) { // entry has been mistakenly added, delete it - co_await _sys_ks.local().remove_endpoint(ep); + slogger.warn("Loaded saved endpoint={}/{} has my broadcast address. Deleting it", host_id, st.endpoint); + co_await _sys_ks.local().remove_endpoint(st.endpoint); + } else if (st.tokens.empty()) { + slogger.debug("Not loading saved endpoint={}/{} since it owns no tokens", host_id, st.endpoint); } else { - const auto dc_rack = get_dc_rack(ep); - const auto hostIdIt = loaded_host_ids.find(ep); - if (hostIdIt == loaded_host_ids.end()) { - on_internal_error(slogger, format("can't find host_id for ep {}", ep)); + if (host_id == my_host_id()) { + on_internal_error(slogger, format("Loaded saved endpoint {} with my host_id={}", st.endpoint, host_id)); } - tmptr->update_topology(hostIdIt->second, dc_rack, locator::node::state::normal); - co_await tmptr->update_normal_tokens(tokens, hostIdIt->second); - tmptr->update_host_id(hostIdIt->second, ep); - loaded_endpoints.insert(ep); + if (!st.opt_dc_rack) { + st.opt_dc_rack = locator::endpoint_dc_rack::default_location; + slogger.warn("Loaded no dc/rack for saved endpoint={}/{}. Set to default={}/{}", host_id, st.endpoint, st.opt_dc_rack->dc, st.opt_dc_rack->rack); + } + const auto& dc_rack = *st.opt_dc_rack; + slogger.debug("Loaded tokens: endpoint={}/{} dc={} rack={} tokens={}", host_id, st.endpoint, dc_rack.dc, dc_rack.rack, st.tokens); + tmptr->update_topology(host_id, dc_rack, locator::node::state::normal); + co_await tmptr->update_normal_tokens(st.tokens, host_id); + tmptr->update_host_id(host_id, st.endpoint); // gossiping hasn't started yet // so no need to lock the endpoint - co_await _gossiper.add_saved_endpoint(ep, gms::null_permit_id); + co_await _gossiper.add_saved_endpoint(host_id, st, gms::null_permit_id); } } co_await replicate_to_all_cores(std::move(tmptr)); @@ -2800,10 +2811,12 @@ future<> storage_service::join_cluster(sharded& auto seeds = _gossiper.get_seeds(); auto initial_contact_nodes = loaded_endpoints.empty() ? std::unordered_set(seeds.begin(), seeds.end()) : - loaded_endpoints; + boost::copy_range>(loaded_endpoints | boost::adaptors::transformed([] (const auto& x) { + return x.second.endpoint; + })); auto loaded_peer_features = co_await _sys_ks.local().load_peer_features(); slogger.info("initial_contact_nodes={}, loaded_endpoints={}, loaded_peer_features={}", - initial_contact_nodes, loaded_endpoints, loaded_peer_features.size()); + initial_contact_nodes, loaded_endpoints | boost::adaptors::map_keys, loaded_peer_features.size()); for (auto& x : loaded_peer_features) { slogger.info("peer={}, supported_features={}", x.first, x.second); } @@ -3135,7 +3148,7 @@ storage_service::prepare_replacement_info(std::unordered_set std::unordered_set tokens; if (!raft_topology_change_enabled()) { - tokens = get_tokens_for(replace_address); + tokens = state->get_tokens(); if (tokens.empty()) { throw std::runtime_error(::format("Could not find tokens for {} to replace", replace_address)); } @@ -3146,15 +3159,52 @@ storage_service::prepare_replacement_info(std::unordered_set if (!replace_host_id) { replace_host_id = _gossiper.get_host_id(replace_address); } - slogger.info("Host {}/{} is replacing {}/{}", get_token_metadata().get_my_id(), get_broadcast_address(), replace_host_id, replace_address); - co_await _gossiper.reset_endpoint_state_map(); - co_return replacement_info { + auto ri = replacement_info { .tokens = std::move(tokens), .dc_rack = std::move(dc_rack), .host_id = std::move(replace_host_id), .address = replace_address, }; + + for (auto& hoep : parse_node_list(_db.local().get_config().ignore_dead_nodes_for_replace())) { + locator::host_id host_id; + gms::loaded_endpoint_state st; + // Resolve both host_id and endpoint + if (hoep.has_endpoint()) { + st.endpoint = hoep.endpoint(); + } else { + host_id = hoep.id(); + auto res = _gossiper.get_nodes_with_host_id(host_id); + if (res.size() == 0) { + throw std::runtime_error(::format("Could not find ignored node with host_id {}", host_id)); + } else if (res.size() > 1) { + throw std::runtime_error(::format("Found multiple nodes to ignore with host_id {}: {}", host_id, res)); + } + st.endpoint = *res.begin(); + } + auto esp = _gossiper.get_endpoint_state_ptr(st.endpoint); + if (!esp) { + throw std::runtime_error(::format("Ignore node {}/{} has no endpoint state", host_id, st.endpoint)); + } + if (!host_id) { + host_id = esp->get_host_id(); + if (!host_id) { + throw std::runtime_error(::format("Could not find host_id for ignored node {}", st.endpoint)); + } + } + st.tokens = esp->get_tokens(); + st.opt_dc_rack = esp->get_dc_rack(); + ri.ignore_nodes.emplace(host_id, std::move(st)); + } + + slogger.info("Host {}/{} is replacing {}/{} ignore_nodes={}", get_token_metadata().get_my_id(), get_broadcast_address(), replace_host_id, replace_address, + fmt::join(ri.ignore_nodes | boost::adaptors::transformed ([] (const auto& x) { + return fmt::format("{}/{}", x.first, x.second.endpoint); + }), ",")); + co_await _gossiper.reset_endpoint_state_map(); + + co_return ri; } future> storage_service::get_ownership() { @@ -3660,7 +3710,9 @@ void storage_service::run_replace_ops(std::unordered_set& bootstrap_token auto stop_ctl = deferred_stop(ctl); const auto& uuid = ctl.uuid(); gms::inet_address replace_address = replace_info.address; - ctl.ignore_nodes = parse_node_list(_db.local().get_config().ignore_dead_nodes_for_replace(), *ctl.tmptr); + ctl.ignore_nodes = boost::copy_range>(replace_info.ignore_nodes | boost::adaptors::transformed([] (const auto& x) { + return x.second.endpoint; + })); // Step 1: Decide who needs to sync data for replace operation // The replacing node is not a normal token owner yet // Add it back explicitly after checking all other nodes. @@ -3842,8 +3894,7 @@ future<> storage_service::removenode(locator::host_id host_id, std::list ignore_nodes; }; future prepare_replacement_info(std::unordered_set initial_contact_nodes, const std::unordered_map& loaded_peer_features); @@ -352,7 +354,7 @@ private: public: - static std::unordered_set parse_node_list(sstring comma_separated_list, const locator::token_metadata& tm); + static std::list parse_node_list(sstring comma_separated_list); future<> check_for_endpoint_collision(std::unordered_set initial_contact_nodes, const std::unordered_map& loaded_peer_features); @@ -381,7 +383,7 @@ private: sharded& proxy, sharded& gossiper, std::unordered_set initial_contact_nodes, - std::unordered_set loaded_endpoints, + std::unordered_map loaded_endpoints, std::unordered_map loaded_peer_features, std::chrono::milliseconds, start_hint_manager start_hm,