### Imports

In [None]:
import os
import re
import random
import pprint
from sdcm.utils.raft.common import get_topology_coordinator_node
from sdcm.utils.nemesis_utils.node_operations import is_node_seen_as_down
from sdcm.wait import wait_for
from sdcm.utils.raft.common import get_topology_coordinator_node
from sdcm import sct_abs_path
from sdcm.nemesis import Nemesis
from sdcm.sct_config import SCTConfiguration
from sdcm.utils.common import ParallelObject

from longevity_test import LongevityTest



### Cluster configuration
The cluster is configured through environment variables (the preferred way).

In [None]:
# SCT reads its configuration from SCT_* environment variables; SCTConfiguration()
# below picks them up.
os.environ["BUILD_USER_EMAIL"] = "alex.bykov@scylladb.com"
os.environ["BUILD_ID"] = "555"
os.environ["SCT_CLUSTER_BACKEND"] = "aws"
os.environ["SCT_CONFIG_FILES"] = "test-cases/longevity/longevity-100gb-4h.yaml"
# os.environ["SCT_AMI_ID_DB_SCYLLA"] = "ami-0b7362e1db3807a21"  # custom ami with mv building
os.environ["SCT_SCYLLA_VERSION"] = "master:latest"
# Two regions (DCs) with 4 DB nodes each and 2 simulated racks.
os.environ["SCT_REGION_NAME"] = "eu-west-1 us-east-1"
os.environ["SCT_AVAILABILITY_ZONE"] = "a"
os.environ["SCT_USE_MGMT"] = "False"
os.environ["SCT_N_DB_NODES"] = "4 4"
os.environ["SCT_SIMULATED_RACKS"] = "2"
os.environ["SCT_N_LOADERS"] = "1"
os.environ["SCT_N_MONITORS_NODES"] = "1"
os.environ["SCT_INSTANCE_PROVISION"] = "on_demand"
os.environ["SCT_ENABLE_ARGUS"] = "False"
os.environ['SCT_INSTANCE_TYPE_DB'] = 'i4i.2xlarge'
# No nemesis and no stress load: node operations are driven manually from this notebook.
os.environ["SCT_NEMESIS_CLASS_NAME"] = "NoOpMonkey"
os.environ["SCT_STRESS_CMD"] = ""
os.environ['SCT_PREPARE_WRITE_CMD'] = ""
os.environ['SCT_IP_SSH_CONNECTIONS'] = 'public'

assert os.environ["BUILD_USER_EMAIL"] != "sct.tester@scylladb.com", "please use your own email so resources are tracked properly"

# logging configuration, for jupyter only (sct logs are intact)
import logging

LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)
# Jupyter cells are often re-run; without this guard every run would attach one
# more handler and each log line would be printed several times.
if not LOGGER.handlers:
    consoleHandler = logging.StreamHandler()
    consoleHandler.setLevel(logging.DEBUG)
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    consoleHandler.setFormatter(formatter)
    LOGGER.addHandler(consoleHandler)

sct_config = SCTConfiguration()
sct_config.verify_configuration()


### Define additional functions
Helper functions for node operations.


In [None]:

from typing import DefaultDict
from dataclasses import dataclass

@dataclass()
class VoterState:
    """Group 0 membership entry of one node, as collected by get_current_voters_states."""

    name: str  # node name
    ip_address: str  # node IP address
    host_id: str  # host ID reported for the group 0 member
    is_voter: bool  # the member's 'voter' flag from the group 0 members listing

def get_current_voters_states(nodes: list["BaseNode"] | None, verification_node: "BaseNode") -> "dict[str, list[VoterState]]":
    """Group the raft group 0 members by the region of their node.

    Args:
        nodes: nodes to match against group 0 members by host ID. When empty or
            None, the up-and-normal nodes seen by ``verification_node`` are used.
        verification_node: node whose ``raft.get_group0_members()`` is queried.

    Returns:
        Mapping ``node.region -> [VoterState, ...]``. Group 0 members whose host ID
        does not belong to any of ``nodes`` are skipped.
    """
    from collections import defaultdict

    group0_members = verification_node.raft.get_group0_members()
    nodes = nodes or verification_node.parent_cluster.get_nodes_up_and_normal(verification_node)
    hostid_node_map = {node.host_id: node for node in nodes}
    voters_state: defaultdict[str, list[VoterState]] = defaultdict(list)
    for member in group0_members:
        if node := hostid_node_map.get(member['host_id']):
            voters_state[node.region].append(
                VoterState(node.name, node.ip_address, member['host_id'], member['voter']))
    # Return a plain dict so a lookup of an unknown region raises instead of silently
    # inserting an empty entry.
    result = dict(voters_state)
    LOGGER.debug("Voters per region: %s", result)
    return result


def stop_scylla_server_not_gently(node: "BaseNode"):
    """Kill the scylla process on *node* with SIGKILL and keep systemd from restarting it.

    The ``Restart=`` directive of the scylla-server unit is commented out and systemd
    is reloaded before the kill. Otherwise a restart policy such as ``on-abnormal``
    would bring the killed server back up.
    """
    # Anchoring on "^Restart=" disables whatever restart policy the unit uses. It also
    # makes re-runs harmless, because an already commented-out line starts with '#'.
    node.remoter.sudo("sed -i 's/^Restart=/#Restart=/' /usr/lib/systemd/system/scylla-server.service")
    # remoter.sudo() already elevates this command; the former inner "sudo" was redundant.
    node.remoter.sudo("systemctl daemon-reload")
    node.remoter.sudo("pkill -9 scylla")


def replace_cluster_node(cluster, verification_node: "BaseNode",
                         host_id: str | None = None,
                         dc_idx: int = 0,
                         rack: int = 0,
                         ignore_dead_node_host_ids: str = "",
                         timeout: int | float = 3600 * 8) -> "BaseNode":
    """Add one node to *cluster*, replacing the node identified by *host_id* when it is given.

    When host_id is not None, the replacement node procedure is initiated
    (``host_id`` is assigned to ``new_node.replacement_host_id``).

    Args:
        cluster: DB cluster the node is added to.
        verification_node: node used to check that the new node is up and normal.
        host_id: host ID of the dead node being replaced, or None.
        dc_idx: index of the DC the new node is created in.
        rack: rack the new node is created in.
        ignore_dead_node_host_ids: comma-separated host IDs of other dead nodes. They are
            written to ``ignore_dead_nodes_for_replace`` in scylla.yaml for the duration
            of the operation. Nothing is written when empty.
        timeout: timeout passed to ``cluster.wait_for_init``.

    Returns:
        The new node.

    Raises:
        Re-raises any exception from node initialization. The node is then removed from
        ``cluster.nodes`` but NOT terminated.
    """
    cluster.log.info("Adding new node to cluster...")
    new_node: "BaseNode" = cluster.add_nodes(count=1, dc_idx=dc_idx, rack=rack, enable_auto_bootstrap=True)[0]
    cluster.monitor.reconfigure_scylla_monitoring()
    if ignore_dead_node_host_ids:
        # The pipe's right-hand side presumably is not covered by remoter.sudo(), hence
        # the explicit "sudo tee" to write into /etc.
        new_node.remoter.sudo(
            f"""echo 'ignore_dead_nodes_for_replace: {ignore_dead_node_host_ids}' | sudo tee -a /etc/scylla/scylla.yaml""")
    new_node.replacement_host_id = host_id

    try:
        cluster.wait_for_init(node_list=[new_node], timeout=timeout, check_node_health=False)
        cluster.clean_replacement_node_options(new_node)
        cluster.set_seeds()
        cluster.update_seed_provider()
    except Exception:
        cluster.log.warning("TestConfig of the '%s' failed, removing it from list of nodes", new_node)
        cluster.nodes.remove(new_node)
        cluster.log.warning("Node will not be terminated. Please terminate manually!!!")
        raise

    cluster.wait_for_nodes_up_and_normal(nodes=[new_node], verification_node=verification_node)
    new_node.wait_node_fully_start()
    if ignore_dead_node_host_ids:
        # The option is only meant for this replace operation; comment it out again.
        new_node.remoter.sudo(
            "sed -i 's/^ignore_dead_nodes_for_replace:.*/# ignore_dead_nodes_for_replace:/' /etc/scylla/scylla.yaml")

    return new_node


def replace_nodes_by_host_id(cluster, dead_node_mapping: dict[str, "BaseNode"],
                             verification_node: "BaseNode") -> list["BaseNode"]:
    """Replace every dead node one at a time, then terminate the dead instances.

    Args:
        cluster: DB cluster the replacement nodes are added to.
        dead_node_mapping: ``host_id -> dead node``. Each replacement is created in the
            same DC index and rack as the node it replaces.
        verification_node: node used to check that each replacement is up and normal.

    Returns:
        The replacement nodes, in the iteration order of ``dead_node_mapping``.
    """
    host_ids = list(dead_node_mapping)
    new_nodes = []
    for idx, host_id in enumerate(host_ids):
        dead_node = dead_node_mapping[host_id]
        cluster.log.info("Replace node %s with host_id: %s", dead_node.name, host_id)
        # Nodes earlier in the list are already replaced. Only the ones still waiting
        # for replacement are passed as dead nodes to ignore.
        new_nodes.append(replace_cluster_node(cluster, verification_node,
                                              dc_idx=dead_node.dc_idx,
                                              rack=dead_node.rack,
                                              host_id=host_id,
                                              ignore_dead_node_host_ids=",".join(host_ids[idx + 1:])))

    for node in dead_node_mapping.values():
        cluster.terminate_node(node)

    cluster.wait_all_nodes_un()
    return new_nodes


def add_node_to_dc(cluster, dc_idx: int = 0, rack: int = 0) -> "BaseNode":
    """Bootstrap one new node into DC index *dc_idx* / *rack* of *cluster*.

    Waits for node init (900 s timeout, with health check) and for the node to be
    reported up and normal. Then refreshes the monitoring configuration.

    Returns:
        The newly added node.
    """
    cluster.log.info("Adding new node")
    added = cluster.add_nodes(1, dc_idx=dc_idx, rack=rack, enable_auto_bootstrap=True)
    node = added[0]
    cluster.wait_for_init(node_list=[node], timeout=900, check_node_health=True)
    cluster.wait_for_nodes_up_and_normal(nodes=[node])
    cluster.monitor.reconfigure_scylla_monitoring()
    return node



### Initialize cluster
Create a cluster and attach 2 extra attributes (`monitor`, `loaders`) to it for easier use.

In [None]:
from sdcm.sct_events.setup import start_events_device


LOGGER.info("Start cluster init")
# Work from the SCT root directory, presumably so that relative paths such as
# SCT_CONFIG_FILES resolve.
os.chdir(sct_abs_path(relative_filename=""))
tester_inst = LongevityTest()
tester_inst.setUpClass()
tester_inst._init_logging()
events_log_dir = tester_inst.logdir
events_registry = getattr(tester_inst, "_registry", None) or tester_inst.events_processes_registry
start_events_device(log_dir=events_log_dir, _registry=events_registry)
tester_inst.setUp()
cluster = tester_inst.db_cluster
# Helper functions such as add_node_to_dc use cluster.monitor, so attach it here.
cluster.monitor = tester_inst.monitors
cluster.loaders = tester_inst.loaders

In [None]:
first_node_status = cluster.nodes[0].run_nodetool("status")
print(first_node_status)

## Define required variables

With a limited number of voters, find the DC with the largest number of voters; that DC is the one that will be killed next.

In [None]:

voters_status = get_current_voters_states(cluster.nodes, cluster.nodes[0])
# The unpacking below assumes exactly two DCs; fail with a clear message otherwise.
if len(voters_status) != 2:
    raise RuntimeError(f"Expected group 0 members in exactly 2 regions, got: {list(voters_status)}")
# Sort regions by their number of voters (ascending). The region holding the most
# voters is killed; the other one stays alive.
alive_dc_region, dead_dc_region = sorted(
    voters_status,
    key=lambda region: sum(1 for voter in voters_status[region] if voter.is_voter))
nodes_by_region = cluster.nodes_by_region(cluster.nodes)
host_id_node_map = {node.host_id: node for node in cluster.nodes}
dead_node_host_ids_maps = {node.host_id: node for node in nodes_by_region[dead_dc_region]}
dead_node_host_ids = list(dead_node_host_ids_maps)
verification_node = nodes_by_region[alive_dc_region][0]


### Stop all nodes in the DC with the largest number of voters.

The nodes must be stopped as quickly as possible, preferably in parallel, so that the voters are not reassigned to the other DC.


In [None]:

dead_dc_nodes = nodes_by_region[dead_dc_region]
# One worker per node, so that all of them are killed at (almost) the same time.
parallel_stop = ParallelObject(dead_dc_nodes, num_workers=len(dead_dc_nodes), timeout=600)
parallel_stop.run(stop_scylla_server_not_gently)

# GET CLUSTER STATUS

This cell can be used to check the current cluster status with `nodetool status`.

In [None]:
verification_node = nodes_by_region[alive_dc_region][0]
nodetool_status_output = verification_node.run_nodetool("status")
print(nodetool_status_output)

### Validate that the raft quorum is lost
Try to remove a dead node.
Call a read barrier.

Both operations are expected to fail with an exception.

In [None]:

# Both operations are expected to fail while the group 0 quorum is lost. An unexpected
# success means the quorum is still there, so stop instead of silently moving on to
# the recovery procedure.
removenode_cmd = f"removenode {dead_node_host_ids[0]}"
if len(dead_node_host_ids) > 1:
    # Pass --ignore-dead-nodes only when there is something to ignore; an empty
    # value would make nodetool fail for the wrong reason.
    removenode_cmd += f" --ignore-dead-nodes {','.join(dead_node_host_ids[1:])}"

try:
    verification_node.run_nodetool(removenode_cmd)
except Exception as exc:
    print(f"Expected to fail. Error: {exc}")
else:
    raise AssertionError("removenode unexpectedly succeeded: raft quorum does not seem to be lost")

try:
    verification_node.raft.call_read_barrier()
except Exception as exc:
    print(f"Expected to fail. Error: {exc}")
else:
    raise AssertionError("read barrier unexpectedly succeeded: raft quorum does not seem to be lost")


# Recovery procedure
Execute the steps one by one.
Use the 'GET CLUSTER STATUS' cell to check the cluster status between steps if needed.

## Step 2.
Stop one of the live nodes in the alive DC.
The last node in the list is used.

In [None]:
alive_dc_members = nodes_by_region[alive_dc_region]
# The last node of the alive DC is the one taken down.
stopped_alive_node = alive_dc_members[-1]
stopped_node_host_id = stopped_alive_node.host_id
stopped_alive_node.stop_scylla()



## Step 4.
Choose the live nodes that will be used to restore the cluster,
then do a rolling restart of these nodes.

In [None]:
# Every node of the alive DC except the last one, which was stopped in step 2.
alive_dc_members = nodes_by_region[alive_dc_region]
base_alive_nodes = alive_dc_members[:-1]
cluster.restart_scylla(nodes=base_alive_nodes)


## Step 6. 
Find the recovery leader: the live node with the highest group 0 commit index (`commit_idx` in `system.raft`).

In [None]:
result = verification_node.run_cqlsh("select value from system.scylla_local where key = 'raft_group0_id'", split=True)
# Index 3 is where the value appears in the split cqlsh output (same parsing as below).
group0_id = result[3].strip()


def _group0_commit_idx(node) -> int:
    """Return the group 0 commit_idx read from *node*'s own system.raft table."""
    output = node.run_cqlsh(f"select commit_idx from system.raft where group_id = {group0_id}", split=True)
    return int(output[3])


# Each candidate must be queried itself. Querying verification_node for every
# candidate returns verification_node's own value each time, so the first node
# would always be picked.
commit_indexes = [(_group0_commit_idx(node), node) for node in base_alive_nodes]
for cidx, node in commit_indexes:
    print(node.name, cidx)
# max() keeps the first node among equal commit indexes, as the original loop did.
commit_idx, recovery_coordinator = max(commit_indexes, key=lambda item: item[0])

print(recovery_coordinator.name, recovery_coordinator.ip_address, recovery_coordinator.host_id)

## Step 7. 
Remove `raft_group0_id` from `system.scylla_local` and truncate `system.discovery` on each live node.

In [None]:
reset_group0_statements = (
    "DELETE value FROM system.scylla_local WHERE key = 'raft_group0_id'",
    "TRUNCATE system.discovery",
)
for node in base_alive_nodes:
    for statement in reset_group0_statements:
        node.run_cqlsh(statement)
    

## Step 8.
Set the new scylla.yaml parameter `recovery_leader` to the Host ID of the recovery leader on each live node.

In [None]:
recovery_host_id = recovery_coordinator.host_id
for node in base_alive_nodes:
    # Drop a recovery_leader line left by an earlier run of this cell, so scylla.yaml
    # never ends up with the key twice.
    node.remoter.sudo("sed -i '/^recovery_leader:/d' /etc/scylla/scylla.yaml")
    # The right-hand side of the pipe needs its own sudo to append to a file under /etc.
    node.remoter.sudo(f"""echo 'recovery_leader: {recovery_host_id}' | sudo tee -a /etc/scylla/scylla.yaml""")



## Step 9.
Do a rolling restart of all live nodes; the recovery leader must be restarted first.

In [None]:

# The recovery leader has to come up first; the remaining live nodes follow.
cluster.restart_scylla(nodes=[recovery_coordinator])
followers = [member for member in base_alive_nodes if member != recovery_coordinator]
cluster.restart_scylla(nodes=followers)


## Step 10.
Remove/replace all dead nodes


### step 10.1
Remove the node that was stopped in the alive DC. This is possible because each rack had 2 nodes.

In [None]:
# The nodes of the dead DC are still down, so they have to be ignored by removenode.
verification_node.run_nodetool(f"removenode {stopped_node_host_id} --ignore-dead-nodes {','.join(dead_node_host_ids)}")
removed_node = host_id_node_map[stopped_node_host_id]
# Terminate the instance, as is done for the dead-DC nodes after replacement. Only
# dropping it from cluster.nodes would leave the VM running, because cleanup would no
# longer know about it.
cluster.terminate_node(removed_node)
# terminate_node presumably removes the node from cluster.nodes too; make sure either way.
if removed_node in cluster.nodes:
    cluster.nodes.remove(removed_node)


### step 10.2
Replace all nodes in dead dc

In [None]:

replace_nodes_by_host_id(cluster=cluster,
                         dead_node_mapping=dead_node_host_ids_maps,
                         verification_node=verification_node)

#### Step 10.3
Add one new node to each rack of the dead DC.


In [None]:
dc_idx = cluster.params.region_names.index(dead_dc_region)
# One new node per rack; the configuration cell sets SCT_SIMULATED_RACKS to 2.
# The added nodes are kept instead of overwriting an unused variable on each iteration.
added_dead_dc_nodes = []
for rack in range(2):
    added_dead_dc_nodes.append(add_node_to_dc(cluster, dc_idx=dc_idx, rack=rack))



### 10.4
Add a new node in the alive DC to take the place of the removed node.

In [None]:
# Same DC index and rack as the node that was removed in step 10.1.
dc_idx = cluster.params.region_names.index(alive_dc_region)
new_node_instead_of_stopped = add_node_to_dc(cluster, dc_idx=dc_idx, rack=stopped_alive_node.rack)



## Step 11.
Unset recovery_leader on all nodes.

In [None]:
# Comment out recovery_leader on every node it was written to in step 8.
unset_recovery_leader_cmd = "sed -i  's/recovery_leader:.*/#recovery_leader:/' /etc/scylla/scylla.yaml"
for node in base_alive_nodes:
    node.remoter.sudo(unset_recovery_leader_cmd)


## Step 12.
Delete the data of the old group 0 from `system.raft`, `system.raft_snapshots`, and `system.raft_snapshot_config`.

```CQL
DELETE FROM system.raft WHERE group_id = <old group 0 id>
DELETE FROM system.raft_snapshots WHERE group_id = <old group 0 id>
DELETE FROM system.raft_snapshot_config WHERE group_id = <old group 0 id>
```

In [None]:
# Tables holding state of the old group 0, cleaned in this order on every live node.
old_group0_tables = ("system.raft", "system.raft_snapshots", "system.raft_snapshot_config")
for node in base_alive_nodes:
    for table in old_group0_tables:
        node.run_cqlsh(f"DELETE FROM {table} WHERE group_id = {group0_id}")

## Get cluster status


In [None]:
verification_node = nodes_by_region[alive_dc_region][0]
final_status = verification_node.run_nodetool("status")
print(final_status)
# Fails if any node is not reported as up and normal.
cluster.check_nodes_up_and_normal(verification_node=verification_node)

### Clean the cluster

In [None]:
# Destroy DB, loader and monitor nodes during cleanup instead of keeping them.
for post_behavior_key in ('post_behavior_db_nodes', 'post_behavior_loader_nodes', 'post_behavior_monitor_nodes'):
    tester_inst.params[post_behavior_key] = 'destroy'
tester_inst.params['execute_post_behavior'] = True

tester_inst.clean_resources()