Merge 'message: messaging_service: fix topology_ignored for pending endpoints in get_rpc_client' from Kamil Braun

`get_rpc_client` calculates a `topology_ignored` field when creating a
client; it records whether topology information for the client's endpoint
was available at the time the client was created. This field is later used
to decide whether the client needs to be dropped and replaced with a new
client that uses the correct topology information.
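
For readers new to this code path, the lifecycle described above can be summarized with a minimal, self-contained sketch. This is not Scylla's implementation: the types and names (`toy_topology`, `client_cache`, `on_endpoint_joined`, plain `std::string` endpoints) are hypothetical stand-ins for `locator::topology`, `messaging_service`, `notify_joined` / `remove_rpc_client_with_ignored_topology`, and the real address types.

#include <map>
#include <memory>
#include <set>
#include <string>

// Toy model of the topology known to a node: fully joined members plus
// endpoints that are still pending (bootstrapping or being decommissioned).
struct toy_topology {
    std::set<std::string> members;
    std::set<std::string> pending;

    bool has_endpoint(const std::string& ep, bool with_pending) const {
        return members.count(ep) > 0 || (with_pending && pending.count(ep) > 0);
    }
};

struct rpc_client {
    std::string endpoint;
    // Recorded once, at creation time: was topology info for the endpoint missing?
    bool topology_ignored;
};

struct client_cache {
    toy_topology topo;
    std::map<std::string, std::shared_ptr<rpc_client>> clients;

    std::shared_ptr<rpc_client> get_rpc_client(const std::string& ep) {
        if (auto it = clients.find(ep); it != clients.end()) {
            return it->second;
        }
        // Passing `false` here reproduces the bug described below: a pending
        // endpoint would be treated as "no topology information".
        bool known = topo.has_endpoint(ep, /* with_pending = */ true);
        auto client = std::make_shared<rpc_client>(rpc_client{ep, !known});
        clients.emplace(ep, client);
        return client;
    }

    // Analogue of dropping clients with ignored topology once the node has
    // fully joined: such a client is re-created with correct topology info.
    void on_endpoint_joined(const std::string& ep) {
        if (auto it = clients.find(ep); it != clients.end() && it->second->topology_ignored) {
            clients.erase(it);
        }
    }
};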

The `topology_ignored` field was incorrectly calculated as `true` for
pending endpoints even though we had topology information for them. This
would lead to unnecessary drops of RPC clients later. Fix this.

Remove the default parameter for `with_pending` from
`topology::has_endpoint` to avoid similar bugs in the future.
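
The effect of making `with_pending` explicit can be shown with a toy example (hypothetical `toy_topology` type, not the real `locator::topology`): with a defaulted parameter, a call site that simply omits the argument silently gets the members-only semantics, which is exactly how `get_rpc_client` went wrong.

#include <cassert>
#include <set>
#include <string>

enum class pending { no, yes };

struct toy_topology {
    std::set<std::string> members;
    std::set<std::string> pending_nodes;

    // No default argument: every caller must state which semantics it wants.
    bool has_endpoint(const std::string& ep, pending with_pending) const {
        return members.count(ep) > 0
            || (with_pending == pending::yes && pending_nodes.count(ep) > 0);
    }
};

int main() {
    toy_topology topo;
    topo.pending_nodes.insert("10.0.0.3");

    // Membership checks (like is_member) want pending::no ...
    assert(!topo.has_endpoint("10.0.0.3", pending::no));
    // ... while "do we have any topology info about this node?" wants pending::yes.
    assert(topo.has_endpoint("10.0.0.3", pending::yes));
    return 0;
}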

Apparently this fixes #11780. The verbs used by the decommission operation
use RPC client index 1 (see `do_get_rpc_client_idx` in
message/messaging_service.cc). From local testing with additional
logging I found that by the time this client is created (i.e. when the first
verb in this group is used), we already know the topology. The node is
pending at that point; hence the bug would cause us to assume we don't
know the topology, leading us to drop the RPC client later, possibly
in the middle of a decommission operation.

Fixes: #11780

Closes #11942

* github.com:scylladb/scylladb:
  test: reenable test_topology::test_decommission_node_add_column
  test/pylib: util: configurable period in wait_for
  message: messaging_service: fix topology_ignored for pending endpoints in get_rpc_client
xemul committed Nov 14, 2022
2 parents da6c472 + d298ce4 commit 2759649
Showing 7 changed files with 50 additions and 10 deletions.
locator/token_metadata.cc: 2 changes (1 addition & 1 deletion)
@@ -562,7 +562,7 @@ const std::unordered_map<inet_address, host_id>& token_metadata_impl::get_endpoi
 }
 
 bool token_metadata_impl::is_member(inet_address endpoint) const {
-    return _topology.has_endpoint(endpoint);
+    return _topology.has_endpoint(endpoint, topology::pending::no);
 }
 
 void token_metadata_impl::add_bootstrap_token(token t, inet_address endpoint) {
locator/token_metadata.hh: 5 changes (3 additions & 2 deletions)
@@ -67,9 +67,10 @@ public:
     void remove_endpoint(inet_address ep);
 
     /**
-     * Returns true iff contains given endpoint
+     * Returns true iff contains given endpoint.
+     * Excludes pending endpoints if `with_pending == pending::no`.
      */
-    bool has_endpoint(inet_address, pending with_pending = pending::no) const;
+    bool has_endpoint(inet_address, pending with_pending) const;
 
     const std::unordered_map<sstring,
             std::unordered_set<inet_address>>&
message/messaging_service.cc: 4 changes (3 additions & 1 deletion)
@@ -710,7 +710,9 @@ shared_ptr<messaging_service::rpc_protocol_client_wrapper> messaging_service::ge
     std::optional<bool> topology_status;
     auto has_topology = [&] {
         if (!topology_status.has_value()) {
-            topology_status = _token_metadata ? _token_metadata->get()->get_topology().has_endpoint(id.addr) : false;
+            topology_status = _token_metadata
+                ? _token_metadata->get()->get_topology().has_endpoint(id.addr, locator::topology::pending::yes)
+                : false;
         }
         return *topology_status;
     };
service/storage_service.cc: 6 changes (6 additions & 0 deletions)
@@ -64,6 +64,7 @@
 #include <seastar/coroutine/maybe_yield.hh>
 #include <seastar/coroutine/parallel_for_each.hh>
 #include "utils/stall_free.hh"
+#include "utils/error_injection.hh"
 
 #include <boost/algorithm/string/split.hpp>
 #include <boost/algorithm/string/classification.hpp>
@@ -2584,6 +2585,8 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
     } else if (req.cmd == node_ops_cmd::removenode_abort) {
         node_ops_abort(ops_uuid).get();
     } else if (req.cmd == node_ops_cmd::decommission_prepare) {
+        utils::get_local_injector().inject(
+            "storage_service_decommission_prepare_handler_sleep", std::chrono::milliseconds{1500}).get();
         if (req.leaving_nodes.size() > 1) {
             auto msg = format("decommission[{}]: Could not decommission more than one node at a time: leaving_nodes={}", req.ops_uuid, req.leaving_nodes);
             slogger.warn("{}", msg);
@@ -3613,6 +3616,9 @@ future<> storage_service::notify_joined(inet_address endpoint) {
         co_return;
     }
 
+    co_await utils::get_local_injector().inject(
+        "storage_service_notify_joined_sleep", std::chrono::milliseconds{500});
+
     co_await container().invoke_on_all([endpoint] (auto&& ss) {
         ss._messaging.local().remove_rpc_client_with_ignored_topology(netw::msg_addr{endpoint, 0});
         return ss._lifecycle_notifier.notify_joined(endpoint);
test/pylib/rest_client.py: 6 changes (6 additions & 0 deletions)
@@ -177,6 +177,12 @@ async def get_gossip_generation_number(self, node_ip: str, target_ip: str) -> in
         assert(type(data) == int)
         return data
 
+    async def get_joining_nodes(self, node_ip: str) -> list:
+        """Get the list of joining nodes according to `node_ip`."""
+        data = await self.client.get_json(f"/storage_service/nodes/joining", host=node_ip)
+        assert(type(data) == list)
+        return data
+
     async def enable_injection(self, node_ip: str, injection: str, one_shot: bool) -> None:
         """Enable error injection named `injection` on `node_ip`. Depending on `one_shot`,
         the injection will be executed only once or every time the process passes the injection point.
test/pylib/util.py: 6 changes (4 additions & 2 deletions)
@@ -20,13 +20,15 @@ def unique_name():
     return unique_name_prefix + str(current_ms)
 
 
-async def wait_for(pred: Callable[[], Awaitable[Optional[T]]], deadline: float) -> T:
+async def wait_for(
+        pred: Callable[[], Awaitable[Optional[T]]],
+        deadline: float, period: float = 1) -> T:
     while True:
         assert(time.time() < deadline), "Deadline exceeded, failing test."
         res = await pred()
         if res is not None:
             return res
-        await asyncio.sleep(1)
+        await asyncio.sleep(period)
 
 
 unique_name.last_ms = 0
test/topology/test_topology.py: 31 changes (27 additions & 4 deletions)
@@ -10,6 +10,9 @@
 import logging
 import asyncio
 import random
+import time
+
+from test.pylib.util import wait_for
 
 logger = logging.getLogger(__name__)
 
@@ -57,13 +60,33 @@ async def test_remove_node_add_column(manager, random_tables):
 
 
 @pytest.mark.asyncio
-@pytest.mark.skip(reason="Flaky due to #11780")
 async def test_decommission_node_add_column(manager, random_tables):
     """Add a node, remove an original node, add a column"""
-    servers = await manager.running_servers()
     table = await random_tables.add_table(ncolumns=5)
-    await manager.server_add()
-    await manager.decommission_node(servers[1].server_id)  # Decommission [1]
+    servers = await manager.running_servers()
+    decommission_target = servers[1]
+    # The sleep injections significantly increase the probability of reproducing #11780:
+    # 1. bootstrapped_server finishes bootstrapping and enters NORMAL state
+    # 2. decommission_target starts storage_service::handle_state_normal(bootstrapped_server),
+    #    enters sleep before calling storage_service::notify_joined
+    # 3. we start decommission on decommission_target
+    # 4. decommission_target sends node_ops_verb with decommission_prepare request to bootstrapped_server
+    # 5. bootstrapped_server receives the RPC and enters sleep
+    # 6. decommission_target handle_state_normal wakes up,
+    #    calls storage_service::notify_joined which drops some RPC clients
+    # 7. If #11780 is not fixed, this will fail the node_ops_verb RPC, causing decommission to fail
+    await manager.api.enable_injection(
+        decommission_target.ip_addr, 'storage_service_notify_joined_sleep', one_shot=True)
+    bootstrapped_server = await manager.server_add()
+    async def no_joining_nodes():
+        joining_nodes = await manager.api.get_joining_nodes(decommission_target.ip_addr)
+        return not joining_nodes
+    # Wait until decommission_target thinks that bootstrapped_server is NORMAL
+    # note: when this wait finishes, we're usually in the middle of storage_service::handle_state_normal
+    await wait_for(no_joining_nodes, time.time() + 30, period=.1)
+    await manager.api.enable_injection(
+        bootstrapped_server.ip_addr, 'storage_service_decommission_prepare_handler_sleep', one_shot=True)
+    await manager.decommission_node(decommission_target.server_id)
     await table.add_column()
     await random_tables.verify_schema()
 
