diff --git a/locator/abstract_replication_strategy.cc b/locator/abstract_replication_strategy.cc index ee9cc1b0cb71..1135648599e0 100644 --- a/locator/abstract_replication_strategy.cc +++ b/locator/abstract_replication_strategy.cc @@ -73,7 +73,7 @@ inet_address_vector_replica_set vnode_effective_replication_map::get_natural_end void maybe_remove_node_being_replaced(const token_metadata& tm, const abstract_replication_strategy& rs, inet_address_vector_replica_set& natural_endpoints) { - if (tm.is_any_node_being_replaced() && + if (tm.get_new()->is_any_node_being_replaced() && rs.allow_remove_node_being_replaced_from_natural_endpoints()) { // When a new node is started to replace an existing dead node, we want // to make the replacing node take writes but do not count it for @@ -87,7 +87,8 @@ void maybe_remove_node_being_replaced(const token_metadata& tm, // as the natural_endpoints and the node will not appear in the // pending_endpoints. auto it = boost::range::remove_if(natural_endpoints, [&] (gms::inet_address& p) { - return tm.is_being_replaced(p); + const auto host_id = tm.get_new()->get_host_id_if_known(p); + return host_id && tm.get_new()->is_being_replaced(*host_id); }); natural_endpoints.erase(it, natural_endpoints.end()); } @@ -350,26 +351,54 @@ abstract_replication_strategy::get_pending_address_ranges(const token_metadata_p static const auto default_replication_map_key = dht::token::from_int64(0); +template +static Result resolve_endpoints(const token_metadata2& tm, const std::unordered_set host_ids) { + Result result{}; + result.reserve(host_ids.size()); + for (const auto& host_id: host_ids) { + if (!host_id) { + // Empty host_id is used as a marker for local address. + // Two reasons for this hack: + // * We need local_strategy to work before the local host_id is + // loaded from the system.local table. + // * The code that interacts with token_metadata.topology updates it + // based on a specific workflow. In this workflow, the statuses of the + // nodes go through various stages like bootstrapping, replacing, and normal. + // Because of this, we can't simply insert the local host ID; + // it needs to be always available. + + result.insert(result.end(), utils::fb_utilities::get_broadcast_address()); + } else { + result.insert(result.end(), tm.get_endpoint_for_host_id(host_id)); + } + } + return result; +} + future calculate_effective_replication_map(replication_strategy_ptr rs, token_metadata_ptr tmptr) { replication_map replication_map; ring_mapping pending_endpoints; ring_mapping read_endpoints; const auto depend_on_token = rs->natural_endpoints_depend_on_token(); - const auto& sorted_tokens = tmptr->sorted_tokens(); + const auto& sorted_tokens = tmptr->get_new()->sorted_tokens(); replication_map.reserve(depend_on_token ? sorted_tokens.size() : 1); - if (const auto& topology_changes = tmptr->get_topology_change_info(); topology_changes) { + if (const auto& topology_changes = tmptr->get_new()->get_topology_change_info(); topology_changes) { const auto& all_tokens = topology_changes->all_tokens; - const auto& base_token_metadata = topology_changes->base_token_metadata - ? *topology_changes->base_token_metadata - : *tmptr; - const auto& current_tokens = tmptr->get_token_to_endpoint(); + const auto* base_token_metadata = tmptr.get(); + std::optional base_token_metadata_old; + if (topology_changes->base_token_metadata) { + base_token_metadata_old = token_metadata(topology_changes->base_token_metadata); + base_token_metadata = &*base_token_metadata_old; + } + const auto target_token_metadata = token_metadata(topology_changes->target_token_metadata); + const auto& current_tokens = tmptr->get_new()->get_token_to_endpoint(); for (size_t i = 0, size = all_tokens.size(); i < size; ++i) { co_await coroutine::maybe_yield(); const auto token = all_tokens[i]; - auto current_endpoints = get(co_await rs->calculate_natural_endpoints(token, base_token_metadata, false)); - auto target_endpoints = get(co_await rs->calculate_natural_endpoints(token, *topology_changes->target_token_metadata, false)); + auto current_endpoints = get(co_await rs->calculate_natural_endpoints(token, *base_token_metadata, true)); + auto target_endpoints = get(co_await rs->calculate_natural_endpoints(token, target_token_metadata, true)); auto add_mapping = [&](ring_mapping& target, std::unordered_set&& endpoints) { using interval = ring_mapping::interval_type; @@ -392,38 +421,50 @@ future calculate_effective_replicat }; { - std::unordered_set endpoints_diff; + std::unordered_set endpoints_diff; for (const auto& e: target_endpoints) { if (!current_endpoints.contains(e)) { endpoints_diff.insert(e); } } if (!endpoints_diff.empty()) { - add_mapping(pending_endpoints, std::move(endpoints_diff)); + auto resolved_endpoints = resolve_endpoints>( + *tmptr->get_new(), std::move(endpoints_diff)); + add_mapping(pending_endpoints, std::move(resolved_endpoints)); } } // in order not to waste memory, we update read_endpoints only if the // new endpoints differs from the old one if (topology_changes->read_new && target_endpoints.get_vector() != current_endpoints.get_vector()) { - add_mapping(read_endpoints, std::move(target_endpoints).extract_set()); + auto resolved_endpoints = resolve_endpoints>( + *tmptr->get_new(), target_endpoints.get_set()); + add_mapping(read_endpoints, std::move(resolved_endpoints)); } if (!depend_on_token) { - replication_map.emplace(default_replication_map_key, std::move(current_endpoints).extract_vector()); + auto resolved_current_endpoints = resolve_endpoints( + *tmptr->get_new(), current_endpoints.get_set()); + replication_map.emplace(default_replication_map_key, std::move(resolved_current_endpoints)); break; } else if (current_tokens.contains(token)) { - replication_map.emplace(token, std::move(current_endpoints).extract_vector()); + auto resolved_current_endpoints = resolve_endpoints( + *tmptr->get_new(), current_endpoints.get_set()); + replication_map.emplace(token, std::move(resolved_current_endpoints)); } } } else if (depend_on_token) { for (const auto &t : sorted_tokens) { - auto eps = get(co_await rs->calculate_natural_endpoints(t, *tmptr, false)); - replication_map.emplace(t, std::move(eps).extract_vector()); + auto eps = get(co_await rs->calculate_natural_endpoints(t, *tmptr, true)); + auto resolved_eps = resolve_endpoints( + *tmptr->get_new(), eps.get_set()); + replication_map.emplace(t, std::move(resolved_eps)); } } else { - auto eps = get(co_await rs->calculate_natural_endpoints(default_replication_map_key, *tmptr, false)); - replication_map.emplace(default_replication_map_key, std::move(eps).extract_vector()); + auto eps = get(co_await rs->calculate_natural_endpoints(default_replication_map_key, *tmptr, true)); + auto resolved_eps = resolve_endpoints( + *tmptr->get_new(), eps.get_set()); + replication_map.emplace(default_replication_map_key, std::move(resolved_eps)); } auto rf = rs->get_replication_factor(*tmptr); @@ -455,7 +496,7 @@ const inet_address_vector_replica_set& vnode_effective_replication_map::do_get_n bool is_vnode) const { const token& key_token = _rs->natural_endpoints_depend_on_token() - ? (is_vnode ? tok : _tmptr->first_token(tok)) + ? (is_vnode ? tok : _tmptr->get_new()->first_token(tok)) : default_replication_map_key; const auto it = _replication_map.find(key_token); return it->second; diff --git a/locator/everywhere_replication_strategy.cc b/locator/everywhere_replication_strategy.cc index 0acd8712942b..d350bf15b902 100644 --- a/locator/everywhere_replication_strategy.cc +++ b/locator/everywhere_replication_strategy.cc @@ -33,7 +33,7 @@ future everywhere_replication_strategy::calculate_natural_endpo } size_t everywhere_replication_strategy::get_replication_factor(const token_metadata& tm) const { - return tm.sorted_tokens().empty() ? 1 : tm.count_normal_token_owners(); + return tm.get_new()->sorted_tokens().empty() ? 1 : tm.get_new()->count_normal_token_owners(); } using registry = class_registrator; diff --git a/test/boost/token_metadata_test.cc b/test/boost/token_metadata_test.cc index 5e1ef58015c8..c55974152d6b 100644 --- a/test/boost/token_metadata_test.cc +++ b/test/boost/token_metadata_test.cc @@ -18,28 +18,32 @@ using namespace locator; namespace { const auto ks_name = sstring("test-ks"); - endpoint_dc_rack get_dc_rack(gms::inet_address) { + host_id gen_id(int id) { + return host_id{utils::UUID(0, id)}; + } + + endpoint_dc_rack get_dc_rack(host_id) { return { .dc = "unk-dc", .rack = "unk-rack" }; } - mutable_token_metadata_ptr create_token_metadata(inet_address this_endpoint) { - return make_lw_shared(token_metadata::config { + mutable_token_metadata2_ptr create_token_metadata(host_id this_host_id) { + return make_lw_shared(token_metadata::config { topology::config { - .this_endpoint = this_endpoint, - .local_dc_rack = get_dc_rack(this_endpoint) + .this_host_id = this_host_id, + .local_dc_rack = get_dc_rack(this_host_id) } }); } template - mutable_vnode_erm_ptr create_erm(mutable_token_metadata_ptr tmptr, replication_strategy_config_options opts = {}) { - dc_rack_fn get_dc_rack_fn = get_dc_rack; + mutable_vnode_erm_ptr create_erm(mutable_token_metadata2_ptr tmptr, replication_strategy_config_options opts = {}) { + dc_rack_fn get_dc_rack_fn = get_dc_rack; tmptr->update_topology_change_info(get_dc_rack_fn).get(); auto strategy = seastar::make_shared(std::move(opts)); - return calculate_effective_replication_map(std::move(strategy), std::move(tmptr)).get0(); + return calculate_effective_replication_map(std::move(strategy), make_token_metadata_ptr(tmptr)).get0(); } template @@ -52,14 +56,18 @@ namespace { SEASTAR_THREAD_TEST_CASE(test_pending_and_read_endpoints_for_everywhere_strategy) { const auto e1 = inet_address("192.168.0.1"); const auto e2 = inet_address("192.168.0.2"); + const auto e1_id = gen_id(1); + const auto e2_id = gen_id(2); const auto t1 = dht::token::from_int64(10); const auto t2 = dht::token::from_int64(20); - auto token_metadata = create_token_metadata(e1); - token_metadata->update_topology(e1, get_dc_rack(e1)); - token_metadata->update_topology(e2, get_dc_rack(e2)); - token_metadata->update_normal_tokens({t1}, e1).get(); - token_metadata->add_bootstrap_token(t2, e2); + auto token_metadata = create_token_metadata(e1_id); + token_metadata->update_host_id(e1_id, e1); + token_metadata->update_host_id(e2_id, e2); + token_metadata->update_topology(e1_id, get_dc_rack(e1_id)); + token_metadata->update_topology(e2_id, get_dc_rack(e2_id)); + token_metadata->update_normal_tokens({t1}, e1_id).get(); + token_metadata->add_bootstrap_token(t2, e2_id); token_metadata->set_read_new(token_metadata::read_new_t::yes); auto erm = create_erm(token_metadata); @@ -72,12 +80,16 @@ SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_bootstrap_second_node) { const auto t1 = dht::token::from_int64(1); const auto e2 = inet_address("192.168.0.2"); const auto t2 = dht::token::from_int64(100); + const auto e1_id = gen_id(1); + const auto e2_id = gen_id(2); - auto token_metadata = create_token_metadata(e1); - token_metadata->update_topology(e1, get_dc_rack(e1)); - token_metadata->update_topology(e2, get_dc_rack(e2)); - token_metadata->update_normal_tokens({t1}, e1).get(); - token_metadata->add_bootstrap_token(t2, e2); + auto token_metadata = create_token_metadata(e1_id); + token_metadata->update_host_id(e1_id, e1); + token_metadata->update_host_id(e2_id, e2); + token_metadata->update_topology(e1_id, get_dc_rack(e1_id)); + token_metadata->update_topology(e2_id, get_dc_rack(e2_id)); + token_metadata->update_normal_tokens({t1}, e1_id).get(); + token_metadata->add_bootstrap_token(t2, e2_id); auto erm = create_erm(token_metadata, {{"replication_factor", "1"}}); assert_eq(erm->get_pending_endpoints(dht::token::from_int64(0)), {}); @@ -95,14 +107,20 @@ SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_bootstrap_with_replicas) { const auto e1 = inet_address("192.168.0.1"); const auto e2 = inet_address("192.168.0.2"); const auto e3 = inet_address("192.168.0.3"); - - auto token_metadata = create_token_metadata(e1); - token_metadata->update_topology(e1, get_dc_rack(e1)); - token_metadata->update_topology(e2, get_dc_rack(e2)); - token_metadata->update_topology(e3, get_dc_rack(e3)); - token_metadata->update_normal_tokens({t1, t1000}, e2).get(); - token_metadata->update_normal_tokens({t10}, e3).get(); - token_metadata->add_bootstrap_token(t100, e1); + const auto e1_id = gen_id(1); + const auto e2_id = gen_id(2); + const auto e3_id = gen_id(3); + + auto token_metadata = create_token_metadata(e1_id); + token_metadata->update_host_id(e1_id, e1); + token_metadata->update_host_id(e2_id, e2); + token_metadata->update_host_id(e3_id, e3); + token_metadata->update_topology(e1_id, get_dc_rack(e1_id)); + token_metadata->update_topology(e2_id, get_dc_rack(e2_id)); + token_metadata->update_topology(e3_id, get_dc_rack(e3_id)); + token_metadata->update_normal_tokens({t1, t1000}, e2_id).get(); + token_metadata->update_normal_tokens({t10}, e3_id).get(); + token_metadata->add_bootstrap_token(t100, e1_id); auto erm = create_erm(token_metadata, {{"replication_factor", "2"}}); assert_eq(erm->get_pending_endpoints(dht::token::from_int64(1)), {}); @@ -120,15 +138,21 @@ SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_leave_with_replicas) { const auto e1 = inet_address("192.168.0.1"); const auto e2 = inet_address("192.168.0.2"); const auto e3 = inet_address("192.168.0.3"); - - auto token_metadata = create_token_metadata(e1); - token_metadata->update_topology(e1, get_dc_rack(e1)); - token_metadata->update_topology(e2, get_dc_rack(e2)); - token_metadata->update_topology(e3, get_dc_rack(e3)); - token_metadata->update_normal_tokens({t1, t1000}, e2).get(); - token_metadata->update_normal_tokens({t10}, e3).get(); - token_metadata->update_normal_tokens({t100}, e1).get(); - token_metadata->add_leaving_endpoint(e1); + const auto e1_id = gen_id(1); + const auto e2_id = gen_id(2); + const auto e3_id = gen_id(3); + + auto token_metadata = create_token_metadata(e1_id); + token_metadata->update_host_id(e1_id, e1); + token_metadata->update_host_id(e2_id, e2); + token_metadata->update_host_id(e3_id, e3); + token_metadata->update_topology(e1_id, get_dc_rack(e1_id)); + token_metadata->update_topology(e2_id, get_dc_rack(e2_id)); + token_metadata->update_topology(e3_id, get_dc_rack(e3_id)); + token_metadata->update_normal_tokens({t1, t1000}, e2_id).get(); + token_metadata->update_normal_tokens({t10}, e3_id).get(); + token_metadata->update_normal_tokens({t100}, e1_id).get(); + token_metadata->add_leaving_endpoint(e1_id); auto erm = create_erm(token_metadata, {{"replication_factor", "2"}}); assert_eq(erm->get_pending_endpoints(dht::token::from_int64(1)), {}); @@ -147,16 +171,24 @@ SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_replace_with_replicas) { const auto e2 = inet_address("192.168.0.2"); const auto e3 = inet_address("192.168.0.3"); const auto e4 = inet_address("192.168.0.4"); - - auto token_metadata = create_token_metadata(e1); - token_metadata->update_topology(e1, get_dc_rack(e1)); - token_metadata->update_topology(e2, get_dc_rack(e2)); - token_metadata->update_topology(e3, get_dc_rack(e3)); - token_metadata->update_topology(e4, get_dc_rack(e4)); - token_metadata->update_normal_tokens({t1000}, e1).get(); - token_metadata->update_normal_tokens({t1, t100}, e2).get(); - token_metadata->update_normal_tokens({t10}, e3).get(); - token_metadata->add_replacing_endpoint(e3, e4); + const auto e1_id = gen_id(1); + const auto e2_id = gen_id(2); + const auto e3_id = gen_id(3); + const auto e4_id = gen_id(4); + + auto token_metadata = create_token_metadata(e1_id); + token_metadata->update_host_id(e1_id, e1); + token_metadata->update_host_id(e2_id, e2); + token_metadata->update_host_id(e3_id, e3); + token_metadata->update_host_id(e4_id, e4); + token_metadata->update_topology(e1_id, get_dc_rack(e1_id)); + token_metadata->update_topology(e2_id, get_dc_rack(e2_id)); + token_metadata->update_topology(e3_id, get_dc_rack(e3_id)); + token_metadata->update_topology(e4_id, get_dc_rack(e4_id)); + token_metadata->update_normal_tokens({t1000}, e1_id).get(); + token_metadata->update_normal_tokens({t1, t100}, e2_id).get(); + token_metadata->update_normal_tokens({t10}, e3_id).get(); + token_metadata->add_replacing_endpoint(e3_id, e4_id); auto erm = create_erm(token_metadata, {{"replication_factor", "2"}}); assert_eq(erm->get_pending_endpoints(dht::token::from_int64(100)), {}); @@ -176,14 +208,20 @@ SEASTAR_THREAD_TEST_CASE(test_endpoints_for_reading_when_bootstrap_with_replicas const auto e1 = inet_address("192.168.0.1"); const auto e2 = inet_address("192.168.0.2"); const auto e3 = inet_address("192.168.0.3"); - - auto token_metadata = create_token_metadata(e1); - token_metadata->update_topology(e1, get_dc_rack(e1)); - token_metadata->update_topology(e2, get_dc_rack(e2)); - token_metadata->update_topology(e3, get_dc_rack(e3)); - token_metadata->update_normal_tokens({t1, t1000}, e2).get(); - token_metadata->update_normal_tokens({t10}, e3).get(); - token_metadata->add_bootstrap_token(t100, e1); + const auto e1_id = gen_id(1); + const auto e2_id = gen_id(2); + const auto e3_id = gen_id(3); + + auto token_metadata = create_token_metadata(e1_id); + token_metadata->update_host_id(e1_id, e1); + token_metadata->update_host_id(e2_id, e2); + token_metadata->update_host_id(e3_id, e3); + token_metadata->update_topology(e1_id, get_dc_rack(e1_id)); + token_metadata->update_topology(e2_id, get_dc_rack(e2_id)); + token_metadata->update_topology(e3_id, get_dc_rack(e3_id)); + token_metadata->update_normal_tokens({t1, t1000}, e2_id).get(); + token_metadata->update_normal_tokens({t10}, e3_id).get(); + token_metadata->add_bootstrap_token(t100, e1_id); auto check_endpoints = [](mutable_vnode_erm_ptr erm, int64_t t, const std::unordered_set& expected_replicas, @@ -223,13 +261,15 @@ SEASTAR_THREAD_TEST_CASE(test_endpoints_for_reading_when_bootstrap_with_replicas SEASTAR_THREAD_TEST_CASE(test_replace_node_with_same_endpoint) { const auto t1 = dht::token::from_int64(1); const auto e1 = inet_address("192.168.0.1"); + const auto e1_id = gen_id(1); - auto token_metadata = create_token_metadata(e1); - token_metadata->update_topology(e1, get_dc_rack(e1)); - token_metadata->update_normal_tokens({t1}, e1).get(); - token_metadata->add_replacing_endpoint(e1, e1); + auto token_metadata = create_token_metadata(e1_id); + token_metadata->update_host_id(e1_id, e1); + token_metadata->update_topology(e1_id, get_dc_rack(e1_id)); + token_metadata->update_normal_tokens({t1}, e1_id).get(); + token_metadata->add_replacing_endpoint(e1_id, e1_id); auto erm = create_erm(token_metadata, {{"replication_factor", "2"}}); assert_eq(erm->get_pending_endpoints(dht::token::from_int64(1)), {e1}); - BOOST_REQUIRE_EQUAL(token_metadata->get_endpoint(t1), e1); + BOOST_REQUIRE_EQUAL(token_metadata->get_endpoint(t1), e1_id); }