From d298ce4457c01b7c576c4252f3ff672b47ac5111 Mon Sep 17 00:00:00 2001
From: Kamil Braun
Date: Thu, 10 Nov 2022 10:22:26 +0100
Subject: [PATCH] test: reenable test_topology::test_decommission_node_add_column

Also improve the test to increase the probability of reproducing #11780
by injecting sleeps in appropriate places.

Without the fix for #11780 from the earlier commit, the test reproduces
the issue in roughly half of all runs in dev build on my laptop.
---
 service/storage_service.cc     |  6 ++++++
 test/pylib/rest_client.py      |  6 ++++++
 test/topology/test_topology.py | 31 +++++++++++++++++++++++++++----
 3 files changed, 39 insertions(+), 4 deletions(-)

diff --git a/service/storage_service.cc b/service/storage_service.cc
index 03611a7ed048..7d7ae4a22b55 100644
--- a/service/storage_service.cc
+++ b/service/storage_service.cc
@@ -64,6 +64,7 @@
 #include <...>
 #include <...>
 #include "utils/stall_free.hh"
+#include "utils/error_injection.hh"
 #include <...>
 #include <...>
@@ -2566,6 +2567,8 @@ future storage_service::node_ops_cmd_handler(gms::inet_ad
         } else if (req.cmd == node_ops_cmd::removenode_abort) {
             node_ops_abort(ops_uuid).get();
         } else if (req.cmd == node_ops_cmd::decommission_prepare) {
+            utils::get_local_injector().inject(
+                "storage_service_decommission_prepare_handler_sleep", std::chrono::milliseconds{1500}).get();
             if (req.leaving_nodes.size() > 1) {
                 auto msg = format("decommission[{}]: Could not decommission more than one node at a time: leaving_nodes={}", req.ops_uuid, req.leaving_nodes);
                 slogger.warn("{}", msg);
@@ -3595,6 +3598,9 @@ future<> storage_service::notify_joined(inet_address endpoint) {
         co_return;
     }
 
+    co_await utils::get_local_injector().inject(
+        "storage_service_notify_joined_sleep", std::chrono::milliseconds{500});
+
     co_await container().invoke_on_all([endpoint] (auto&& ss) {
         ss._messaging.local().remove_rpc_client_with_ignored_topology(netw::msg_addr{endpoint, 0});
         return ss._lifecycle_notifier.notify_joined(endpoint);
diff --git a/test/pylib/rest_client.py b/test/pylib/rest_client.py
index 1211eee481f6..268e51fe5144 100644
--- a/test/pylib/rest_client.py
+++ b/test/pylib/rest_client.py
@@ -177,6 +177,12 @@ async def get_gossip_generation_number(self, node_ip: str, target_ip: str) -> in
         assert(type(data) == int)
         return data
 
+    async def get_joining_nodes(self, node_ip: str) -> list:
+        """Get the list of joining nodes according to `node_ip`."""
+        data = await self.client.get_json(f"/storage_service/nodes/joining", host=node_ip)
+        assert(type(data) == list)
+        return data
+
     async def enable_injection(self, node_ip: str, injection: str, one_shot: bool) -> None:
         """Enable error injection named `injection` on `node_ip`.
         Depending on `one_shot`, the injection will be executed only once or every time the process passes the injection point.
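The new `get_joining_nodes` helper above wraps the node's REST API via the test framework's HTTP client. For illustration, here is a minimal standalone sketch that queries the same endpoint directly; it assumes a node whose REST API listens on the default port 10000 and uses the `aiohttp` library, and the `joining_nodes` function name is made up for this example, not part of the patch:

import asyncio
import aiohttp

async def joining_nodes(host: str) -> list:
    # GET /storage_service/nodes/joining returns a JSON list with the IPs of
    # the nodes that `host` currently sees as joining (bootstrapping) the ring.
    url = f"http://{host}:10000/storage_service/nodes/joining"
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            resp.raise_for_status()
            return await resp.json()

print(asyncio.run(joining_nodes("127.0.0.1")))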
diff --git a/test/topology/test_topology.py b/test/topology/test_topology.py
index b1f80745579e..d198e3a18f1d 100644
--- a/test/topology/test_topology.py
+++ b/test/topology/test_topology.py
@@ -10,6 +10,9 @@
 import logging
 import asyncio
 import random
+import time
+
+from test.pylib.util import wait_for
 
 logger = logging.getLogger(__name__)
 
@@ -57,13 +60,33 @@ async def test_remove_node_add_column(manager, random_tables):
 @pytest.mark.asyncio
-@pytest.mark.skip(reason="Flaky due to #11780")
 async def test_decommission_node_add_column(manager, random_tables):
     """Add a node, remove an original node, add a column"""
-    servers = await manager.running_servers()
     table = await random_tables.add_table(ncolumns=5)
-    await manager.server_add()
-    await manager.decommission_node(servers[1].server_id) # Decommission [1]
+    servers = await manager.running_servers()
+    decommission_target = servers[1]
+    # The sleep injections significantly increase the probability of reproducing #11780:
+    # 1. bootstrapped_server finishes bootstrapping and enters NORMAL state
+    # 2. decommission_target starts storage_service::handle_state_normal(bootstrapped_server),
+    #    enters sleep before calling storage_service::notify_joined
+    # 3. we start decommission on decommission_target
+    # 4. decommission_target sends node_ops_verb with decommission_prepare request to bootstrapped_server
+    # 5. bootstrapped_server receives the RPC and enters sleep
+    # 6. decommission_target handle_state_normal wakes up,
+    #    calls storage_service::notify_joined which drops some RPC clients
+    # 7. If #11780 is not fixed, this will fail the node_ops_verb RPC, causing decommission to fail
+    await manager.api.enable_injection(
+        decommission_target.ip_addr, 'storage_service_notify_joined_sleep', one_shot=True)
+    bootstrapped_server = await manager.server_add()
+    async def no_joining_nodes():
+        joining_nodes = await manager.api.get_joining_nodes(decommission_target.ip_addr)
+        return not joining_nodes
+    # Wait until decommission_target thinks that bootstrapped_server is NORMAL
+    # note: when this wait finishes, we're usually in the middle of storage_service::handle_state_normal
+    await wait_for(no_joining_nodes, time.time() + 30, period=.1)
+    await manager.api.enable_injection(
+        bootstrapped_server.ip_addr, 'storage_service_decommission_prepare_handler_sleep', one_shot=True)
+    await manager.decommission_node(decommission_target.server_id)
     await table.add_column()
     await random_tables.verify_schema()
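The test's polling loop relies on `wait_for` from `test.pylib.util`, whose implementation is not part of this patch. A plausible sketch of such a helper, assuming it polls an async predicate until it returns a truthy result and treats `deadline` as an absolute `time.time()` timestamp (matching the `time.time() + 30` call site above):

import asyncio
import time
from typing import Awaitable, Callable, Optional, TypeVar

T = TypeVar('T')

async def wait_for(pred: Callable[[], Awaitable[Optional[T]]],
                   deadline: float, period: float = 1.0) -> T:
    # Poll `pred` every `period` seconds; fail the test once the absolute
    # deadline passes without the predicate ever returning a truthy value.
    while True:
        assert time.time() < deadline, "wait_for: deadline exceeded"
        res = await pred()
        if res:
            return res
        await asyncio.sleep(period)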