Skip to content

Commit

Permalink
gossiper: modify endpoint state only via replicate
Browse files Browse the repository at this point in the history
And restrict the accessor methods to return const pointers
or references.

With that, the endpoint_state_ptr:s held in the _endpoint_state_map
point to immutable endpoint_state objects - with one exception:
the endpoint_state update_timestamp may be updated in place,
but the endpoint_state_map is immutable.

replicate() replaces the endpoint_state_ptr in the map
with a new one to maintain immutability.

A later change will also make replicate() exception safe,
so that it guarantees strong exception safety: either all shards
are updated or none of them.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
  • Loading branch information
bhalevy committed Aug 31, 2023
1 parent d00e49a commit 1d04242
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 46 deletions.
80 changes: 39 additions & 41 deletions gms/gossiper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -568,9 +568,9 @@ future<> gossiper::do_apply_state_locally(gms::inet_address node, const endpoint
// If state does not exist just add it. If it does then add it if the remote generation is greater.
// If there is a generation tie, attempt to break it by heartbeat version.
auto permit = co_await this->lock_endpoint(node, null_permit_id);
auto* es = this->get_mutable_endpoint_state_ptr(node);
auto es = this->get_endpoint_state_ptr(node);
if (es) {
endpoint_state& local_state = *es;
endpoint_state local_state = *es;
auto local_generation = local_state.get_heart_beat_state().get_generation();
auto remote_generation = remote_state.get_heart_beat_state().get_generation();
logger.trace("{} local generation {}, remote generation {}", node, local_generation, remote_generation);
Expand All @@ -585,7 +585,7 @@ future<> gossiper::do_apply_state_locally(gms::inet_address node, const endpoint
co_await this->handle_major_state_change(node, remote_state, permit.id());
} else {
logger.debug("Applying remote_state for node {} (remote generation > local generation)", node);
_endpoint_state_map[node] = make_endpoint_state_ptr(remote_state);
co_await replicate(node, remote_state, permit.id());
}
} else if (remote_generation == local_generation) {
if (listener_notification) {
Expand All @@ -602,6 +602,7 @@ future<> gossiper::do_apply_state_locally(gms::inet_address node, const endpoint
this->mark_alive(node);
}
} else {
bool update = false;
for (const auto& item : remote_state.get_application_state_map()) {
const auto& remote_key = item.first;
const auto& remote_value = item.second;
Expand All @@ -610,10 +611,16 @@ future<> gossiper::do_apply_state_locally(gms::inet_address node, const endpoint
logger.debug("Applying remote_state for node {} (remote generation = local generation), key={}, value={}",
node, remote_key, remote_value);
local_state.add_application_state(remote_key, remote_value);
update = true;
} else {
logger.debug("Ignoring remote_state for node {} (remote generation = local generation), key={}, value={}", node, remote_key, remote_value);
logger.trace("Ignoring remote_state for node {} (remote generation = local generation), key={}, value={}", node, remote_key, remote_value);
}
}
if (update) {
co_await replicate(node, std::move(local_state), permit.id());
} else {
logger.debug("Ignoring remote_state for node {} (remote generation = local generation)", node);
}
}
} else {
logger.debug("Ignoring remote generation {} < {}", remote_generation, local_generation);
Expand All @@ -623,7 +630,7 @@ future<> gossiper::do_apply_state_locally(gms::inet_address node, const endpoint
co_await this->handle_major_state_change(node, remote_state, permit.id());
} else {
logger.debug("Applying remote_state for node {} (new node)", node);
_endpoint_state_map[node] = make_endpoint_state_ptr(remote_state);
co_await replicate(node, remote_state, permit.id());
}
}
}
Expand Down Expand Up @@ -1251,12 +1258,12 @@ void gossiper::make_random_gossip_digest(utils::chunked_vector<gossip_digest>& g
#endif
}

future<> gossiper::replicate(inet_address ep, const endpoint_state& es, permit_id pid) {
future<> gossiper::replicate(inet_address ep, endpoint_state es, permit_id pid) {
verify_permit(ep, pid);
return container().invoke_on_all([ep, es, orig = this_shard_id(), self = shared_from_this()] (gossiper& g) {
if (this_shard_id() != orig) {
g.get_or_create_endpoint_state(ep).add_application_state(es);
}
// FIXME: make exception safe to ensure that the state
// will end up consistent on all shards
return container().invoke_on_all([ep, es = std::move(es)] (gossiper& g) {
g._endpoint_state_map[ep] = make_endpoint_state_ptr(es);
});
}

Expand All @@ -1270,8 +1277,7 @@ future<> gossiper::advertise_token_removed(inet_address endpoint, locator::host_
eps.add_application_state(application_state::STATUS, versioned_value::removed_nonlocal(host_id, expire_time.time_since_epoch().count()));
logger.info("Completing removal of {}", endpoint);
add_expire_time_for_endpoint(endpoint, expire_time);
_endpoint_state_map[endpoint] = make_endpoint_state_ptr(std::move(eps));
co_await replicate(endpoint, eps, pid);
co_await replicate(endpoint, std::move(eps), pid);
// ensure at least one gossip round occurs before returning
co_await sleep_abortable(INTERVAL * 2, _abort_source);
}
Expand All @@ -1286,7 +1292,7 @@ future<> gossiper::assassinate_endpoint(sstring address) {
return seastar::async([&gossiper, g = gossiper.shared_from_this(), address] {
inet_address endpoint(address);
auto permit = gossiper.lock_endpoint(endpoint, null_permit_id).get0();
auto es = gossiper.get_mutable_endpoint_state_ptr(endpoint);
auto es = gossiper.get_endpoint_state_ptr(endpoint);
auto now = gossiper.now();
generation_type gen(std::chrono::duration_cast<std::chrono::seconds>((now + std::chrono::seconds(60)).time_since_epoch()).count());
version_type ver(9999);
Expand All @@ -1307,7 +1313,7 @@ future<> gossiper::assassinate_endpoint(sstring address) {
// make sure it did not change
sleep_abortable(ring_delay, gossiper._abort_source).get();

es = gossiper.get_mutable_endpoint_state_ptr(endpoint);
es = gossiper.get_endpoint_state_ptr(endpoint);
if (!es) {
logger.warn("Endpoint {} disappeared while trying to assassinate, continuing anyway", endpoint);
} else {
Expand Down Expand Up @@ -1401,16 +1407,11 @@ endpoint_state_ptr gossiper::get_endpoint_state_ptr(inet_address ep) const noexc
}
}

endpoint_state* gossiper::get_mutable_endpoint_state_ptr(inet_address ep) noexcept {
auto it = _endpoint_state_map.find(ep);
if (it == _endpoint_state_map.end()) {
return nullptr;
} else {
return const_cast<endpoint_state*>(it->second.get());
}
void gossiper::update_timestamp(const endpoint_state_ptr& eps) noexcept {
const_cast<endpoint_state&>(*eps).update_timestamp();
}

endpoint_state& gossiper::get_endpoint_state(inet_address ep) {
const endpoint_state& gossiper::get_endpoint_state(inet_address ep) const {
auto it = _endpoint_state_map.find(ep);
if (it == _endpoint_state_map.end()) {
throw std::out_of_range(format("ep={}", ep));
Expand Down Expand Up @@ -1562,7 +1563,7 @@ void gossiper::update_timestamp_for_nodes(const std::map<inet_address, endpoint_
for (const auto& x : map) {
const gms::inet_address& endpoint = x.first;
const endpoint_state& remote_endpoint_state = x.second;
auto* local_endpoint_state = get_mutable_endpoint_state_ptr(endpoint);
auto local_endpoint_state = get_endpoint_state_ptr(endpoint);
if (local_endpoint_state) {
bool update = false;
auto local_generation = local_endpoint_state->get_heart_beat_state().get_generation();
Expand All @@ -1578,7 +1579,7 @@ void gossiper::update_timestamp_for_nodes(const std::map<inet_address, endpoint_
}
if (update) {
logger.trace("Updated timestamp for node {}", endpoint);
local_endpoint_state->update_timestamp();
update_timestamp(local_endpoint_state);
}
}
}
Expand Down Expand Up @@ -1620,7 +1621,7 @@ future<> gossiper::real_mark_alive(inet_address addr) {
// After sending the echo message, the node might not be in the
// _endpoint_state_map anymore; using a reference to local_state
// might cause a use-after-free
auto* es = get_mutable_endpoint_state_ptr(addr);
auto es = get_endpoint_state_ptr(addr);
if (!es) {
logger.info("Node {} is not in endpoint_state_map anymore", addr);
co_return;
Expand All @@ -1635,11 +1636,8 @@ future<> gossiper::real_mark_alive(inet_address addr) {

logger.debug("Mark Node {} alive after EchoMessage", addr);

auto& local_state = *es;
local_state.update_timestamp(); // prevents do_status_check from racing us and evicting if it was down > A_VERY_LONG_TIME

// Make a copy for endpoint_state because the code below can yield
endpoint_state state = local_state;
// prevents do_status_check from racing us and evicting if it was down > A_VERY_LONG_TIME
update_timestamp(es);

logger.debug("removing expire time for endpoint : {}", addr);
_unreachable_endpoints.erase(addr);
Expand Down Expand Up @@ -1668,8 +1666,8 @@ future<> gossiper::real_mark_alive(inet_address addr) {
logger.info("InetAddress {} is now UP, status = {}", addr, status);
}

co_await _subscribers.for_each([addr, state = std::move(state), pid = permit.id()] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) -> future<> {
co_await subscriber->on_alive(addr, state, pid);
co_await _subscribers.for_each([addr, es, pid = permit.id()] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) -> future<> {
co_await subscriber->on_alive(addr, *es, pid);
logger.trace("Notified {}", fmt::ptr(subscriber.get()));
});
}
Expand Down Expand Up @@ -1701,7 +1699,6 @@ future<> gossiper::handle_major_state_change(inet_address ep, const endpoint_sta
}
}
logger.trace("Adding endpoint state for {}, status = {}", ep, get_gossip_status(eps));
_endpoint_state_map[ep] = make_endpoint_state_ptr(eps);
co_await replicate(ep, eps, pid);

if (is_in_shadow_round()) {
Expand Down Expand Up @@ -1968,7 +1965,7 @@ future<> gossiper::start_gossiping(gms::generation_type generation_nbr, std::map
generation_nbr = gms::generation_type(_force_gossip_generation());
logger.warn("Use the generation number provided by user: generation = {}", generation_nbr);
}
endpoint_state& local_state = my_endpoint_state();
endpoint_state local_state = my_endpoint_state();
local_state.set_heart_beat_state_and_update_timestamp(heart_beat_state(generation_nbr));
for (auto& entry : preload_local_states) {
local_state.add_application_state(entry.first, entry.second);
Expand Down Expand Up @@ -2138,10 +2135,10 @@ future<> gossiper::add_saved_endpoint(inet_address ep) {
if (host_id) {
ep_state.add_application_state(gms::application_state::HOST_ID, versioned_value::host_id(host_id.value()));
}
_endpoint_state_map[ep] = make_endpoint_state_ptr(ep_state);
auto generation = ep_state.get_heart_beat_state().get_generation();
co_await replicate(ep, ep_state, permit.id());
_unreachable_endpoints[ep] = now();
logger.trace("Adding saved endpoint {} {}", ep, ep_state.get_heart_beat_state().get_generation());
logger.trace("Adding saved endpoint {} {}", ep, generation);
}

future<> gossiper::add_local_application_state(application_state state, versioned_value value) {
Expand Down Expand Up @@ -2196,11 +2193,12 @@ future<> gossiper::add_local_application_state(std::list<std::pair<application_s
gossiper.do_before_change_notifications(ep_addr, *ep_state_before, state, value).get();
}

auto es = gossiper.get_mutable_endpoint_state_ptr(ep_addr);
auto es = gossiper.get_endpoint_state_ptr(ep_addr);
if (!es) {
return;
}

auto local_state = *es;
for (auto& p : states) {
auto& state = p.first;
auto& value = p.second;
Expand All @@ -2209,14 +2207,14 @@ future<> gossiper::add_local_application_state(std::list<std::pair<application_s
// if another value with a newer version was received in the meantime:
value = versioned_value::clone_with_higher_version(value);
// Add to local application state
es->add_application_state(state, value);
local_state.add_application_state(state, value);
}

// It is OK to replicate the new endpoint_state
// after all application states were modified as a batch.
// We guarantee that the on_change notifications
// will be called in the order given by `states` anyhow.
gossiper.replicate(ep_addr, *es, permit.id()).get();
gossiper.replicate(ep_addr, std::move(local_state), permit.id()).get();

for (auto& p : states) {
auto& state = p.first;
Expand Down Expand Up @@ -2427,9 +2425,9 @@ sstring gossiper::get_application_state_value(inet_address endpoint, application
*/
future<> gossiper::mark_as_shutdown(const inet_address& endpoint, permit_id pid) {
verify_permit(endpoint, pid);
auto es = get_mutable_endpoint_state_ptr(endpoint);
auto es = get_endpoint_state_ptr(endpoint);
if (es) {
auto& ep_state = *es;
auto ep_state = *es;
ep_state.add_application_state(application_state::STATUS, versioned_value::shutdown(true));
ep_state.get_heart_beat_state().force_highest_possible_version_unsafe();
co_await replicate(endpoint, ep_state, pid);
Expand Down
13 changes: 8 additions & 5 deletions gms/gossiper.hh
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ private:
// Replicates given endpoint_state to all other shards.
// The state doesn't have to be kept alive until replicate completes.
// Must be called under lock_endpoint.
future<> replicate(inet_address, const endpoint_state&, permit_id);
future<> replicate(inet_address, endpoint_state, permit_id);
public:
explicit gossiper(abort_source& as, const locator::shared_token_metadata& stm, netw::messaging_service& ms, const db::config& cfg, gossip_config gcfg);

Expand Down Expand Up @@ -416,6 +416,8 @@ public:

// Gets a shared pointer to the endpoint_state, if exists.
// Otherwise, returns a null ptr.
// The endpoint_state is immutable (except for its update_timestamp), guaranteed not to change while
// the endpoint_state_ptr is held.
endpoint_state_ptr get_endpoint_state_ptr(inet_address ep) const noexcept;

const versioned_value* get_application_state_ptr(inet_address endpoint, application_state appstate) const noexcept;
Expand Down Expand Up @@ -465,16 +467,17 @@ public:
*/
sstring get_rpc_address(const inet_address& endpoint) const;
private:
// FIXME: for now, allow modifying the endpoint_state in place
// until all updates are applied only using replicate
// FIXME: for now, allow modifying the endpoint_state's heartbeat_state in place
// Gets or creates endpoint_state for this node
endpoint_state& get_or_create_endpoint_state(inet_address ep);
endpoint_state& my_endpoint_state() {
return get_or_create_endpoint_state(get_broadcast_address());
}

endpoint_state* get_mutable_endpoint_state_ptr(inet_address ep) noexcept;
endpoint_state& get_endpoint_state(inet_address ep);
// Use with care, as the endpoint_state_ptr in the endpoint_state_map is considered
// immutable, with one exception - the update_timestamp.
void update_timestamp(const endpoint_state_ptr& eps) noexcept;
const endpoint_state& get_endpoint_state(inet_address ep) const;

void update_timestamp_for_nodes(const std::map<inet_address, endpoint_state>& map);

Expand Down

0 comments on commit 1d04242

Please sign in to comment.