Merge 'Wait for other nodes to be UP and NORMAL on bootstrap right after enabling gossiping' from Kamil Braun

`handle_state_normal` may drop connections to the handled node. This
causes spurious failures if there's an ongoing concurrent operation.
This problem was already solved twice in the past in different contexts:
first in 5363616, then in
79ee381.

Time to fix it for the third time. Now we do this right after enabling
gossiping, so hopefully it's the last time.

This time it's causing snapshot transfer failures in group 0. Although
the transfer is retried and eventually succeeds, the failed transfer is
wasted work and causes an annoying ERROR message in the log, which
dtests, SCT, and I don't like.

The fix is done by moving the `wait_for_normal_state_handled_on_boot()`
call before `setup_group0()`. But for the wait to work correctly we must
first ensure that gossiper sees an alive node, so we precede it with
`wait_for_live_nodes_to_show_up()` (before this commit, the call site of
`wait_for_normal_state_handled_on_boot` was already after this wait).
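
In outline, the relevant part of `storage_service::join_token_ring` now reads as follows (a condensed excerpt of the storage_service.cc hunk below; the construction of `sync_nodes` and the UP wait are shown separately further down):

    co_await _gossiper.start_gossiping(generation_number, app_states, advertise);

    if (!_raft_topology_change_enabled && should_bootstrap()) {
        // Make sure gossip sees at least one other node (plus ourself) and fetches the
        // list of peers from it; otherwise the NORMAL wait below could finish trivially.
        co_await _gossiper.wait_for_live_nodes_to_show_up(2);

        // Finish NORMAL state handlers before joining group 0, so that the connection
        // dropping at the end of `handle_state_normal` cannot interrupt the join.
        co_await wait_for_normal_state_handled_on_boot(sync_nodes);
    }

    co_await _group0->setup_group0(/* ... */);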

There is another problem: the bootstrap procedure is racing with gossiper
marking nodes as UP, and waiting for other nodes to be NORMAL doesn't guarantee
that they are also UP. If gossiper is quick enough, everything will be fine.
If not, problems may arise such as streaming or repair failing due to nodes
still being marked as DOWN, or the CDC generation write failing.

In general, we need all NORMAL nodes to be up for bootstrap to proceed.
One exception is replace where we ignore the replaced node. The
`sync_nodes` set constructed for `wait_for_normal_state_handled_on_boot`
takes this into account, so we also use it to wait for nodes to be UP.
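
Concretely (condensed from the storage_service.cc hunk below; `ri` holds the replacement info, if any):

    auto ignore_nodes = ri
        ? parse_node_list(_db.local().get_config().ignore_dead_nodes_for_replace(), get_token_metadata())
        : std::unordered_set<gms::inet_address>{};
    auto sync_nodes = co_await get_nodes_to_sync_with(ignore_nodes);
    if (ri) {
        // Replacing: don't wait for the node we are replacing.
        sync_nodes.erase(ri->address);
    }
    co_await wait_for_normal_state_handled_on_boot(sync_nodes);

    // NORMAL doesn't imply UP (#14042): also wait for these nodes to be marked alive.
    std::vector<gms::inet_address> sync_nodes_vec{sync_nodes.begin(), sync_nodes.end()};
    co_await _gossiper.wait_alive(sync_nodes_vec, std::chrono::seconds{30});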

As explained in commit messages and comments, we only do these
waits outside raft-based-topology mode.

This should improve CI stability.
Fixes: #12972
Refs: #14042

Closes #14354

* github.com:scylladb/scylladb:
  messaging_service: print which connections are dropped due to missing topology info
  storage_service: wait for nodes to be UP on bootstrap
  storage_service: wait for NORMAL state handler before `setup_group0()`
  storage_service: extract `gossiper::wait_for_live_nodes_to_show_up()`
tgrabiec committed Jun 28, 2023
2 parents f6f974c + 1fa9678 commit 50e8ec7
Showing 4 changed files with 72 additions and 34 deletions.
24 changes: 24 additions & 0 deletions gms/gossiper.cc
@@ -2223,6 +2223,30 @@ future<> gossiper::wait_alive(std::vector<gms::inet_address> nodes, std::chrono:
}
}

future<> gossiper::wait_for_live_nodes_to_show_up(size_t n) {
logger::rate_limit rate_limit{std::chrono::seconds{5}};
#ifdef SEASTAR_DEBUG
// Account for debug slowness. 3 minutes is probably overkill but we don't want flaky tests.
constexpr auto timeout_delay = std::chrono::minutes{3};
#else
constexpr auto timeout_delay = std::chrono::seconds{30};
#endif
auto timeout = gossiper::clk::now() + timeout_delay;
while (get_live_members().size() < n) {
if (timeout <= gossiper::clk::now()) {
auto err = ::format("Timed out waiting for {} live nodes to show up in gossip", n);
logger.error("{}", err);
throw std::runtime_error{std::move(err)};
}

logger.log(log_level::info, rate_limit,
"Waiting for {} live nodes to show up in gossip, currently {} present...",
n, get_live_members().size());
co_await sleep_abortable(std::chrono::milliseconds(10), _abort_source);
}
logger.info("Live nodes seen in gossip: {}", get_live_members());
}

const versioned_value* gossiper::get_application_state_ptr(inet_address endpoint, application_state appstate) const noexcept {
auto* eps = get_endpoint_state_for_endpoint_ptr(std::move(endpoint));
if (!eps) {
3 changes: 3 additions & 0 deletions gms/gossiper.hh
@@ -428,6 +428,9 @@ public:
// Wait for nodes to be alive on all shards
future<> wait_alive(std::vector<gms::inet_address> nodes, std::chrono::milliseconds timeout);

// Wait for `n` live nodes to show up in gossip (including ourself).
future<> wait_for_live_nodes_to_show_up(size_t n);

// Get live members synchronized to all shards
future<std::set<inet_address>> get_live_members_synchronized();

7 changes: 6 additions & 1 deletion message/messaging_service.cc
@@ -933,7 +933,12 @@ void messaging_service::remove_rpc_client(msg_addr id) {

void messaging_service::remove_rpc_client_with_ignored_topology(msg_addr id) {
for (auto& c : _clients) {
find_and_remove_client(c, id, [] (const auto& s) { return s.topology_ignored; });
find_and_remove_client(c, id, [id] (const auto& s) {
if (s.topology_ignored) {
mlogger.info("Dropping connection to {} because it was created without topology information", id.addr);
}
return s.topology_ignored;
});
}
}

72 changes: 39 additions & 33 deletions service/storage_service.cc
@@ -252,29 +252,11 @@ static future<> set_gossip_tokens(gms::gossiper& g,
*
* This function must only be called if we're not the first node
* (i.e. booting into existing cluster).
*
* Precondition: gossiper observed at least one other live node;
* see `gossiper::wait_for_live_nodes_to_show_up()`.
*/
future<> storage_service::wait_for_ring_to_settle() {
// Make sure we see at least one other node.
logger::rate_limit rate_limit{std::chrono::seconds{5}};
#ifdef SEASTAR_DEBUG
// Account for debug slowness. 3 minutes is probably overkill but we don't want flaky tests.
constexpr auto timeout_delay = std::chrono::minutes{3};
#else
constexpr auto timeout_delay = std::chrono::seconds{30};
#endif
auto timeout = gms::gossiper::clk::now() + timeout_delay;
while (_gossiper.get_live_members().size() < 2) {
if (timeout <= gms::gossiper::clk::now()) {
auto err = ::format("Timed out waiting for other live nodes to show up in gossip during initial boot");
slogger.error("{}", err);
throw std::runtime_error{std::move(err)};
}

slogger.log(log_level::info, rate_limit, "No other live nodes seen yet in gossip during initial boot...");
co_await sleep_abortable(std::chrono::milliseconds(10), _abort_source);
}
slogger.info("Live nodes seen in gossip during initial boot: {}", _gossiper.get_live_members());

auto t = gms::gossiper::clk::now();
while (true) {
slogger.info("waiting for schema information to complete");
@@ -1961,6 +1943,42 @@ future<> storage_service::join_token_ring(cdc::generation_service& cdc_gen_servi
auto advertise = gms::advertise_myself(!replacing_a_node_with_same_ip);
co_await _gossiper.start_gossiping(generation_number, app_states, advertise);

if (!_raft_topology_change_enabled && should_bootstrap()) {
// Wait for NORMAL state handlers to finish for existing nodes now, so that connection dropping
// (happening at the end of `handle_state_normal`: `notify_joined`) doesn't interrupt
// group 0 joining or repair. (See #12764, #12956, #12972, #13302)
//
// But before we can do that, we must make sure that gossip sees at least one other node
// and fetches the list of peers from it; otherwise `wait_for_normal_state_handled_on_boot`
// may trivially finish without waiting for anyone.
co_await _gossiper.wait_for_live_nodes_to_show_up(2);

auto ignore_nodes = ri
? parse_node_list(_db.local().get_config().ignore_dead_nodes_for_replace(), get_token_metadata())
// TODO: specify ignore_nodes for bootstrap
: std::unordered_set<gms::inet_address>{};
auto sync_nodes = co_await get_nodes_to_sync_with(ignore_nodes);
if (ri) {
sync_nodes.erase(ri->address);
}

// Note: in Raft topology mode this is unnecessary.
// Node state changes are propagated to the cluster through explicit global barriers.
co_await wait_for_normal_state_handled_on_boot(sync_nodes);

// NORMAL doesn't necessarily mean UP (#14042). Wait for these nodes to be UP as well
// to reduce flakiness (we need them to be UP to perform CDC generation write and for repair/streaming).
//
// This could be done in Raft topology mode as well, but the calculation of nodes to sync with
// has to be done based on topology state machine instead of gossiper as it is here;
// furthermore, the place in the code where we do this has to be different (it has to be coordinated
// by the topology coordinator after it joins the node to the cluster).
std::vector<gms::inet_address> sync_nodes_vec{sync_nodes.begin(), sync_nodes.end()};
slogger.info("Waiting for nodes {} to be alive", sync_nodes_vec);
co_await _gossiper.wait_alive(sync_nodes_vec, std::chrono::seconds{30});
slogger.info("Nodes {} are alive", sync_nodes_vec);
}

assert(_group0);
// if the node is bootstrapped the function will do nothing since we already created group0 in main.cc
co_await _group0->setup_group0(_sys_ks.local(), initial_contact_nodes, raft_replace_info, *this, qp, _migration_manager.local(), cdc_gen_service);
@@ -2273,18 +2291,6 @@ future<> storage_service::bootstrap(cdc::generation_service& cdc_gen_service, st
}
}

{
// Wait for normal state handler to finish for existing nodes in the cluster.
auto ignore_nodes = replacement_info ? parse_node_list(_db.local().get_config().ignore_dead_nodes_for_replace(), get_token_metadata())
// TODO: specify ignore_nodes for bootstrap
: std::unordered_set<gms::inet_address>{};
auto sync_nodes = get_nodes_to_sync_with(ignore_nodes).get();
if (replacement_info) {
sync_nodes.erase(replacement_info->address);
}
wait_for_normal_state_handled_on_boot(sync_nodes).get();
}

if (!replacement_info) {
// Even if we reached this point before but crashed, we will make a new CDC generation.
// It doesn't hurt: other nodes will (potentially) just do more generation switches.
