gossiper: make replicate() exception safe

Previously replicate() assigned the new endpoint_state on every shard from
inside a single invoke_on_all() call, and carried a FIXME noting that this
was not exception safe and could leave shards inconsistent.

replicate() now works in two passes:

  1. es.update_is_normal() is called once, then a copy of the endpoint
     state is created on every shard. Each copy is held in a foreign_ptr
     stored in a per-shard vector, so that (per the in-code comment) the
     copies are destroyed on their owning shards if this pass throws.
     Nothing has been written to _endpoint_state_map at this point.
  2. Every shard releases its prepared pointer into
     _endpoint_state_map[ep]. This pass must not throw; any exception is
     escalated with on_fatal_internal_error().

Callers that own a local endpoint_state now std::move() it into
replicate(), which takes the state by value.

diff --git a/gms/gossiper.cc b/gms/gossiper.cc
index e7895c076792..1338f69f86bd 100644
--- a/gms/gossiper.cc
+++ b/gms/gossiper.cc
@@ -1260,11 +1260,31 @@ void gossiper::make_random_gossip_digest(utils::chunked_vector& g
 future<> gossiper::replicate(inet_address ep, endpoint_state es, permit_id pid) {
     verify_permit(ep, pid);
 
-    // FIXME: make exception safe to ensure that the state
-    // will end up consistent on all shards
-    return container().invoke_on_all([ep, es = std::move(es)] (gossiper& g) {
-        g._endpoint_state_map[ep] = make_endpoint_state_ptr(es);
-    });
+    // First pass: replicate the new endpoint_state on all shards.
+    // Use foreign_ptr to ensure destroy on remote shards on exception
+    std::vector<foreign_ptr<endpoint_state_ptr>> ep_states;
+    ep_states.resize(smp::count);
+    es.update_is_normal();
+    auto p = make_foreign(make_endpoint_state_ptr(std::move(es)));
+    const auto *eps = p.get();
+    ep_states[this_shard_id()] = std::move(p);
+    co_await coroutine::parallel_for_each(boost::irange(0u, smp::count), [&, orig = this_shard_id()] (auto shard) -> future<> {
+        if (shard != orig) {
+            ep_states[shard] = co_await smp::submit_to(shard, [eps] {
+                return make_foreign(make_endpoint_state_ptr(*eps));
+            });
+        }
+    });
+    // Second pass: set replicated endpoint_state on all shards
+    // Must not throw
+    try {
+        co_return co_await container().invoke_on_all([&] (gossiper& g) {
+            auto eps = ep_states[this_shard_id()].release();
+            g._endpoint_state_map[ep] = std::move(eps);
+        });
+    } catch (...) {
+        on_fatal_internal_error(logger, fmt::format("Failed to replicate endpoint_state: {}", std::current_exception()));
+    }
 }
 
 future<> gossiper::advertise_token_removed(inet_address endpoint, locator::host_id host_id, permit_id pid) {
@@ -1970,7 +1990,7 @@ future<> gossiper::start_gossiping(gms::generation_type generation_nbr, std::map
 
     auto generation = local_state.get_heart_beat_state().get_generation();
 
-    co_await replicate(get_broadcast_address(), local_state, permit.id());
+    co_await replicate(get_broadcast_address(), std::move(local_state), permit.id());
 
     logger.trace("gossip started with generation {}", generation);
     _enabled = true;
@@ -2133,7 +2153,7 @@ future<> gossiper::add_saved_endpoint(inet_address ep) {
         ep_state.add_application_state(gms::application_state::HOST_ID, versioned_value::host_id(host_id.value()));
     }
     auto generation = ep_state.get_heart_beat_state().get_generation();
-    co_await replicate(ep, ep_state, permit.id());
+    co_await replicate(ep, std::move(ep_state), permit.id());
     _unreachable_endpoints[ep] = now();
     logger.trace("Adding saved endpoint {} {}", ep, generation);
 }
@@ -2427,7 +2447,7 @@ future<> gossiper::mark_as_shutdown(const inet_address& endpoint, permit_id pid)
         auto ep_state = *es;
         ep_state.add_application_state(application_state::STATUS, versioned_value::shutdown(true));
         ep_state.get_heart_beat_state().force_highest_possible_version_unsafe();
-        co_await replicate(endpoint, ep_state, pid);
+        co_await replicate(endpoint, std::move(ep_state), pid);
         co_await mark_dead(endpoint, get_endpoint_state_ptr(endpoint), pid);
     }
 }