Merge 'Fix bootstrap "wait for UP/NORMAL nodes" to handle ignored nodes, recently replaced nodes, and recently changed IPs' from Kamil Braun

Before this PR, `wait_for_normal_state_handled_on_boot` would
wait for a static set of nodes (`sync_nodes`), calculated using the
`get_nodes_to_sync_with` function and `parse_node_list`; the latter was
used to obtain the list of "nodes to ignore" (for the replace operation)
and translate them, using `token_metadata`, from IP addresses to Host IDs
and vice versa. `sync_nodes` was also used in the `_gossiper.wait_alive`
call which we make after `wait_for_normal_state_handled_on_boot`.

Recently we started doing these calculations and this wait very early in
the boot procedure - immediately after we start gossiping
(50e8ec7).

Unfortunately, as always with gossiper, there are complications.
In #14468 and #14487 two problems were detected:
- Gossiper may contain obsolete entries for nodes which were recently
  replaced or changed their IPs. These entries are still using status
  `NORMAL` or `shutdown` (which is treated like `NORMAL`, e.g.
  `handle_state_normal` is also called for it). The
  `_gossiper.wait_alive` call would wait for those entries too and
  eventually time out.
- Furthermore, by the time we call `parse_node_list`, `token_metadata`
  may not be populated yet, which is required to do the IP<->Host ID
  translations -- and populating `token_metadata` happens inside
  `handle_state_normal`, so we have a chicken-and-egg problem here
  (see the sketch below).
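
To make the second problem concrete, the translation step looks roughly
like the following Python sketch. This is illustrative only -- the real
code is C++, and the `token_metadata` accessor used here is a
hypothetical stand-in -- but it shows why the mapping populated by
`handle_state_normal` must exist before the ignore list can be parsed:

    import uuid

    def looks_like_host_id(entry: str) -> bool:
        # Host IDs are UUIDs; IP addresses are not.
        try:
            uuid.UUID(entry)
            return True
        except ValueError:
            return False

    def parse_ignore_nodes(raw: str, token_metadata) -> set[str]:
        # Each entry may be an IP address or a Host ID; resolving a Host ID
        # to an IP needs the mapping that only handle_state_normal populates
        # -- hence the ordering problem.
        ignored = set()
        for entry in (e.strip() for e in raw.split(',') if e.strip()):
            if looks_like_host_id(entry):
                # hypothetical accessor, for illustration only
                ignored.add(token_metadata.get_ip_for_host_id(entry))
            else:
                ignored.add(entry)
        return ignored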

It turns out that we don't need to calculate `sync_nodes` (and
hence `ignore_nodes`) in order to wait for NORMAL state handlers. We
can wait for handlers to finish for *any* `NORMAL`/`shutdown` entries
appearing in gossiper, even those that correspond to dead/ignored
nodes and obsolete IPs.  `handle_state_normal` is called, and
eventually finishes, for all of them.
`wait_for_normal_state_handled_on_boot` no longer receives a set of
nodes as a parameter; instead, it recalculates the necessary set of
nodes on each retry (the set may shrink while we're waiting, e.g.
because an entry corresponding to a node that was replaced is
garbage-collected from gossiper state).
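
A minimal Python sketch of this retry loop (illustrative only; the
actual implementation is the C++ coroutine in the diff below, and the
gossiper accessors used here are hypothetical stand-ins):

    import asyncio
    import time

    async def wait_for_normal_state_handled(gossiper, me, timeout=60.0):
        start = time.monotonic()
        while True:
            # Recompute the pending set from the *current* gossiper state on
            # every iteration: entries may disappear while we wait (e.g. a
            # replaced node's entry is garbage-collected), so a set computed
            # once up front could make us wait forever.
            pending = [ep for ep in gossiper.get_endpoints()
                       if ep != me
                       and gossiper.is_normal_ring_member(ep)
                       and not gossiper.is_normal_state_handled(ep)]
            if not pending:
                return
            if time.monotonic() - start > timeout:
                raise TimeoutError(
                    f"normal state handlers not finished for {pending}")
            await asyncio.sleep(0.1)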

Thanks to this, we can now put the `sync_nodes` calculation (which is
still necessary for `_gossiper.wait_alive`), and hence the
`parse_node_list` call, *after* we wait for NORMAL state handlers,
solving the chicken-and-egg problem.

This addresses the immediate failure described in #14487, but the test
would still fail. That's because `_gossiper.wait_alive` may still receive
too large a set of nodes -- we may still include obsolete IPs or entries
corresponding to replaced nodes in the `sync_nodes` set.

We need a better way to calculate `sync_nodes`, one that ignores
obsolete IPs and nodes that are already gone but haven't yet been
garbage-collected from gossiper state.

In fact, such a method was already introduced in the past:
ca61d88
but it wasn't used everywhere. There, we use `token_metadata`, in which
collisions between Host IDs and tokens are resolved, so it contains only
entries that correspond to the "real" current set of NORMAL nodes.

We use this method to calculate the set of nodes passed to
`_gossiper.wait_alive`.
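
In Python terms, the selection boils down to something like this
(illustrative only; the real change is the C++ `for_each_node` loop in
the diff below, and the names here are made up):

    def nodes_to_wait_for(topology_endpoints, ignore_nodes, replaced_addr=None):
        # token_metadata's topology has Host ID / token collisions resolved,
        # so obsolete IPs and already-replaced nodes never appear in
        # topology_endpoints in the first place; we only need to drop the
        # explicitly ignored nodes and, during a replace, the replaced node.
        return [ep for ep in topology_endpoints
                if ep not in ignore_nodes and ep != replaced_addr]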

We also introduce regression tests, along with the necessary
extensions to the test framework.

Fixes #14468
Fixes #14487

Closes #14507

* github.com:scylladb/scylladb:
  test: rename `test_topology_ip.py` to `test_replace.py`
  test: test bootstrap after IP change
  test: scylla_cluster: return the new IP from `change_ip` API
  test: node replace with `ignore_dead_nodes` test
  test: scylla_cluster: accept `ignore_dead_nodes` in `ReplaceConfig`
  storage_service: remove `get_nodes_to_sync_with`
  storage_service: use `token_metadata` to calculate nodes waited for to be UP
  storage_service: don't calculate `ignore_nodes` before waiting for normal handlers
tgrabiec committed Jul 9, 2023
2 parents 1eb76d9 + 431a8f8 commit 65a5942
Showing 8 changed files with 187 additions and 55 deletions.
103 changes: 62 additions & 41 deletions service/storage_service.cc
@@ -82,6 +82,7 @@
#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string/classification.hpp>
#include <boost/algorithm/string/trim_all.hpp>
#include <boost/algorithm/string/join.hpp>

using token = dht::token;
using UUID = utils::UUID;
@@ -1982,18 +1983,9 @@ future<> storage_service::join_token_ring(cdc::generation_service& cdc_gen_servi
// may trivially finish without waiting for anyone.
co_await _gossiper.wait_for_live_nodes_to_show_up(2);

auto ignore_nodes = ri
? parse_node_list(_db.local().get_config().ignore_dead_nodes_for_replace(), get_token_metadata())
// TODO: specify ignore_nodes for bootstrap
: std::unordered_set<gms::inet_address>{};
auto sync_nodes = co_await get_nodes_to_sync_with(ignore_nodes);
if (ri) {
sync_nodes.erase(ri->address);
}

// Note: in Raft topology mode this is unnecessary.
// Node state changes are propagated to the cluster through explicit global barriers.
co_await wait_for_normal_state_handled_on_boot(sync_nodes);
co_await wait_for_normal_state_handled_on_boot();

// NORMAL doesn't necessarily mean UP (#14042). Wait for these nodes to be UP as well
// to reduce flakiness (we need them to be UP to perform CDC generation write and for repair/streaming).
@@ -2002,10 +1994,30 @@ future<> storage_service::join_token_ring(cdc::generation_service& cdc_gen_servi
// has to be done based on topology state machine instead of gossiper as it is here;
// furthermore, the place in the code where we do this has to be different (it has to be coordinated
// by the topology coordinator after it joins the node to the cluster).
std::vector<gms::inet_address> sync_nodes_vec{sync_nodes.begin(), sync_nodes.end()};
slogger.info("Waiting for nodes {} to be alive", sync_nodes_vec);
co_await _gossiper.wait_alive(sync_nodes_vec, std::chrono::seconds{30});
slogger.info("Nodes {} are alive", sync_nodes_vec);
//
// We calculate nodes to wait for based on token_metadata. Previously we would use gossiper
// directly for this, but gossiper may still contain obsolete entries from 1. replaced nodes
// and 2. nodes that have changed their IPs; these entries are eventually garbage-collected,
// but here they may still be present if we're performing topology changes in quick succession.
// `token_metadata` has all host ID / token collisions resolved so in particular it doesn't contain
// these obsolete IPs. Refs: #14487, #14468
auto& tm = get_token_metadata();
auto ignore_nodes = ri
? parse_node_list(_db.local().get_config().ignore_dead_nodes_for_replace(), tm)
// TODO: specify ignore_nodes for bootstrap
: std::unordered_set<gms::inet_address>{};

std::vector<gms::inet_address> sync_nodes;
tm.get_topology().for_each_node([&] (const locator::node* np) {
auto ep = np->endpoint();
if (!ignore_nodes.contains(ep) && (!ri || ep != ri->address)) {
sync_nodes.push_back(ep);
}
});

slogger.info("Waiting for nodes {} to be alive", sync_nodes);
co_await _gossiper.wait_alive(sync_nodes, std::chrono::seconds{30});
slogger.info("Nodes {} are alive", sync_nodes);
}

assert(_group0);
@@ -2268,21 +2280,6 @@ std::unordered_set<gms::inet_address> storage_service::parse_node_list(sstring c
return ignore_nodes;
}

future<std::unordered_set<gms::inet_address>> storage_service::get_nodes_to_sync_with(
const std::unordered_set<gms::inet_address>& ignore_nodes) {
std::unordered_set<gms::inet_address> result;
for (const auto& node :_gossiper.get_endpoints()) {
co_await coroutine::maybe_yield();
slogger.info("Check node={}, status={}", node, _gossiper.get_gossip_status(node));
if (node != get_broadcast_address() &&
_gossiper.is_normal_ring_member(node) &&
!ignore_nodes.contains(node)) {
result.insert(node);
}
}
co_return result;
}

// Runs inside seastar::async context
future<> storage_service::bootstrap(cdc::generation_service& cdc_gen_service, std::unordered_set<token>& bootstrap_tokens, std::optional<cdc::generation_id>& cdc_gen_id, const std::optional<replacement_info>& replacement_info) {
return seastar::async([this, &bootstrap_tokens, &cdc_gen_id, &cdc_gen_service, &replacement_info] {
@@ -5726,21 +5723,45 @@ bool storage_service::is_normal_state_handled_on_boot(gms::inet_address node) {
return _normal_state_handled_on_boot.contains(node);
}

// Wait for normal state handler to finish on boot
future<> storage_service::wait_for_normal_state_handled_on_boot(const std::unordered_set<gms::inet_address>& nodes) {
slogger.info("Started waiting for normal state handler for nodes {}", nodes);
// Wait for normal state handlers to finish on boot
future<> storage_service::wait_for_normal_state_handled_on_boot() {
static logger::rate_limit rate_limit{std::chrono::seconds{5}};
static auto fmt_nodes_with_statuses = [this] (const auto& eps) {
return boost::algorithm::join(
eps | boost::adaptors::transformed([this] (const auto& ep) {
return ::format("({}, status={})", ep, _gossiper.get_gossip_status(ep));
}), ", ");
};

slogger.info("Started waiting for normal state handlers to finish");
auto start_time = std::chrono::steady_clock::now();
for (auto& node: nodes) {
while (!is_normal_state_handled_on_boot(node)) {
slogger.debug("Waiting for normal state handler for node {}", node);
co_await sleep_abortable(std::chrono::milliseconds(100), _abort_source);
if (std::chrono::steady_clock::now() > start_time + std::chrono::seconds(60)) {
throw std::runtime_error(::format("Node {} did not finish normal state handler, reject the node ops", node));
}
std::vector<gms::inet_address> eps;
while (true) {
eps = _gossiper.get_endpoints();
auto it = std::partition(eps.begin(), eps.end(),
[this, me = get_broadcast_address()] (const gms::inet_address& ep) {
return ep == me || !_gossiper.is_normal_ring_member(ep) || is_normal_state_handled_on_boot(ep);
});

if (it == eps.end()) {
break;
}

if (std::chrono::steady_clock::now() > start_time + std::chrono::seconds(60)) {
auto err = ::format("Timed out waiting for normal state handlers to finish for nodes {}",
fmt_nodes_with_statuses(boost::make_iterator_range(it, eps.end())));
slogger.error("{}", err);
throw std::runtime_error{std::move(err)};
}

slogger.log(log_level::info, rate_limit, "Normal state handlers not yet finished for nodes {}",
fmt_nodes_with_statuses(boost::make_iterator_range(it, eps.end())));

co_await sleep_abortable(std::chrono::milliseconds{100}, _abort_source);
}
slogger.info("Finished waiting for normal state handler for nodes {}", nodes);
co_return;

slogger.info("Finished waiting for normal state handlers; endpoints observed in gossip: {}",
fmt_nodes_with_statuses(eps));
}

future<bool> storage_service::is_cleanup_allowed(sstring keyspace) {
4 changes: 1 addition & 3 deletions service/storage_service.hh
@@ -293,8 +293,6 @@ private:
void run_replace_ops(std::unordered_set<token>& bootstrap_tokens, replacement_info replace_info);
void run_bootstrap_ops(std::unordered_set<token>& bootstrap_tokens);

future<std::unordered_set<gms::inet_address>> get_nodes_to_sync_with(
const std::unordered_set<gms::inet_address>& ignore_dead_nodes);
future<> wait_for_ring_to_settle();

public:
@@ -749,7 +747,7 @@ public:
private:
std::unordered_set<gms::inet_address> _normal_state_handled_on_boot;
bool is_normal_state_handled_on_boot(gms::inet_address);
future<> wait_for_normal_state_handled_on_boot(const std::unordered_set<gms::inet_address>& nodes);
future<> wait_for_normal_state_handled_on_boot();

friend class group0_state_machine;
bool _raft_topology_change_enabled = false;
6 changes: 4 additions & 2 deletions test/pylib/manager_client.py
@@ -226,9 +226,11 @@ async def server_update_config(self, server_id: ServerNum, key: str, value: obje
await self.client.put_json(f"/cluster/server/{server_id}/update_config",
{"key": key, "value": value})

async def server_change_ip(self, server_id: ServerNum) -> None:
async def server_change_ip(self, server_id: ServerNum) -> IPAddress:
"""Change server IP address. Applicable only to a stopped server"""
await self.client.put_json(f"/cluster/server/{server_id}/change_ip", {})
ret = await self.client.put_json(f"/cluster/server/{server_id}/change_ip", {},
response_type="json")
return IPAddress(ret["ip_addr"])

async def wait_for_host_known(self, dst_server_id: str, expect_host_id: str,
deadline: Optional[float] = None) -> None:
19 changes: 10 additions & 9 deletions test/pylib/scylla_cluster.py
@@ -47,6 +47,7 @@ class ReplaceConfig(NamedTuple):
replaced_id: ServerNum
reuse_ip_addr: bool
use_host_id: bool
ignore_dead_nodes: list[IPAddress | HostID] = []


def make_scylla_conf(workdir: pathlib.Path, host_addr: str, seed_addrs: List[str], cluster_name: str) -> dict[str, object]:
@@ -679,6 +680,9 @@ async def add_server(self, replace_cfg: Optional[ReplaceConfig] = None, cmdline:
else:
extra_config['replace_address_first_boot'] = replaced_srv.ip_addr

if replace_cfg.ignore_dead_nodes:
extra_config['ignore_dead_nodes_for_replace'] = ','.join(replace_cfg.ignore_dead_nodes)

assert replaced_id not in self.removed, \
f"add_server: cannot replace removed server {replaced_srv}"
assert replaced_id in self.stopped, \
@@ -902,24 +906,23 @@ def setLogger(self, logger: logging.LoggerAdapter):
for srv in self.servers.values():
srv.setLogger(self.logger)

async def change_ip(self, server_id: ServerNum) -> ActionReturn:
async def change_ip(self, server_id: ServerNum) -> IPAddress:
"""Lease a new IP address and update conf/scylla.yaml with it. The
original IP is released at the end of the test to avoid an
immediate recycle within the same cluster. The server must be
stopped before its ip is changed."""
if server_id not in self.servers:
return ScyllaCluster.ActionReturn(success=False, msg=f"Server {server_id} unknown")
raise RuntimeError(f"Server {server_id} unknown")
server = self.servers[server_id]
if server.is_running:
msg = f"Server {server_id} is running: stop it first and then change its ip"
return ScyllaCluster.ActionReturn(success=False, msg=msg)
raise RuntimeError(f"Server {server_id} is running: stop it first and then change its ip")
self.is_dirty = True
ip_addr = IPAddress(await self.host_registry.lease_host())
self.leased_ips.add(ip_addr)
logging.info("Cluster %s changed server %s IP from %s to %s", self.name,
server_id, server.ip_addr, ip_addr)
server.change_ip(ip_addr)
return ScyllaCluster.ActionReturn(success=True)
return ip_addr


class ScyllaClusterManager:
@@ -1231,10 +1234,8 @@ async def _server_change_ip(self, request: aiohttp.web.Request) -> aiohttp.web.R
"""Pass change_ip command for the given server to the cluster"""
assert self.cluster
server_id = ServerNum(int(request.match_info["server_id"]))
ret = await self.cluster.change_ip(server_id)
if not ret.success:
return aiohttp.web.Response(status=404, text=ret.msg)
return aiohttp.web.Response()
ip_addr = await self.cluster.change_ip(server_id)
return aiohttp.web.json_response({"ip_addr": ip_addr})


@asynccontextmanager
File renamed without changes.
2 changes: 2 additions & 0 deletions test/topology_custom/suite.yaml
@@ -7,5 +7,7 @@ extra_scylla_config_options:
authorizer: AllowAllAuthorizer
skip_in_release:
- test_shutdown_hang
- test_replace_ignore_nodes
skip_in_debug:
- test_shutdown_hang
- test_replace_ignore_nodes
52 changes: 52 additions & 0 deletions test/topology_custom/test_boot_after_ip_change.py
@@ -0,0 +1,52 @@
#
# Copyright (C) 2023-present ScyllaDB
#
# SPDX-License-Identifier: AGPL-3.0-or-later
#
import time
import pytest
import logging

from test.pylib.internal_types import IPAddress, HostID
from test.pylib.scylla_cluster import ReplaceConfig
from test.pylib.manager_client import ManagerClient
from test.topology.util import wait_for_token_ring_and_group0_consistency


logger = logging.getLogger(__name__)


@pytest.mark.asyncio
async def test_boot_after_ip_change(manager: ManagerClient) -> None:
"""Bootstrap a new node after existing one changed its IP.
Regression test for #14468. Does not apply to Raft-topology mode.
"""
cfg = {'experimental_features': list[str]()}
logger.info(f"Booting initial cluster")
servers = [await manager.server_add(config=cfg) for _ in range(2)]
await wait_for_token_ring_and_group0_consistency(manager, time.time() + 30)

logger.info(f"Stopping server {servers[1]}")
await manager.server_stop_gracefully(servers[1].server_id)

logger.info(f"Changing IP of server {servers[1]}")
new_ip = await manager.server_change_ip(servers[1].server_id)
servers[1] = servers[1]._replace(ip_addr = new_ip)
logger.info(f"New IP: {new_ip}")

logger.info(f"Restarting server {servers[1]}")
await manager.server_start(servers[1].server_id)

# We need to do this wait before we boot a new node.
# Otherwise the newly booting node may contact servers[0] even before servers[0]
# saw the new IP of servers[1], and then the booting node will try to wait
# for servers[1] to be alive using its old IP (and eventually time out).
#
# Note that this still acts as a regression test for #14468.
# In #14468, the problem was that a booting node would try to wait for the old IP
# of servers[0] even after all existing servers saw the IP change.
logger.info(f"Wait until {servers[0]} sees the new IP of {servers[1]}")
await manager.server_sees_other_server(servers[0].ip_addr, servers[1].ip_addr)

logger.info(f"Booting new node")
await manager.server_add(config=cfg)
56 changes: 56 additions & 0 deletions test/topology_custom/test_replace_ignore_nodes.py
@@ -0,0 +1,56 @@
#
# Copyright (C) 2023-present ScyllaDB
#
# SPDX-License-Identifier: AGPL-3.0-or-later
#
import time
import pytest
import logging

from test.pylib.internal_types import IPAddress, HostID
from test.pylib.scylla_cluster import ReplaceConfig
from test.pylib.manager_client import ManagerClient
from test.topology.util import wait_for_token_ring_and_group0_consistency


logger = logging.getLogger(__name__)


@pytest.mark.asyncio
async def test_replace_ignore_nodes(manager: ManagerClient) -> None:
"""Replace a node in presence of multiple dead nodes.
Regression test for #14487. Does not apply to Raft-topology mode.
This is a slow test with a 7 node cluster any 3 replace operations,
we don't want to run it in debug mode.
Preferably run it only in one mode e.g. dev.
"""
cfg = {'experimental_features': list[str]()}
logger.info(f"Booting initial cluster")
servers = [await manager.server_add(config=cfg) for _ in range(7)]
s2_id = await manager.get_host_id(servers[2].server_id)
logger.info(f"Stopping servers {servers[:3]}")
await manager.server_stop(servers[0].server_id)
await manager.server_stop(servers[1].server_id)
await manager.server_stop_gracefully(servers[2].server_id)

    # The parameter accepts both IP addresses and host IDs.
    # We must be able to resolve both forms.
ignore_dead: list[IPAddress | HostID] = [servers[1].ip_addr, s2_id]
logger.info(f"Replacing {servers[0]}, ignore_dead_nodes = {ignore_dead}")
replace_cfg = ReplaceConfig(replaced_id = servers[0].server_id, reuse_ip_addr = False, use_host_id = False,
ignore_dead_nodes = ignore_dead)
await manager.server_add(replace_cfg=replace_cfg, config=cfg)
await wait_for_token_ring_and_group0_consistency(manager, time.time() + 30)

ignore_dead = [servers[2].ip_addr]
logger.info(f"Replacing {servers[1]}, ignore_dead_nodes = {ignore_dead}")
replace_cfg = ReplaceConfig(replaced_id = servers[1].server_id, reuse_ip_addr = False, use_host_id = False,
ignore_dead_nodes = ignore_dead)
await manager.server_add(replace_cfg=replace_cfg, config=cfg)
await wait_for_token_ring_and_group0_consistency(manager, time.time() + 30)

logger.info(f"Replacing {servers[2]}")
replace_cfg = ReplaceConfig(replaced_id = servers[2].server_id, reuse_ip_addr = False, use_host_id = False)
await manager.server_add(replace_cfg=replace_cfg, config=cfg)
await wait_for_token_ring_and_group0_consistency(manager, time.time() + 30)
