Skip to content

Commit

Permalink
Merge 'storage_service: wait for normal state handlers earlier in the…
Browse files Browse the repository at this point in the history
… boot procedure' from Kamil Braun

The `wait_for_normal_state_handled_on_boot` function waits until
`handle_state_normal` finishes for the given set of nodes. It was used
in `run_bootstrap_ops` and `run_replace_ops` to wait until NORMAL states
of existing nodes in the cluster are processed by the joining node
before continuing the joining process. One reason to do it is because at
the end of `handle_state_normal` the joining node might drop connections
to the NORMAL nodes in order to reestablish new connections using
correct encryption settings. In tests we observed that the connection
drop was happening in the middle of repair/streaming, causing
repair/streaming to abort.

Unfortunately, calling `wait_for_normal_state_handled_on_boot` in
`run_bootstrap_ops`/`run_replace_ops` is too late to fix all problems.
Before either of these two functions, we create a new CDC generation and
write the data to `system_distributed_everywhere.cdc_generation_descriptions_v2`.
In tests, the connections were sometimes dropped while this write was
in-flight. This would cause the write to never arrive to other nodes,
and the joining node would timeout waiting for confirmations.

To fix this, call `wait_for_normal_state_handled_on_boot` earlier in the
boot procedure, before `make_new_generation` call which does the write.

Fixes: #13302

Closes #13317

* github.com:scylladb/scylladb:
  storage_service: wait for normal state handlers earlier in the boot procedure
  storage_service: bootstrap: wait for normal tokens to arrive in all cases
  storage_service: extract get_nodes_to_sync_with helper
  storage_service: return unordered_set from get_ignore_dead_nodes_for_replace
  • Loading branch information
tgrabiec committed Mar 27, 2023
2 parents cd282cf + 0b19a61 commit 79ee381
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 36 deletions.
78 changes: 44 additions & 34 deletions service/storage_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1332,9 +1332,9 @@ future<> storage_service::mark_existing_views_as_built(sharded<db::system_distri
});
}

std::list<gms::inet_address> storage_service::get_ignore_dead_nodes_for_replace(const token_metadata& tm) {
std::unordered_set<gms::inet_address> storage_service::get_ignore_dead_nodes_for_replace(const token_metadata& tm) {
std::vector<sstring> ignore_nodes_strs;
std::list<gms::inet_address> ignore_nodes;
std::unordered_set<gms::inet_address> ignore_nodes;
boost::split(ignore_nodes_strs, _db.local().get_config().ignore_dead_nodes_for_replace(), boost::is_any_of(","));
for (std::string n : ignore_nodes_strs) {
try {
Expand All @@ -1343,7 +1343,7 @@ std::list<gms::inet_address> storage_service::get_ignore_dead_nodes_for_replace(
boost::trim_all(n);
if (!n.empty()) {
auto ep_and_id = tm.parse_host_id_and_endpoint(n);
ignore_nodes.push_back(ep_and_id.endpoint);
ignore_nodes.insert(ep_and_id.endpoint);
}
} catch (...) {
throw std::runtime_error(format("Failed to parse --ignore-dead-nodes-for-replace parameter: ignore_nodes={}, node={}: {}", ignore_nodes_strs, n, std::current_exception()));
Expand All @@ -1352,6 +1352,21 @@ std::list<gms::inet_address> storage_service::get_ignore_dead_nodes_for_replace(
return ignore_nodes;
}

future<std::unordered_set<gms::inet_address>> storage_service::get_nodes_to_sync_with(
const std::unordered_set<gms::inet_address>& ignore_nodes) {
std::unordered_set<gms::inet_address> result;
for (const auto& [node, _] :_gossiper.get_endpoint_states()) {
co_await coroutine::maybe_yield();
slogger.info("Check node={}, status={}", node, _gossiper.get_gossip_status(node));
if (node != get_broadcast_address() &&
_gossiper.is_normal_ring_member(node) &&
!ignore_nodes.contains(node)) {
result.insert(node);
}
}
co_return result;
}

// Runs inside seastar::async context
future<> storage_service::bootstrap(cdc::generation_service& cdc_gen_service, std::unordered_set<token>& bootstrap_tokens, std::optional<cdc::generation_id>& cdc_gen_id, const std::optional<replacement_info>& replacement_info) {
return seastar::async([this, &bootstrap_tokens, &cdc_gen_id, &cdc_gen_service, &replacement_info] {
Expand All @@ -1370,7 +1385,7 @@ future<> storage_service::bootstrap(cdc::generation_service& cdc_gen_service, st
}
}).get();

if (!replacement_info) {
{
int retry = 0;
while (get_token_metadata_ptr()->count_normal_token_owners() == 0) {
if (retry++ < 500) {
Expand All @@ -1381,13 +1396,27 @@ future<> storage_service::bootstrap(cdc::generation_service& cdc_gen_service, st
// We've waited for tokens to arrive.
// But we didn't see any normal token owners. Something's wrong, we cannot proceed.
throw std::runtime_error{
"Failed to learn about other nodes' tokens during bootstrap. Make sure that:\n"
"Failed to learn about other nodes' tokens during bootstrap or replace. Make sure that:\n"
" - the node can contact other nodes in the cluster,\n"
" - the `ring_delay` parameter is large enough (the 30s default should be enough for small-to-middle-sized clusters),\n"
" - a node with this IP didn't recently leave the cluster. If it did, wait for some time first (the IP is quarantined),\n"
"and retry the bootstrap."};
"and retry the bootstrap/replace."};
}
}

{
// Wait for normal state handler to finish for existing nodes in the cluster.
auto ignore_nodes = replacement_info ? get_ignore_dead_nodes_for_replace(get_token_metadata())
// TODO: specify ignore_nodes for bootstrap
: std::unordered_set<gms::inet_address>{};
auto sync_nodes = get_nodes_to_sync_with(ignore_nodes).get();
if (replacement_info) {
sync_nodes.erase(replacement_info->address);
}
wait_for_normal_state_handled_on_boot(sync_nodes).get();
}

if (!replacement_info) {
// Even if we reached this point before but crashed, we will make a new CDC generation.
// It doesn't hurt: other nodes will (potentially) just do more generation switches.
// We do this because with this new attempt at bootstrapping we picked a different set of tokens.
Expand Down Expand Up @@ -3033,18 +3062,8 @@ void storage_service::run_bootstrap_ops(std::unordered_set<token>& bootstrap_tok

auto start_time = std::chrono::steady_clock::now();
for (;;) {
ctl.sync_nodes.clear();
// Step 1: Decide who needs to sync data for bootstrap operation
for (const auto& [node, eps] :_gossiper.get_endpoint_states()) {
seastar::thread::maybe_yield();
slogger.info("bootstrap[{}]: Check node={}, status={}", uuid, node, _gossiper.get_gossip_status(node));
if (node != get_broadcast_address() &&
_gossiper.is_normal_ring_member(node) &&
!ctl.ignore_nodes.contains(node)) {
ctl.sync_nodes.insert(node);
}
}
wait_for_normal_state_handled_on_boot(ctl.sync_nodes, "bootstrap", uuid).get();
ctl.sync_nodes = get_nodes_to_sync_with(ctl.ignore_nodes).get();
ctl.sync_nodes.insert(get_broadcast_address());

// Step 2: Wait until no pending node operations
Expand Down Expand Up @@ -3103,19 +3122,10 @@ void storage_service::run_replace_ops(std::unordered_set<token>& bootstrap_token
const auto& uuid = ctl.uuid();
gms::inet_address replace_address = replace_info.address;
auto tmptr = get_token_metadata_ptr();
ctl.ignore_nodes = boost::copy_range<std::unordered_set<inet_address>>(get_ignore_dead_nodes_for_replace(*tmptr));
ctl.ignore_nodes = get_ignore_dead_nodes_for_replace(*tmptr);
// Step 1: Decide who needs to sync data for replace operation
for (const auto& [node, eps] :_gossiper.get_endpoint_states()) {
seastar::thread::maybe_yield();
slogger.debug("replace[{}]: Check node={}, status={}", uuid, node, _gossiper.get_gossip_status(node));
if (node != get_broadcast_address() &&
node != replace_address &&
_gossiper.is_normal_ring_member(node) &&
!ctl.ignore_nodes.contains(node)) {
ctl.sync_nodes.insert(node);
}
}
wait_for_normal_state_handled_on_boot(ctl.sync_nodes, "replace", uuid).get();
ctl.sync_nodes = get_nodes_to_sync_with(ctl.ignore_nodes).get();
ctl.sync_nodes.erase(replace_address);
ctl.sync_nodes.insert(get_broadcast_address());

ctl.start("replace");
Expand Down Expand Up @@ -4783,19 +4793,19 @@ bool storage_service::is_normal_state_handled_on_boot(gms::inet_address node) {
}

// Wait for normal state handler to finish on boot
future<> storage_service::wait_for_normal_state_handled_on_boot(const std::unordered_set<gms::inet_address>& nodes, sstring ops, node_ops_id uuid) {
slogger.info("{}[{}]: Started waiting for normal state handler for nodes {}", ops, uuid, nodes);
future<> storage_service::wait_for_normal_state_handled_on_boot(const std::unordered_set<gms::inet_address>& nodes) {
slogger.info("Started waiting for normal state handler for nodes {}", nodes);
auto start_time = std::chrono::steady_clock::now();
for (auto& node: nodes) {
while (!is_normal_state_handled_on_boot(node)) {
slogger.debug("{}[{}]: Waiting for normal state handler for node {}", ops, uuid, node);
slogger.debug("Waiting for normal state handler for node {}", node);
co_await sleep_abortable(std::chrono::milliseconds(100), _abort_source);
if (std::chrono::steady_clock::now() > start_time + std::chrono::seconds(60)) {
throw std::runtime_error(format("{}[{}]: Node {} did not finish normal state handler, reject the node ops", ops, uuid, node));
throw std::runtime_error(format("Node {} did not finish normal state handler, reject the node ops", node));
}
}
}
slogger.info("{}[{}]: Finished waiting for normal state handler for nodes {}", ops, uuid, nodes);
slogger.info("Finished waiting for normal state handler for nodes {}", nodes);
co_return;
}

Expand Down
6 changes: 4 additions & 2 deletions service/storage_service.hh
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,9 @@ private:
void run_replace_ops(std::unordered_set<token>& bootstrap_tokens, replacement_info replace_info);
void run_bootstrap_ops(std::unordered_set<token>& bootstrap_tokens);

std::list<gms::inet_address> get_ignore_dead_nodes_for_replace(const locator::token_metadata& tm);
std::unordered_set<gms::inet_address> get_ignore_dead_nodes_for_replace(const locator::token_metadata& tm);
future<std::unordered_set<gms::inet_address>> get_nodes_to_sync_with(
const std::unordered_set<gms::inet_address>& ignore_dead_nodes);
future<> wait_for_ring_to_settle(std::chrono::milliseconds delay);

public:
Expand Down Expand Up @@ -758,7 +760,7 @@ public:
private:
std::unordered_set<gms::inet_address> _normal_state_handled_on_boot;
bool is_normal_state_handled_on_boot(gms::inet_address);
future<> wait_for_normal_state_handled_on_boot(const std::unordered_set<gms::inet_address>& nodes, sstring ops, node_ops_id uuid);
future<> wait_for_normal_state_handled_on_boot(const std::unordered_set<gms::inet_address>& nodes);

friend class group0_state_machine;
bool _raft_topology_change_enabled = false;
Expand Down

0 comments on commit 79ee381

Please sign in to comment.