Revert "Merge 'token_metadata: Do not use topology info for is_member…
Browse files Browse the repository at this point in the history
… check' from Asias He"

This reverts commit a44ca06, reversing
changes made to 2779a17.
xemul committed Nov 23, 2022
1 parent aec9fad commit 6878de0
Showing 9 changed files with 30 additions and 32 deletions.
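
For context, the functional core of the revert is in locator/token_metadata.cc below: the predicate is renamed back from is_normal_token_owner to is_member, and its implementation goes back to asking the topology (has_endpoint with pending::no) instead of looking up the _normal_token_owners set. The following standalone C++ sketch is not Scylla code — the types, the sets, and the pending/non-pending split are simplified stand-ins — but it illustrates the shape of the two checks this commit swaps back:

#include <iostream>
#include <string>
#include <unordered_set>

// Simplified stand-ins for Scylla's inet_address, topology and token_metadata.
using inet_address = std::string;

struct toy_topology {
    std::unordered_set<inet_address> normal_nodes;   // joined, non-pending nodes
    std::unordered_set<inet_address> pending_nodes;  // e.g. bootstrapping nodes

    // Rough analogue of has_endpoint(ep, topology::pending::no) in this sketch:
    // is the endpoint known to the topology as a non-pending node?
    bool has_endpoint_excluding_pending(const inet_address& ep) const {
        return normal_nodes.contains(ep);
    }
};

struct toy_token_metadata {
    toy_topology topology;
    std::unordered_set<inet_address> normal_token_owners;  // nodes owning normal tokens

    // The check the reverted merge had introduced: a lookup in the owners set.
    bool is_normal_token_owner(const inet_address& ep) const {
        return normal_token_owners.contains(ep);
    }

    // The check this revert restores: a topology-based membership test.
    bool is_member(const inet_address& ep) const {
        return topology.has_endpoint_excluding_pending(ep);
    }
};

int main() {
    toy_token_metadata tm;
    tm.topology.normal_nodes = {"10.0.0.1", "10.0.0.2"};
    tm.normal_token_owners = {"10.0.0.1"};  // 10.0.0.2 is in the topology but owns no normal tokens yet

    for (const inet_address& ep : {inet_address("10.0.0.1"), inet_address("10.0.0.2")}) {
        std::cout << ep << ": is_member=" << tm.is_member(ep)
                  << " is_normal_token_owner=" << tm.is_normal_token_owner(ep) << '\n';
    }
}

The call sites in the files below only change which name they call; any behavioural difference comes from which of these two lookups backs the check.
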
2 changes: 1 addition & 1 deletion db/hints/manager.cc
@@ -558,7 +558,7 @@ bool manager::end_point_hints_manager::sender::can_send() noexcept {
return true;
} else {
if (!_state.contains(state::ep_state_left_the_ring)) {
- _state.set_if<state::ep_state_left_the_ring>(!_shard_manager.local_db().get_token_metadata().is_normal_token_owner(end_point_key()));
+ _state.set_if<state::ep_state_left_the_ring>(!_shard_manager.local_db().get_token_metadata().is_member(end_point_key()));
}
// send the hints out if the destination Node is part of the ring - we will send to all new replicas in this case
return _state.contains(state::ep_state_left_the_ring);
2 changes: 1 addition & 1 deletion db/system_keyspace.cc
@@ -1881,7 +1881,7 @@ class cluster_status_table : public memtable_filling_virtual_table {
set_cell(cr, "host_id", hostid->uuid());
}

- if (tm.is_normal_token_owner(endpoint)) {
+ if (tm.is_member(endpoint)) {
sstring dc = tm.get_topology().get_location(endpoint).dc;
set_cell(cr, "dc", dc);
}
4 changes: 2 additions & 2 deletions gms/gossiper.cc
@@ -726,7 +726,7 @@ future<> gossiper::do_status_check() {
// check for dead state removal
auto expire_time = get_expire_time_for_endpoint(endpoint);
if (!is_alive && (now > expire_time)
- && (!get_token_metadata_ptr()->is_normal_token_owner(endpoint))) {
+ && (!get_token_metadata_ptr()->is_member(endpoint))) {
logger.debug("time is expiring for endpoint : {} ({})", endpoint, expire_time.time_since_epoch().count());
co_await evict_from_membership(endpoint);
}
@@ -1300,7 +1300,7 @@ bool gossiper::is_gossip_only_member(inet_address endpoint) {
if (!es) {
return false;
}
- return !is_dead_state(*es) && !get_token_metadata_ptr()->is_normal_token_owner(endpoint);
+ return !is_dead_state(*es) && !get_token_metadata_ptr()->is_member(endpoint);
}

clk::time_point gossiper::get_expire_time_for_endpoint(inet_address endpoint) const noexcept {
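
The gossiper hunks above use the restored check in two places: do_status_check only evicts a dead, expired endpoint if it is not a member, and is_gossip_only_member reports endpoints that have gossip state and are not dead but are not part of the token ring. A minimal sketch of that second predicate, with toy stand-ins (an endpoint-state map and a membership set; the names are illustrative, not Scylla's):

#include <iostream>
#include <string>
#include <unordered_map>
#include <unordered_set>

using inet_address = std::string;

struct endpoint_state {
    bool dead = false;  // stand-in for gossiper's dead-state test
};

struct toy_gossiper {
    std::unordered_map<inet_address, endpoint_state> endpoint_states;  // gossip state per endpoint
    std::unordered_set<inet_address> ring_members;                     // stand-in for token_metadata::is_member()

    // Same shape as gossiper::is_gossip_only_member in the hunk above:
    // known to gossip, not dead, but not a member of the token ring.
    bool is_gossip_only_member(const inet_address& ep) const {
        auto it = endpoint_states.find(ep);
        if (it == endpoint_states.end()) {
            return false;
        }
        return !it->second.dead && !ring_members.contains(ep);
    }
};

int main() {
    toy_gossiper g;
    g.endpoint_states["10.0.0.1"] = {};  // gossiping and in the ring
    g.endpoint_states["10.0.0.3"] = {};  // gossiping but not in the ring
    g.ring_members = {"10.0.0.1"};

    std::cout << g.is_gossip_only_member("10.0.0.1") << '\n';  // prints 0
    std::cout << g.is_gossip_only_member("10.0.0.3") << '\n';  // prints 1
}
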
2 changes: 1 addition & 1 deletion locator/abstract_replication_strategy.cc
@@ -229,7 +229,7 @@ abstract_replication_strategy::get_address_ranges(const token_metadata& tm) cons
future<std::unordered_multimap<inet_address, dht::token_range>>
abstract_replication_strategy::get_address_ranges(const token_metadata& tm, inet_address endpoint) const {
std::unordered_multimap<inet_address, dht::token_range> ret;
- if (!tm.is_normal_token_owner(endpoint)) {
+ if (!tm.is_member(endpoint)) {
co_return ret;
}
bool is_everywhere_topology = get_type() == replication_strategy_type::everywhere_topology;
14 changes: 7 additions & 7 deletions locator/token_metadata.cc
@@ -160,7 +160,7 @@ class token_metadata_impl final {
public:
void remove_endpoint(inet_address endpoint);

- bool is_normal_token_owner(inet_address endpoint) const;
+ bool is_member(inet_address endpoint) const;

bool is_leaving(inet_address endpoint) const;

@@ -420,8 +420,8 @@ future<> token_metadata_impl::update_normal_tokens(std::unordered_set<token> tok
co_return;
}

- if (!_topology.has_endpoint(endpoint, topology::pending::no)) {
- on_internal_error(tlogger, format("token_metadata_impl: {} must be a member of topology to update normal tokens", endpoint));
+ if (!is_member(endpoint)) {
+ on_internal_error(tlogger, format("token_metadata_impl: {} must be member to update normal tokens", endpoint));
}

bool should_sort_tokens = false;
@@ -554,8 +554,8 @@ const std::unordered_map<inet_address, host_id>& token_metadata_impl::get_endpoi
return _endpoint_to_host_id_map;
}

- bool token_metadata_impl::is_normal_token_owner(inet_address endpoint) const {
- return _normal_token_owners.contains(endpoint);
+ bool token_metadata_impl::is_member(inet_address endpoint) const {
+ return _topology.has_endpoint(endpoint, topology::pending::no);
}

void token_metadata_impl::add_bootstrap_token(token t, inet_address endpoint) {
@@ -1123,8 +1123,8 @@ token_metadata::remove_endpoint(inet_address endpoint) {
}

bool
- token_metadata::is_normal_token_owner(inet_address endpoint) const {
- return _impl->is_normal_token_owner(endpoint);
+ token_metadata::is_member(inet_address endpoint) const {
+ return _impl->is_member(endpoint);
}

bool
4 changes: 1 addition & 3 deletions locator/token_metadata.hh
@@ -169,9 +169,7 @@ public:

void remove_endpoint(inet_address endpoint);

- // Checks if the node is part of the token ring. If yes, the node is one of
- // the nodes that owns the tokens and inside the set _normal_token_owners.
- bool is_normal_token_owner(inet_address endpoint) const;
+ bool is_member(inet_address endpoint) const;

bool is_leaving(inet_address endpoint) const;

2 changes: 1 addition & 1 deletion repair/repair.cc
@@ -1558,7 +1558,7 @@ future<> repair_service::do_decommission_removenode_with_repair(locator::token_m
auto temp = tmptr->clone_after_all_left().get0();
// leaving_node might or might not be 'leaving'. If it was not leaving (that is, removenode
// command was used), it is still present in temp and must be removed.
- if (temp.is_normal_token_owner(leaving_node)) {
+ if (temp.is_member(leaving_node)) {
temp.remove_endpoint(leaving_node);
}
std::unordered_map<dht::token_range, repair_neighbors> range_sources;
2 changes: 1 addition & 1 deletion service/migration_manager.cc
@@ -1141,7 +1141,7 @@ future<> migration_manager::on_change(gms::inet_address endpoint, gms::applicati
mlogger.debug("Ignoring state change for dead or unknown endpoint: {}", endpoint);
return make_ready_future();
}
- if (_storage_proxy.get_token_metadata_ptr()->is_normal_token_owner(endpoint)) {
+ if (_storage_proxy.get_token_metadata_ptr()->is_member(endpoint)) {
schedule_schema_pull(endpoint, *ep_state);
}
}
30 changes: 15 additions & 15 deletions service/storage_service.cc
@@ -476,7 +476,7 @@ future<> storage_service::join_token_ring(cdc::generation_service& cdc_gen_servi
if (!is_replacing()) {
auto tmptr = get_token_metadata_ptr();

- if (tmptr->is_normal_token_owner(get_broadcast_address())) {
+ if (tmptr->is_member(get_broadcast_address())) {
throw std::runtime_error("This node is already a member of the token ring; bootstrap aborted. (If replacing a dead node, remove the old one from the ring first.)");
}
slogger.info("getting bootstrap token");
@@ -866,7 +866,7 @@ future<> storage_service::handle_state_bootstrap(inet_address endpoint) {
// continue.
auto tmlock = co_await get_token_metadata_lock();
auto tmptr = co_await get_mutable_token_metadata_ptr();
- if (tmptr->is_normal_token_owner(endpoint)) {
+ if (tmptr->is_member(endpoint)) {
// If isLeaving is false, we have missed both LEAVING and LEFT. However, if
// isLeaving is true, we have only missed LEFT. Waiting time between completing
// leave operation and rebootstrapping is relatively short, so the latter is quite
@@ -895,7 +895,7 @@ future<> storage_service::handle_state_normal(inet_address endpoint) {

auto tmlock = std::make_unique<token_metadata_lock>(co_await get_token_metadata_lock());
auto tmptr = co_await get_mutable_token_metadata_ptr();
- if (tmptr->is_normal_token_owner(endpoint)) {
+ if (tmptr->is_member(endpoint)) {
slogger.info("Node {} state jump to normal", endpoint);
}
std::unordered_set<inet_address> endpoints_to_remove;
@@ -973,7 +973,7 @@ future<> storage_service::handle_state_normal(inet_address endpoint) {
}
}

- bool is_normal_token_owner = tmptr->is_normal_token_owner(endpoint);
+ bool is_member = tmptr->is_member(endpoint);
bool do_notify_joined = false;

if (endpoints_to_remove.contains(endpoint)) [[unlikely]] {
@@ -988,7 +988,7 @@ future<> storage_service::handle_state_normal(inet_address endpoint) {
// Update pending ranges after update of normal tokens immediately to avoid
// a race where natural endpoint was updated to contain node A, but A was
// not yet removed from pending endpoints
- if (!is_normal_token_owner) {
+ if (!is_member) {
auto dc_rack = get_dc_rack_for(endpoint);
slogger.debug("handle_state_normal: update_topology: endpoint={} dc={} rack={}", endpoint, dc_rack.dc, dc_rack.rack);
tmptr->update_topology(endpoint, std::move(dc_rack));
@@ -1004,7 +1004,7 @@ future<> storage_service::handle_state_normal(inet_address endpoint) {
for (auto ep : endpoints_to_remove) {
co_await remove_endpoint(ep);
}
slogger.debug("handle_state_normal: endpoint={} is_normal_token_owner={} endpoint_to_remove={} owned_tokens={}", endpoint, is_normal_token_owner, endpoints_to_remove.contains(endpoint), owned_tokens);
slogger.debug("handle_state_normal: endpoint={} is_member={} endpoint_to_remove={} owned_tokens={}", endpoint, is_member, endpoints_to_remove.contains(endpoint), owned_tokens);
if (!owned_tokens.empty() && !endpoints_to_remove.count(endpoint)) {
co_await update_peer_info(endpoint);
try {
@@ -1040,7 +1040,7 @@ future<> storage_service::handle_state_leaving(inet_address endpoint) {
// leave). This way we'll get pending ranges right.
auto tmlock = co_await get_token_metadata_lock();
auto tmptr = co_await get_mutable_token_metadata_ptr();
- if (!tmptr->is_normal_token_owner(endpoint)) {
+ if (!tmptr->is_member(endpoint)) {
// FIXME: this code should probably resolve token collisions too, like handle_state_normal
slogger.info("Node {} state jump to leaving", endpoint);

@@ -1107,7 +1107,7 @@ future<> storage_service::handle_state_removing(inet_address endpoint, std::vect
}
co_return;
}
- if (get_token_metadata().is_normal_token_owner(endpoint)) {
+ if (get_token_metadata().is_member(endpoint)) {
auto state = pieces[0];
auto remove_tokens = get_token_metadata().get_tokens(endpoint);
if (sstring(gms::versioned_value::REMOVED_TOKEN) == state) {
@@ -1172,23 +1172,23 @@ future<> storage_service::on_join(gms::inet_address endpoint, gms::endpoint_stat

future<> storage_service::on_alive(gms::inet_address endpoint, gms::endpoint_state state) {
slogger.debug("endpoint={} on_alive", endpoint);
- bool is_normal_token_owner = get_token_metadata().is_normal_token_owner(endpoint);
- if (is_normal_token_owner) {
+ bool is_member = get_token_metadata().is_member(endpoint);
+ if (is_member) {
co_await notify_up(endpoint);
}
bool replacing_pending_ranges = _replacing_nodes_pending_ranges_updater.contains(endpoint);
if (replacing_pending_ranges) {
_replacing_nodes_pending_ranges_updater.erase(endpoint);
}

- if (!is_normal_token_owner || replacing_pending_ranges) {
+ if (!is_member || replacing_pending_ranges) {
auto tmlock = co_await get_token_metadata_lock();
auto tmptr = co_await get_mutable_token_metadata_ptr();
if (replacing_pending_ranges) {
slogger.info("Trigger pending ranges updater for replacing node {}", endpoint);
co_await handle_state_replacing_update_pending_ranges(tmptr, endpoint);
}
- if (!is_normal_token_owner) {
+ if (!is_member) {
tmptr->update_topology(endpoint, get_dc_rack_for(endpoint), locator::topology::pending::yes);
}
co_await replicate_to_all_cores(std::move(tmptr));
@@ -1235,7 +1235,7 @@ future<> storage_service::on_change(inet_address endpoint, application_state sta
slogger.debug("Ignoring state change for dead or unknown endpoint: {}", endpoint);
co_return;
}
- if (get_token_metadata().is_normal_token_owner(endpoint)) {
+ if (get_token_metadata().is_member(endpoint)) {
slogger.debug("endpoint={} on_change: updating system.peers table", endpoint);
co_await do_update_system_peers_table(endpoint, state, value);
if (state == application_state::RPC_READY) {
@@ -1966,7 +1966,7 @@ future<> storage_service::decommission() {
auto tmptr = ss.get_token_metadata_ptr();
auto& db = ss._db.local();
auto endpoint = ss.get_broadcast_address();
- if (!tmptr->is_normal_token_owner(endpoint)) {
+ if (!tmptr->is_member(endpoint)) {
throw std::runtime_error("local node is not a member of the token ring yet");
}

@@ -2827,7 +2827,7 @@ future<std::unordered_multimap<dht::token_range, inet_address>> storage_service:

// endpoint might or might not be 'leaving'. If it was not leaving (that is, removenode
// command was used), it is still present in temp and must be removed.
- if (temp.is_normal_token_owner(endpoint)) {
+ if (temp.is_member(endpoint)) {
temp.remove_endpoint(endpoint);
}

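
The storage_service.cc hunks above gate the state handlers on the same membership check: handle_state_normal, for instance, logs a "state jump to normal" when the endpoint is already a member and otherwise registers its DC/rack in the topology before taking over its tokens. A condensed sketch of that gating, using toy types (update_topology and the logging are simplified placeholders, not Scylla's API):

#include <iostream>
#include <string>
#include <unordered_map>
#include <unordered_set>

using inet_address = std::string;

struct dc_rack {
    std::string dc;
    std::string rack;
};

struct toy_token_metadata {
    std::unordered_set<inet_address> members;             // non-pending topology members
    std::unordered_map<inet_address, dc_rack> locations;  // DC/rack per member

    bool is_member(const inet_address& ep) const {
        return members.contains(ep);
    }

    void update_topology(const inet_address& ep, dc_rack loc) {
        members.insert(ep);
        locations[ep] = std::move(loc);
    }
};

// Rough shape of the gating in handle_state_normal above: an endpoint that is already
// a member produced a "state jump"; one that is not yet a member is added to the
// topology before its normal tokens would be recorded.
void handle_state_normal(toy_token_metadata& tm, const inet_address& ep, dc_rack loc) {
    bool is_member = tm.is_member(ep);
    if (is_member) {
        std::cout << "Node " << ep << " state jump to normal\n";
    } else {
        tm.update_topology(ep, std::move(loc));
    }
    // ... updating normal tokens and pending ranges would follow here in the real code
}

int main() {
    toy_token_metadata tm;
    handle_state_normal(tm, "10.0.0.5", {"dc1", "rack1"});  // unknown endpoint: registered in the topology
    handle_state_normal(tm, "10.0.0.5", {"dc1", "rack1"});  // already a member: reported as a state jump
}
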