Skip to content

Commit

Permalink
gossiper: replicate: make exception safe
Browse files Browse the repository at this point in the history
First replicate the new endpoint_state on all shards
before applying the replicated endpoint_state objects
to _endpoint_state_map.

Fixes scylladb#14794

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
  • Loading branch information
bhalevy committed Aug 31, 2023
1 parent c16ec87 commit 38c2347
Showing 1 changed file with 28 additions and 8 deletions.
36 changes: 28 additions & 8 deletions gms/gossiper.cc
Expand Up @@ -1260,11 +1260,31 @@ void gossiper::make_random_gossip_digest(utils::chunked_vector<gossip_digest>& g

future<> gossiper::replicate(inet_address ep, endpoint_state es, permit_id pid) {
verify_permit(ep, pid);
// FIXME: make exception safe to ensure that the state
// will end up consistent on all shards
return container().invoke_on_all([ep, es = std::move(es)] (gossiper& g) {
g._endpoint_state_map[ep] = make_endpoint_state_ptr(es);
});
// First pass: replicate the new endpoint_state on all shards.
// Use foreign_ptr<std::unique_ptr> to ensure destroy on remote shards on exception
std::vector<foreign_ptr<endpoint_state_ptr>> ep_states;
ep_states.resize(smp::count);
es.update_is_normal();
auto p = make_foreign(make_endpoint_state_ptr(std::move(es)));
const auto *eps = p.get();
ep_states[this_shard_id()] = std::move(p);
co_await coroutine::parallel_for_each(boost::irange(0u, smp::count), [&, orig = this_shard_id()] (auto shard) -> future<> {
if (shard != orig) {
ep_states[shard] = co_await smp::submit_to(shard, [eps] {
return make_foreign(make_endpoint_state_ptr(*eps));
});
}
});
// Second pass: set replicated endpoint_state on all shards
// Must not throw
try {
co_return co_await container().invoke_on_all([&] (gossiper& g) {
auto eps = ep_states[this_shard_id()].release();
g._endpoint_state_map[ep] = std::move(eps);
});
} catch (...) {
on_fatal_internal_error(logger, fmt::format("Failed to replicate endpoint_state: {}", std::current_exception()));
}
}

future<> gossiper::advertise_token_removed(inet_address endpoint, locator::host_id host_id, permit_id pid) {
Expand Down Expand Up @@ -1970,7 +1990,7 @@ future<> gossiper::start_gossiping(gms::generation_type generation_nbr, std::map

auto generation = local_state.get_heart_beat_state().get_generation();

co_await replicate(get_broadcast_address(), local_state, permit.id());
co_await replicate(get_broadcast_address(), std::move(local_state), permit.id());

logger.trace("gossip started with generation {}", generation);
_enabled = true;
Expand Down Expand Up @@ -2133,7 +2153,7 @@ future<> gossiper::add_saved_endpoint(inet_address ep) {
ep_state.add_application_state(gms::application_state::HOST_ID, versioned_value::host_id(host_id.value()));
}
auto generation = ep_state.get_heart_beat_state().get_generation();
co_await replicate(ep, ep_state, permit.id());
co_await replicate(ep, std::move(ep_state), permit.id());
_unreachable_endpoints[ep] = now();
logger.trace("Adding saved endpoint {} {}", ep, generation);
}
Expand Down Expand Up @@ -2427,7 +2447,7 @@ future<> gossiper::mark_as_shutdown(const inet_address& endpoint, permit_id pid)
auto ep_state = *es;
ep_state.add_application_state(application_state::STATUS, versioned_value::shutdown(true));
ep_state.get_heart_beat_state().force_highest_possible_version_unsafe();
co_await replicate(endpoint, ep_state, pid);
co_await replicate(endpoint, std::move(ep_state), pid);
co_await mark_dead(endpoint, get_endpoint_state_ptr(endpoint), pid);
}
}
Expand Down

0 comments on commit 38c2347

Please sign in to comment.