From 071251a9160b1779a4769a9083603477699288eb Mon Sep 17 00:00:00 2001
From: Kamil Braun
Date: Thu, 10 Nov 2022 10:22:26 +0100
Subject: [PATCH] test: reenable test_topology::test_decommission_node_add_column

Also improve the test to increase the probability of reproducing #11780
by injecting sleeps in appropriate places.

Without the fix for #11780 from the earlier commit, the test reproduces
the issue in roughly half of all runs in dev build on my laptop.
---
 service/storage_service.cc     |  6 ++++++
 test/pylib/rest_client.py      |  7 +++++++
 test/topology/test_topology.py | 30 ++++++++++++++++++++++++++----
 3 files changed, 39 insertions(+), 4 deletions(-)

diff --git a/service/storage_service.cc b/service/storage_service.cc
index 03611a7ed048..7d7ae4a22b55 100644
--- a/service/storage_service.cc
+++ b/service/storage_service.cc
@@ -64,6 +64,7 @@
 #include
 #include
 #include "utils/stall_free.hh"
+#include "utils/error_injection.hh"
 #include
 #include
@@ -2566,6 +2567,8 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
     } else if (req.cmd == node_ops_cmd::removenode_abort) {
         node_ops_abort(ops_uuid).get();
     } else if (req.cmd == node_ops_cmd::decommission_prepare) {
+        utils::get_local_injector().inject(
+            "storage_service_decommission_prepare_handler_sleep", std::chrono::milliseconds{1500}).get();
         if (req.leaving_nodes.size() > 1) {
             auto msg = format("decommission[{}]: Could not decommission more than one node at a time: leaving_nodes={}", req.ops_uuid, req.leaving_nodes);
             slogger.warn("{}", msg);
@@ -3595,6 +3598,9 @@ future<> storage_service::notify_joined(inet_address endpoint) {
         co_return;
     }
 
+    co_await utils::get_local_injector().inject(
+        "storage_service_notify_joined_sleep", std::chrono::milliseconds{500});
+
     co_await container().invoke_on_all([endpoint] (auto&& ss) {
         ss._messaging.local().remove_rpc_client_with_ignored_topology(netw::msg_addr{endpoint, 0});
         return ss._lifecycle_notifier.notify_joined(endpoint);
diff --git a/test/pylib/rest_client.py b/test/pylib/rest_client.py
index da0202304585..d5eb13eb3c0c 100644
--- a/test/pylib/rest_client.py
+++ b/test/pylib/rest_client.py
@@ -182,6 +182,13 @@ async def get_gossip_generation_number(self, node_ip: str, target_ip: str) -> in
         assert(type(data) == int)
         return data
 
+    async def get_joining_nodes(self, node_ip: str) -> list:
+        """Get the list of joining nodes according to `node_ip`."""
+        resp = await self.client.get(f"/storage_service/nodes/joining", host=node_ip)
+        data = await resp.json()
+        assert(type(data) == list)
+        return data
+
     async def enable_injection(self, node_ip: str, injection: str, one_shot: bool) -> None:
         """Enable error injection named `injection` on `node_ip`.
            Depending on `one_shot`, the injection will be executed only once or every time the process passes the injection point.
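
Note: `get_joining_nodes` above wraps Scylla's `/storage_service/nodes/joining`
REST endpoint. For illustration, a rough standalone sketch of what the helper
does under the hood, assuming the node's REST API listens on the default port
10000 and that `aiohttp` is available (both are assumptions, not part of the
patch):

    import asyncio
    import aiohttp

    async def joining_nodes(node_ip: str) -> list:
        # GET /storage_service/nodes/joining returns a JSON list with the IPs
        # of nodes that `node_ip` currently sees in the joining state.
        url = f"http://{node_ip}:10000/storage_service/nodes/joining"
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as resp:
                return await resp.json()

    # Example usage: asyncio.run(joining_nodes("127.0.0.1"))
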
diff --git a/test/topology/test_topology.py b/test/topology/test_topology.py
index 7c161462a615..34869e4ed36d 100644
--- a/test/topology/test_topology.py
+++ b/test/topology/test_topology.py
@@ -10,6 +10,9 @@
 import logging
 import asyncio
 import random
+import time
+
+from test.pylib.util import wait_for
 
 logger = logging.getLogger(__name__)
 
@@ -58,13 +61,32 @@ async def test_remove_node_add_column(manager, random_tables):
 
 
 @pytest.mark.asyncio
-@pytest.mark.skip(reason="Flaky due to #11780")
 async def test_decommission_node_add_column(manager, random_tables):
     """Add a node, remove an original node, add a column"""
-    servers = await manager.running_servers()
     table = await random_tables.add_table(ncolumns=5)
-    await manager.server_add()
-    await manager.decommission_node(servers[1])  # Decommission [1]
+    servers = await manager.running_servers()
+    decommission_target = servers[1]
+    # The sleep injections significantly increase the probability of reproducing #11780:
+    # 1. bootstrapped_server finishes bootstrapping and enters NORMAL state
+    # 2. decommission_target starts storage_service::handle_state_normal(bootstrapped_server),
+    #    enters sleep before calling storage_service::notify_joined
+    # 3. we start decommission on decommission_target
+    # 4. decommission_target sends node_ops_verb with decommission_prepare request to bootstrapped_server
+    # 5. bootstrapped_server receives the RPC and enters sleep
+    # 6. decommission_target handle_state_normal wakes up,
+    #    calls storage_service::notify_joined which drops some RPC clients
+    # 7. If #11780 is not fixed, this will fail the node_ops_verb RPC, causing decommission to fail
+    await manager.api.enable_injection(decommission_target, 'storage_service_notify_joined_sleep', one_shot=True)
+    bootstrapped_server = await manager.server_add()
+    async def no_joining_nodes():
+        joining_nodes = await manager.api.get_joining_nodes(decommission_target)
+        return not joining_nodes
+    # Wait until decommission_target thinks that bootstrapped_server is NORMAL
+    # note: when this wait finishes, we're usually in the middle of storage_service::handle_state_normal
+    await wait_for(no_joining_nodes, time.time() + 30, period=.1)
+    await manager.api.enable_injection(
+        bootstrapped_server, 'storage_service_decommission_prepare_handler_sleep', one_shot=True)
+    await manager.decommission_node(decommission_target)
     await table.add_column()
     await random_tables.verify_schema()
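
Note: the test relies on the `wait_for` helper imported from `test.pylib.util`.
A minimal sketch of such a helper, assuming it polls an async predicate until
the predicate returns a truthy value or an absolute deadline (as produced by
`time.time() + 30` above) passes; the exact signature and semantics of the real
helper may differ:

    import asyncio
    import time
    from typing import Awaitable, Callable, TypeVar

    T = TypeVar('T')

    async def wait_for(pred: Callable[[], Awaitable[T]], deadline: float,
                       period: float = 1) -> T:
        # Poll `pred` every `period` seconds; fail (via assert) if the
        # absolute deadline, given as a time.time() timestamp, passes first.
        while True:
            assert time.time() < deadline, "wait_for: deadline exceeded"
            res = await pred()
            if res:
                return res
            await asyncio.sleep(period)
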