Skip to content

Commit

Permalink
calculate_effective_replication_map: use new token_metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
Petr Gusev committed Oct 31, 2023
1 parent e4efe83 commit 4026819
Show file tree
Hide file tree
Showing 3 changed files with 160 additions and 79 deletions.
81 changes: 61 additions & 20 deletions locator/abstract_replication_strategy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ inet_address_vector_replica_set vnode_effective_replication_map::get_natural_end
void maybe_remove_node_being_replaced(const token_metadata& tm,
const abstract_replication_strategy& rs,
inet_address_vector_replica_set& natural_endpoints) {
if (tm.is_any_node_being_replaced() &&
if (tm.get_new()->is_any_node_being_replaced() &&
rs.allow_remove_node_being_replaced_from_natural_endpoints()) {
// When a new node is started to replace an existing dead node, we want
// to make the replacing node take writes but do not count it for
Expand All @@ -87,7 +87,8 @@ void maybe_remove_node_being_replaced(const token_metadata& tm,
// as the natural_endpoints and the node will not appear in the
// pending_endpoints.
auto it = boost::range::remove_if(natural_endpoints, [&] (gms::inet_address& p) {
return tm.is_being_replaced(p);
const auto host_id = tm.get_new()->get_host_id_if_known(p);
return host_id && tm.get_new()->is_being_replaced(*host_id);
});
natural_endpoints.erase(it, natural_endpoints.end());
}
Expand Down Expand Up @@ -350,26 +351,54 @@ abstract_replication_strategy::get_pending_address_ranges(const token_metadata_p

static const auto default_replication_map_key = dht::token::from_int64(0);

template <typename Result>
static Result resolve_endpoints(const token_metadata2& tm, const std::unordered_set<locator::host_id> host_ids) {
Result result{};
result.reserve(host_ids.size());
for (const auto& host_id: host_ids) {
if (!host_id) {
// Empty host_id is used as a marker for local address.
// Two reasons for this hack:
// * We need local_strategy to work before the local host_id is
// loaded from the system.local table.
// * The code that interacts with token_metadata.topology updates it
// based on a specific workflow. In this workflow, the statuses of the
// nodes go through various stages like bootstrapping, replacing, and normal.
// Because of this, we can't simply insert the local host ID;
// it needs to be always available.

result.insert(result.end(), utils::fb_utilities::get_broadcast_address());
} else {
result.insert(result.end(), tm.get_endpoint_for_host_id(host_id));
}
}
return result;
}

future<mutable_vnode_effective_replication_map_ptr> calculate_effective_replication_map(replication_strategy_ptr rs, token_metadata_ptr tmptr) {
replication_map replication_map;
ring_mapping pending_endpoints;
ring_mapping read_endpoints;
const auto depend_on_token = rs->natural_endpoints_depend_on_token();
const auto& sorted_tokens = tmptr->sorted_tokens();
const auto& sorted_tokens = tmptr->get_new()->sorted_tokens();
replication_map.reserve(depend_on_token ? sorted_tokens.size() : 1);
if (const auto& topology_changes = tmptr->get_topology_change_info(); topology_changes) {
if (const auto& topology_changes = tmptr->get_new()->get_topology_change_info(); topology_changes) {
const auto& all_tokens = topology_changes->all_tokens;
const auto& base_token_metadata = topology_changes->base_token_metadata
? *topology_changes->base_token_metadata
: *tmptr;
const auto& current_tokens = tmptr->get_token_to_endpoint();
const auto* base_token_metadata = tmptr.get();
std::optional<token_metadata> base_token_metadata_old;
if (topology_changes->base_token_metadata) {
base_token_metadata_old = token_metadata(topology_changes->base_token_metadata);
base_token_metadata = &*base_token_metadata_old;
}
const auto target_token_metadata = token_metadata(topology_changes->target_token_metadata);
const auto& current_tokens = tmptr->get_new()->get_token_to_endpoint();
for (size_t i = 0, size = all_tokens.size(); i < size; ++i) {
co_await coroutine::maybe_yield();

const auto token = all_tokens[i];

auto current_endpoints = get<endpoint_set>(co_await rs->calculate_natural_endpoints(token, base_token_metadata, false));
auto target_endpoints = get<endpoint_set>(co_await rs->calculate_natural_endpoints(token, *topology_changes->target_token_metadata, false));
auto current_endpoints = get<host_id_set>(co_await rs->calculate_natural_endpoints(token, *base_token_metadata, true));
auto target_endpoints = get<host_id_set>(co_await rs->calculate_natural_endpoints(token, target_token_metadata, true));

auto add_mapping = [&](ring_mapping& target, std::unordered_set<inet_address>&& endpoints) {
using interval = ring_mapping::interval_type;
Expand All @@ -392,38 +421,50 @@ future<mutable_vnode_effective_replication_map_ptr> calculate_effective_replicat
};

{
std::unordered_set<inet_address> endpoints_diff;
std::unordered_set<locator::host_id> endpoints_diff;
for (const auto& e: target_endpoints) {
if (!current_endpoints.contains(e)) {
endpoints_diff.insert(e);
}
}
if (!endpoints_diff.empty()) {
add_mapping(pending_endpoints, std::move(endpoints_diff));
auto resolved_endpoints = resolve_endpoints<std::unordered_set<inet_address>>(
*tmptr->get_new(), std::move(endpoints_diff));
add_mapping(pending_endpoints, std::move(resolved_endpoints));
}
}

// in order not to waste memory, we update read_endpoints only if the
// new endpoints differs from the old one
if (topology_changes->read_new && target_endpoints.get_vector() != current_endpoints.get_vector()) {
add_mapping(read_endpoints, std::move(target_endpoints).extract_set());
auto resolved_endpoints = resolve_endpoints<std::unordered_set<inet_address>>(
*tmptr->get_new(), target_endpoints.get_set());
add_mapping(read_endpoints, std::move(resolved_endpoints));
}

if (!depend_on_token) {
replication_map.emplace(default_replication_map_key, std::move(current_endpoints).extract_vector());
auto resolved_current_endpoints = resolve_endpoints<inet_address_vector_replica_set>(
*tmptr->get_new(), current_endpoints.get_set());
replication_map.emplace(default_replication_map_key, std::move(resolved_current_endpoints));
break;
} else if (current_tokens.contains(token)) {
replication_map.emplace(token, std::move(current_endpoints).extract_vector());
auto resolved_current_endpoints = resolve_endpoints<inet_address_vector_replica_set>(
*tmptr->get_new(), current_endpoints.get_set());
replication_map.emplace(token, std::move(resolved_current_endpoints));
}
}
} else if (depend_on_token) {
for (const auto &t : sorted_tokens) {
auto eps = get<endpoint_set>(co_await rs->calculate_natural_endpoints(t, *tmptr, false));
replication_map.emplace(t, std::move(eps).extract_vector());
auto eps = get<host_id_set>(co_await rs->calculate_natural_endpoints(t, *tmptr, true));
auto resolved_eps = resolve_endpoints<inet_address_vector_replica_set>(
*tmptr->get_new(), eps.get_set());
replication_map.emplace(t, std::move(resolved_eps));
}
} else {
auto eps = get<endpoint_set>(co_await rs->calculate_natural_endpoints(default_replication_map_key, *tmptr, false));
replication_map.emplace(default_replication_map_key, std::move(eps).extract_vector());
auto eps = get<host_id_set>(co_await rs->calculate_natural_endpoints(default_replication_map_key, *tmptr, true));
auto resolved_eps = resolve_endpoints<inet_address_vector_replica_set>(
*tmptr->get_new(), eps.get_set());
replication_map.emplace(default_replication_map_key, std::move(resolved_eps));
}

auto rf = rs->get_replication_factor(*tmptr);
Expand Down Expand Up @@ -455,7 +496,7 @@ const inet_address_vector_replica_set& vnode_effective_replication_map::do_get_n
bool is_vnode) const
{
const token& key_token = _rs->natural_endpoints_depend_on_token()
? (is_vnode ? tok : _tmptr->first_token(tok))
? (is_vnode ? tok : _tmptr->get_new()->first_token(tok))
: default_replication_map_key;
const auto it = _replication_map.find(key_token);
return it->second;
Expand Down
2 changes: 1 addition & 1 deletion locator/everywhere_replication_strategy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ future<natural_ep_type> everywhere_replication_strategy::calculate_natural_endpo
}

size_t everywhere_replication_strategy::get_replication_factor(const token_metadata& tm) const {
return tm.sorted_tokens().empty() ? 1 : tm.count_normal_token_owners();
return tm.get_new()->sorted_tokens().empty() ? 1 : tm.get_new()->count_normal_token_owners();
}

using registry = class_registrator<abstract_replication_strategy, everywhere_replication_strategy, const replication_strategy_config_options&>;
Expand Down

0 comments on commit 4026819

Please sign in to comment.