Merge 'Fix issues with endpoint state replication to other shards' from Tomasz

Fixes #3798
Fixes #3694

Tests:

  unit(release), dtest([new] cql_tests.py:TruncateTester.truncate_after_restart_test)

* tag 'fix-gossip-shard-replication-v1' of github.com:tgrabiec/scylla:
  gms/gossiper: Replicate endpoint states in add_saved_endpoint()
  gms/gossiper: Make reset_endpoint_state_map() have effect on all shards
  gms/gossiper: Replicate STATUS change from mark_as_shutdown() to other shards
  gms/gossiper: Always override states from older generations
duarten committed Oct 8, 2018
2 parents 4b16867 + 3c7de9f commit 48ebe65
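
All four fixes rely on the same Seastar idiom: the gossiper is a sharded service, every shard holds its own copy of endpoint_state_map, and the shard that mutates an entry pushes the update to the other shards through container().invoke_on_all(). The sketch below is a minimal, self-contained illustration of that pattern and is not ScyllaDB code: the `registry` class and `set_entry()` are made up for this example, and `seastar::this_shard_id()` is the current spelling of the `engine().cpu_id()` call that appears in the diff.

```cpp
#include <seastar/core/app-template.hh>
#include <seastar/core/sharded.hh>
#include <seastar/core/smp.hh>
#include <seastar/core/future.hh>
#include <string>
#include <unordered_map>

// Each shard owns one instance of this service and its own copy of _state.
class registry : public seastar::peering_sharded_service<registry> {
    std::unordered_map<std::string, int> _state;
public:
    // Apply an update locally, then replicate it to every other shard,
    // mirroring the shape of gossiper::replicate() in the patch.
    seastar::future<> set_entry(std::string key, int value) {
        _state[key] = value;
        return container().invoke_on_all([key, value, orig = seastar::this_shard_id()] (registry& r) {
            if (seastar::this_shard_id() != orig) {
                r._state[key] = value;
            }
        });
    }
    seastar::future<> stop() { return seastar::make_ready_future<>(); }
};

int main(int argc, char** argv) {
    seastar::app_template app;
    seastar::sharded<registry> reg;
    return app.run(argc, argv, [&reg] {
        return reg.start().then([&reg] {
            // Mutate on shard 0; invoke_on_all() fans the update out to the rest.
            return reg.invoke_on(0, [] (registry& r) { return r.set_entry("status", 1); });
        }).then([&reg] {
            return reg.stop();
        });
    });
}
```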
Showing 4 changed files with 20 additions and 32 deletions.
22 changes: 2 additions & 20 deletions gms/endpoint_state.hh
@@ -129,26 +129,8 @@ public:
         update_is_normal();
     }
 
-    void apply_application_state(application_state key, versioned_value&& value) {
-        auto&& e = _application_state[key];
-        if (e.version < value.version) {
-            e = std::move(value);
-        }
-        update_is_normal();
-    }
-
-    void apply_application_state(application_state key, const versioned_value& value) {
-        auto&& e = _application_state[key];
-        if (e.version < value.version) {
-            e = value;
-        }
-        update_is_normal();
-    }
-
-    void apply_application_state(const endpoint_state& es) {
-        for (auto&& e : es._application_state) {
-            apply_application_state(e.first, e.second);
-        }
+    void add_application_state(const endpoint_state& es) {
+        _application_state = es._application_state;
         update_is_normal();
     }

15 changes: 10 additions & 5 deletions gms/gossiper.cc
@@ -930,7 +930,7 @@ void gossiper::make_random_gossip_digest(utils::chunked_vector<gossip_digest>& g
 future<> gossiper::replicate(inet_address ep, const endpoint_state& es) {
     return container().invoke_on_all([ep, es, orig = engine().cpu_id(), self = shared_from_this()] (gossiper& g) {
         if (engine().cpu_id() != orig) {
-            g.endpoint_state_map[ep].apply_application_state(es);
+            g.endpoint_state_map[ep].add_application_state(es);
         }
     });
 }
@@ -939,7 +939,7 @@ future<> gossiper::replicate(inet_address ep, const std::map<application_state,
     return container().invoke_on_all([ep, &src, &changed, orig = engine().cpu_id(), self = shared_from_this()] (gossiper& g) {
         if (engine().cpu_id() != orig) {
             for (auto&& key : changed) {
-                g.endpoint_state_map[ep].apply_application_state(key, src.at(key));
+                g.endpoint_state_map[ep].add_application_state(key, src.at(key));
             }
         }
     });
@@ -948,7 +948,7 @@ future<> gossiper::replicate(inet_address ep, const std::map<application_state,
 future<> gossiper::replicate(inet_address ep, application_state key, const versioned_value& value) {
     return container().invoke_on_all([ep, key, &value, orig = engine().cpu_id(), self = shared_from_this()] (gossiper& g) {
         if (engine().cpu_id() != orig) {
-            g.endpoint_state_map[ep].apply_application_state(key, value);
+            g.endpoint_state_map[ep].add_application_state(key, value);
         }
     });
 }
@@ -1175,11 +1175,13 @@ stdx::optional<endpoint_state> gossiper::get_endpoint_state_for_endpoint(inet_ad
     }
 }
 
-void gossiper::reset_endpoint_state_map() {
-    endpoint_state_map.clear();
+future<> gossiper::reset_endpoint_state_map() {
     _unreachable_endpoints.clear();
     _live_endpoints.clear();
     _live_endpoints_just_added.clear();
+    return container().invoke_on_all([] (gossiper& g) {
+        g.endpoint_state_map.clear();
+    });
 }
 
 std::unordered_map<inet_address, endpoint_state>& gms::gossiper::get_endpoint_states() {
@@ -1662,6 +1664,7 @@ void gossiper::maybe_initialize_local_state(int generation_nbr) {
     }
 }
 
+// Runs inside seastar::async context
 void gossiper::add_saved_endpoint(inet_address ep) {
     if (ep == get_broadcast_address()) {
         logger.debug("Attempt to add self as saved endpoint");
@@ -1687,6 +1690,7 @@ void gossiper::add_saved_endpoint(inet_address ep) {
     }
     ep_state.mark_dead();
     endpoint_state_map[ep] = ep_state;
+    replicate(ep, ep_state).get();
     _unreachable_endpoints[ep] = now();
     logger.trace("Adding saved endpoint {} {}", ep, ep_state.get_heart_beat_state().get_generation());
 }
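
The new replicate(ep, ep_state).get() call above is allowed to wait on a future only because add_saved_endpoint() runs inside a seastar::async() context, which is what the comment added to the function documents. Below is a minimal sketch of that rule, not ScyllaDB code; `do_cross_shard_work()` is a hypothetical stand-in for `gossiper::replicate()`.

```cpp
#include <seastar/core/app-template.hh>
#include <seastar/core/thread.hh>
#include <seastar/core/sleep.hh>
#include <seastar/core/future.hh>
#include <chrono>
#include <iostream>

// Hypothetical stand-in for gossiper::replicate(): submits work that finishes later.
static seastar::future<> do_cross_shard_work() {
    return seastar::sleep(std::chrono::milliseconds(10));
}

int main(int argc, char** argv) {
    seastar::app_template app;
    return app.run(argc, argv, [] {
        // seastar::async() runs the lambda in a stackful fiber. Inside it, .get()
        // suspends the fiber until the future resolves instead of blocking the
        // reactor; outside such a context, waiting on an unresolved future with
        // .get() is not permitted.
        return seastar::async([] {
            do_cross_shard_work().get();
            std::cout << "replicated\n";
        });
    });
}
```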
@@ -1924,6 +1928,7 @@ void gossiper::mark_as_shutdown(const inet_address& endpoint) {
         auto& ep_state = *es;
         ep_state.add_application_state(application_state::STATUS, storage_service_value_factory().shutdown(true));
         ep_state.get_heart_beat_state().force_highest_possible_version_unsafe();
+        replicate(endpoint, ep_state).get();
         mark_dead(endpoint, ep_state);
         get_local_failure_detector().force_conviction(endpoint);
     }
2 changes: 1 addition & 1 deletion gms/gossiper.hh
@@ -417,7 +417,7 @@ public:
     stdx::optional<endpoint_state> get_endpoint_state_for_endpoint(inet_address ep) const;
 
     // removes ALL endpoint states; should only be called after shadow gossip
-    void reset_endpoint_state_map();
+    future<> reset_endpoint_state_map();
 
     std::unordered_map<inet_address, endpoint_state>& get_endpoint_states();

13 changes: 7 additions & 6 deletions service/storage_service.cc
@@ -353,7 +353,7 @@ void storage_service::prepare_to_join(std::vector<inet_address> loaded_endpoints
         gossiper.check_knows_remote_features(local_features, peer_features);
     }
 
-    gossiper.reset_endpoint_state_map();
+    gossiper.reset_endpoint_state_map().get();
     for (auto ep : loaded_endpoints) {
         gossiper.add_saved_endpoint(ep);
     }
@@ -367,7 +367,7 @@ void storage_service::prepare_to_join(std::vector<inet_address> loaded_endpoints
         slogger.info("Checking remote features with gossip");
         gossiper.do_shadow_round().get();
         gossiper.check_knows_remote_features(local_features);
-        gossiper.reset_endpoint_state_map();
+        gossiper.reset_endpoint_state_map().get();
         for (auto ep : loaded_endpoints) {
             gossiper.add_saved_endpoint(ep);
         }
@@ -1570,7 +1570,7 @@ future<> storage_service::check_for_endpoint_collision() {
                 throw std::runtime_error("Other bootstrapping/leaving/moving nodes detected, cannot bootstrap while consistent_rangemovement is true (check_for_endpoint_collision)");
             } else {
                 gossiper.goto_shadow_round();
-                gossiper.reset_endpoint_state_map();
+                gossiper.reset_endpoint_state_map().get();
                 found_bootstrapping_node = true;
                 auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(gms::gossiper::clk::now() - t).count();
                 slogger.info("Checking bootstrapping/leaving/moving nodes: node={}, status={}, sleep 1 second and check again ({} seconds elapsed) (check_for_endpoint_collision)", addr, state, elapsed);
@@ -1582,7 +1582,7 @@ future<> storage_service::check_for_endpoint_collision() {
             }
         } while (found_bootstrapping_node);
         slogger.info("Checking bootstrapping/leaving/moving nodes: ok (check_for_endpoint_collision)");
-        gossiper.reset_endpoint_state_map();
+        gossiper.reset_endpoint_state_map().get();
     });
 }

@@ -1632,8 +1632,9 @@ future<std::unordered_set<token>> storage_service::prepare_replacement_info() {
         auto tokens = get_tokens_for(replace_address);
         // use the replacee's host Id as our own so we receive hints, etc
         return db::system_keyspace::set_local_host_id(host_id).discard_result().then([replace_address, tokens = std::move(tokens)] {
-            gms::get_local_gossiper().reset_endpoint_state_map(); // clean up since we have what we need
-            return make_ready_future<std::unordered_set<token>>(std::move(tokens));
+            return gms::get_local_gossiper().reset_endpoint_state_map().then([tokens = std::move(tokens)] { // clean up since we have what we need
+                return make_ready_future<std::unordered_set<token>>(std::move(tokens));
+            });
         });
     });
 }
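
With reset_endpoint_state_map() now returning a future<>, prepare_replacement_info() has to chain the cleanup with .then() and move the tokens into the continuation rather than returning them straight away. The following is a minimal sketch of that continuation pattern, not ScyllaDB code; `cleanup()` and `prepare()` are illustrative names.

```cpp
#include <seastar/core/app-template.hh>
#include <seastar/core/future.hh>
#include <iostream>
#include <unordered_set>

// Hypothetical stand-in for gossiper::reset_endpoint_state_map().
static seastar::future<> cleanup() {
    return seastar::make_ready_future<>();
}

// Mirrors the shape of prepare_replacement_info(): run the asynchronous cleanup,
// then hand back a value that was moved into the continuation's capture.
static seastar::future<std::unordered_set<int>> prepare() {
    std::unordered_set<int> tokens{1, 2, 3};
    return cleanup().then([tokens = std::move(tokens)] () mutable {
        return seastar::make_ready_future<std::unordered_set<int>>(std::move(tokens));
    });
}

int main(int argc, char** argv) {
    seastar::app_template app;
    return app.run(argc, argv, [] {
        return prepare().then([] (std::unordered_set<int> tokens) {
            std::cout << tokens.size() << " tokens\n";
        });
    });
}
```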
