feature(adaptive-timeouts): calculate timeouts based on node load
In test scenarios that run on different machine types and under different
load, it is impossible to have one timeout value that fits all. Various
operations take more time when node performance is lower or the data load
is higher.

Implement an `adaptive_timeout` context manager that calculates the right
timeout value for a given operation based on node load (performance, data
size, and other factors). It does not raise any exceptions; it only yields a
timeout value that may be used within the context. When the context takes
longer than that value, a `SoftTimeoutEvent` with `Error` severity is
published. A usage sketch follows the operation list below.

Because calculating the timeout is not straightforward, collect various
'load metrics' into the Elasticsearch DB for future reference and testing.

`SoftTimeoutEvent` has been refactored: the context-manager-related code has
been dropped, as it is no longer needed.

Supported `adaptive_timeout` operations:
- `DECOMMISSION` - timeout for the nodetool decommission operation.
Currently a simple implementation, used for publishing `SoftTimeoutEvent`.
- `SOFT_TIMEOUT` - passes through the timeout provided in the arguments.

Other operations (like REPAIR, REBUILD, FLUSH, etc.) are created
only for collecting timeout data in ES for future reference when
designing timeout calculators.
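
A minimal usage sketch, assembled from the call sites in this diff; the
`node`, `ks`, and `index_name` variables and the 7200-second value are
illustrative only:

    from sdcm.utils.adaptive_timeouts import Operations, adaptive_timeout

    # Publish-only usage: adaptive_timeout calculates a soft timeout for the
    # operation and, if the block runs longer than that, publishes a
    # SoftTimeoutEvent with Error severity instead of raising.
    with adaptive_timeout(operation=Operations.DECOMMISSION, node=node):
        node.run_nodetool("decommission")

    # The calculated value is also yielded by the context manager, so it can
    # be passed on to the operation being timed.
    with adaptive_timeout(operation=Operations.CREATE_INDEX, node=node, timeout=7200) as timeout:
        wait_for_index_to_be_built(node, ks, index_name, timeout=timeout)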
soyacz authored and fruch committed Apr 4, 2023
1 parent 630b8f6 commit 9697300
Showing 8 changed files with 529 additions and 97 deletions.
7 changes: 4 additions & 3 deletions sdcm/cluster.py
@@ -73,6 +73,7 @@
from sdcm.sct_config import SCTConfiguration
from sdcm.sct_events.continuous_event import ContinuousEventsRegistry
from sdcm.utils import properties
from sdcm.utils.adaptive_timeouts import Operations, adaptive_timeout
from sdcm.utils.benchmarks import ScyllaClusterBenchmarkManager
from sdcm.utils.common import (
S3Storage,
@@ -106,7 +107,7 @@
from sdcm.sct_events import Severity
from sdcm.sct_events.base import LogEvent, add_severity_limit_rules, max_severity
from sdcm.sct_events.health import ClusterHealthValidatorEvent
from sdcm.sct_events.system import TestFrameworkEvent, INSTANCE_STATUS_EVENTS_PATTERNS, InfoEvent, SoftTimeoutEvent
from sdcm.sct_events.system import TestFrameworkEvent, INSTANCE_STATUS_EVENTS_PATTERNS, InfoEvent
from sdcm.sct_events.grafana import set_grafana_url
from sdcm.sct_events.database import SYSTEM_ERROR_EVENTS_PATTERNS, ScyllaHelpErrorEvent, ScyllaYamlUpdateEvent, SYSTEM_ERROR_EVENTS
from sdcm.sct_events.nodetool import NodetoolEvent
@@ -4577,8 +4578,8 @@ def get_node_ip_list(verification_node):
self.terminate_node(node) # pylint: disable=no-member
self.test_config.tester_obj().monitors.reconfigure_scylla_monitoring()

def decommission(self, node: BaseNode, timeout: int | float = None, soft_timeout: int | float = None):
with SoftTimeoutEvent(soft_timeout=soft_timeout, operation="decommission"):
def decommission(self, node: BaseNode, timeout: int | float = None):
with adaptive_timeout(operation=Operations.DECOMMISSION, node=node):
node.run_nodetool("decommission", timeout=timeout)
self.verify_decommission(node)

16 changes: 8 additions & 8 deletions sdcm/cluster_k8s/__init__.py
@@ -63,6 +63,7 @@
from sdcm.sct_events.system import TestFrameworkEvent
from sdcm.utils import properties
import sdcm.utils.sstable.load_inventory as datasets
from sdcm.utils.adaptive_timeouts import adaptive_timeout, Operations
from sdcm.utils.common import download_from_github, shorten_cluster_name, walk_thru_data
from sdcm.utils.k8s import (
add_pool_node_affinity,
@@ -2679,7 +2680,7 @@ def terminate_node(self, node: BasePodContainer, scylla_shards=""): # pylint: d
self.nodes.remove(node)
node.destroy()

def decommission(self, node: BaseScyllaPodContainer, timeout: int | float = None, soft_timeout: int | float = None):
def decommission(self, node: BaseScyllaPodContainer, timeout: int | float = None):
rack = node.rack
rack_nodes = self.get_rack_nodes(rack)
assert rack_nodes[-1] == node, "Can withdraw the last node only"
@@ -2690,13 +2691,12 @@ def decommission(self, node: BaseScyllaPodContainer, timeout: int | float = None
# node deletion using "terminate_node" command.
scylla_shards = node.scylla_shards

# NOTE: on k8s we treat soft_timeout as the "hard" timeout,
# at least until we'll have case with bigger data sets in it
timeout = timeout or soft_timeout or (node.pod_terminate_timeout * 60)
self.replace_scylla_cluster_value(f"/spec/datacenter/racks/{rack}/members", current_members - 1)
self.k8s_cluster.kubectl(f"wait --timeout={timeout}s --for=delete pod {node.name}",
namespace=self.namespace,
timeout=timeout + 10)
timeout = timeout or (node.pod_terminate_timeout * 60)
with adaptive_timeout(operation=Operations.DECOMMISSION, node=node):
self.replace_scylla_cluster_value(f"/spec/datacenter/racks/{rack}/members", current_members - 1)
self.k8s_cluster.kubectl(f"wait --timeout={timeout}s --for=delete pod {node.name}",
namespace=self.namespace,
timeout=timeout + 10)
self.terminate_node(node, scylla_shards=scylla_shards)
if current_members == 1:
self._delete_k8s_rack(rack)
60 changes: 37 additions & 23 deletions sdcm/nemesis.py
@@ -53,7 +53,7 @@
MAX_TIME_WAIT_FOR_DECOMMISSION,
NodeSetupFailed,
NodeSetupTimeout,
NodeStayInClusterAfterDecommission,
NodeStayInClusterAfterDecommission, HOUR_IN_SEC,
)
from sdcm.cluster_k8s import (
KubernetesOps,
@@ -84,6 +84,7 @@
from sdcm.sct_events.system import InfoEvent
from sdcm.sla.sla_tests import SlaTests
from sdcm.utils import cdc
from sdcm.utils.adaptive_timeouts import adaptive_timeout, Operations
from sdcm.utils.common import (get_db_tables, generate_random_string,
update_certificates, reach_enospc_on_node, clean_enospc_on_node,
parse_nodetool_listsnapshots,
@@ -1202,7 +1203,8 @@ def _add_and_init_new_cluster_node(self, old_node_ip=None, host_id=None,
else:
new_node.replacement_node_ip = old_node_ip
try:
self.cluster.wait_for_init(node_list=[new_node], timeout=timeout, check_node_health=False)
with adaptive_timeout(Operations.NEW_NODE, node=self.cluster.nodes[0], timeout=timeout):
self.cluster.wait_for_init(node_list=[new_node], timeout=timeout, check_node_health=False)
self.cluster.clean_replacement_node_options(new_node)
except (NodeSetupFailed, NodeSetupTimeout):
self.log.warning("TestConfig of the '%s' failed, removing it from list of nodes" % new_node)
@@ -1222,7 +1224,7 @@ def _nodetool_decommission(self, add_node=True):
if self._is_it_on_kubernetes():
self.set_target_node(allow_only_last_node_in_rack=True)
target_is_seed = self.target_node.is_seed
self.cluster.decommission(self.target_node, soft_timeout=MAX_TIME_WAIT_FOR_DECOMMISSION)
self.cluster.decommission(self.target_node)
new_node = None
if add_node:
# When adding node after decommission the node is declared as up only after it completed bootstrapping,
@@ -1237,8 +1239,9 @@
try:
test_keyspaces = self.cluster.get_test_keyspaces()
for node in self.cluster.nodes:
for keyspace in test_keyspaces:
node.run_nodetool(sub_cmd='cleanup', args=keyspace)
with adaptive_timeout(Operations.CLEANUP, node=node, timeout=HOUR_IN_SEC * 48):
for keyspace in test_keyspaces:
node.run_nodetool(sub_cmd='cleanup', args=keyspace)
finally:
self.unset_current_running_nemesis(new_node)
return new_node
@@ -1432,7 +1435,7 @@ def _disrupt_kubernetes_then_decommission_and_add_scylla_node(self, disruption_m
getattr(node, disruption_method)()

self.log.info('Decommission %s', node)
self.cluster.decommission(node, soft_timeout=MAX_TIME_WAIT_FOR_DECOMMISSION)
self.cluster.decommission(node, timeout=MAX_TIME_WAIT_FOR_DECOMMISSION)

new_node = self.add_new_node(rack=node.rack)
self.unset_current_running_nemesis(new_node)
@@ -1542,7 +1545,8 @@ def disrupt_no_corrupt_repair(self):
thread.result()

def disrupt_major_compaction(self):
self.target_node.run_nodetool("compact")
with adaptive_timeout(Operations.MAJOR_COMPACT, self.target_node, timeout=7200):
self.target_node.run_nodetool("compact")

def disrupt_load_and_stream(self):
# Checking the columns number of keyspace1.standard1
@@ -1815,18 +1819,21 @@ def call_next_nemesis(self):
@latency_calculator_decorator(legend="Run repair process with nodetool repair")
def repair_nodetool_repair(self, node=None, publish_event=True):
node = node if node else self.target_node
node.run_nodetool(sub_cmd="repair", publish_event=publish_event)
with adaptive_timeout(Operations.REPAIR, node, timeout=HOUR_IN_SEC * 48):
node.run_nodetool(sub_cmd="repair", publish_event=publish_event)

def repair_nodetool_rebuild(self):
self.target_node.run_nodetool('rebuild')
with adaptive_timeout(Operations.REBUILD, self.target_node, timeout=HOUR_IN_SEC * 48):
self.target_node.run_nodetool('rebuild')

def disrupt_nodetool_cleanup(self):
# This fix important when just user profile is run in the test and "keyspace1" doesn't exist.
test_keyspaces = self.cluster.get_test_keyspaces()
for node in self.cluster.nodes:
InfoEvent('NodetoolCleanupMonkey %s' % node).publish()
for keyspace in test_keyspaces:
node.run_nodetool(sub_cmd="cleanup", args=keyspace)
with adaptive_timeout(Operations.CLEANUP, node, timeout=HOUR_IN_SEC * 48):
for keyspace in test_keyspaces:
node.run_nodetool(sub_cmd="cleanup", args=keyspace)

def _prepare_test_table(self, ks='keyspace1', table=None):
ks_cfs = self.cluster.get_non_system_ks_cf_list(db_node=self.target_node)
@@ -1858,7 +1865,8 @@ def disrupt_truncate(self):
self._prepare_test_table(ks=keyspace_truncate)

# In order to workaround issue #4924 when truncate timeouts, we try to flush before truncate.
self.target_node.run_nodetool("flush")
with adaptive_timeout(Operations.FLUSH, self.target_node, timeout=HOUR_IN_SEC * 2):
self.target_node.run_nodetool("flush")
# do the actual truncation
truncate_timeout = 600
truncate_cmd_timeout_suffix = self._truncate_cmd_timeout_suffix(truncate_timeout)
@@ -1883,7 +1891,8 @@ def disrupt_truncate_large_partition(self):
self.tester.verify_stress_thread(bench_thread)

# In order to workaround issue #4924 when truncate timeouts, we try to flush before truncate.
self.target_node.run_nodetool("flush")
with adaptive_timeout(Operations.FLUSH, self.target_node, timeout=HOUR_IN_SEC * 2):
self.target_node.run_nodetool("flush")
# do the actual truncation
truncate_timeout = 600
truncate_cmd_timeout_suffix = self._truncate_cmd_timeout_suffix(truncate_timeout)
@@ -3073,7 +3082,8 @@ def remove_node():
rnd_node = random.choice([n for n in self.cluster.nodes if n is not self.target_node])
self.log.info("Running removenode command on {}, Removing node with the following host_id: {}"
.format(rnd_node.ip_address, host_id))
res = rnd_node.run_nodetool("removenode {}".format(host_id), ignore_status=True, verbose=True)
with adaptive_timeout(Operations.REMOVE_NODE, rnd_node, timeout=HOUR_IN_SEC * 48):
res = rnd_node.run_nodetool("removenode {}".format(host_id), ignore_status=True, verbose=True)
if res.failed and re.match(removenode_reject_msg, res.stdout + res.stderr):
raise Exception(f"Removenode was rejected {res.stdout}\n{res.stderr}")

@@ -3128,8 +3138,9 @@ def remove_node():
try:
test_keyspaces = self.cluster.get_test_keyspaces()
for node in self.cluster.nodes:
for keyspace in test_keyspaces:
node.run_nodetool(sub_cmd='cleanup', args=keyspace)
with adaptive_timeout(Operations.CLEANUP, node, timeout=HOUR_IN_SEC * 48):
for keyspace in test_keyspaces:
node.run_nodetool(sub_cmd='cleanup', args=keyspace)
finally:
self.unset_current_running_nemesis(new_node)

@@ -3445,7 +3456,8 @@ def start_and_interrupt_repair_streaming(self):
delay=1
)
ParallelObject(objects=[trigger, watcher], timeout=timeout).call_objects()
self.target_node.run_nodetool("rebuild")
with adaptive_timeout(Operations.REBUILD, self.target_node, timeout=HOUR_IN_SEC * 48):
self.target_node.run_nodetool("rebuild")

def start_and_interrupt_rebuild_streaming(self):
"""
@@ -3475,7 +3487,8 @@ def start_and_interrupt_rebuild_streaming(self):
)
ParallelObject(objects=[trigger, watcher], timeout=timeout + 60).call_objects()
self.target_node.wait_db_up(timeout=300)
self.target_node.run_nodetool("rebuild")
with adaptive_timeout(Operations.REBUILD, self.target_node, timeout=HOUR_IN_SEC * 48):
self.target_node.run_nodetool("rebuild")

def disrupt_decommission_streaming_err(self):
"""
@@ -3523,7 +3536,7 @@ def disrupt_corrupt_then_scrub(self):
will be skipped.
"""
self.log.debug("Rebuild sstables by scrub with `--skip-corrupted`, corrupted partitions will be skipped.")
with ignore_scrub_invalid_errors():
with ignore_scrub_invalid_errors(), adaptive_timeout(Operations.SCRUB, self.target_node, timeout=HOUR_IN_SEC * 48):
for ks in self.cluster.get_test_keyspaces():
self.target_node.run_nodetool("scrub", args=f"--skip-corrupted {ks}")

@@ -3535,7 +3548,7 @@ def add_new_node(self, rack=0):

@latency_calculator_decorator(legend="Decommission node: remove node from cluster")
def decommission_node(self, node):
self.cluster.decommission(node, soft_timeout=MAX_TIME_WAIT_FOR_DECOMMISSION)
self.cluster.decommission(node)

def decommission_nodes(self, add_nodes_number, rack, is_seed: Optional[Union[bool, DefaultValue]] = DefaultValue,
dc_idx: Optional[int] = None):
@@ -3914,7 +3927,7 @@ def disrupt_add_remove_dc(self) -> None:
cluster_node.run_nodetool(sub_cmd="repair -pr", publish_event=True)
datacenters = list(self.tester.db_cluster.get_nodetool_status().keys())
self._write_read_data_to_multi_dc_keyspace(datacenters)
self.cluster.decommission(new_node, soft_timeout=MAX_TIME_WAIT_FOR_DECOMMISSION)
self.cluster.decommission(new_node)
node_added = False
datacenters = list(self.tester.db_cluster.get_nodetool_status().keys())
assert not [dc for dc in datacenters if dc.endswith("_nemesis_dc")], "new datacenter was not unregistered"
@@ -3923,7 +3936,7 @@
with self.cluster.cql_connection_patient(node) as session:
session.execute('DROP KEYSPACE IF EXISTS keyspace_new_dc')
if node_added:
self.cluster.decommission(new_node, soft_timeout=MAX_TIME_WAIT_FOR_DECOMMISSION)
self.cluster.decommission(new_node)

def get_cassandra_stress_write_cmds(self):
write_cmds = self.tester.params.get("prepare_write_cmd")
@@ -4140,7 +4153,8 @@ def disrupt_create_index(self):
raise UnsupportedNemesis( # pylint: disable=raise-missing-from
"Tried to create already existing index. See log for details")
try:
wait_for_index_to_be_built(self.target_node, ks, index_name, timeout=7200)
with adaptive_timeout(operation=Operations.CREATE_INDEX, node=self.target_node, timeout=7200) as timeout:
wait_for_index_to_be_built(self.target_node, ks, index_name, timeout=timeout)
verify_query_by_index_works(session, ks, cf, column)
sleep_for_percent_of_duration(self.tester.test_duration * 60, percent=1,
min_duration=300, max_duration=2400)
44 changes: 3 additions & 41 deletions sdcm/sct_events/system.py
@@ -14,7 +14,6 @@
import re
import sys
import time
import datetime
from typing import Any, Optional, Sequence, Type, List, Tuple
from traceback import format_stack

@@ -99,47 +98,10 @@ def msgfmt(self) -> str:


class SoftTimeoutEvent(TestFrameworkEvent):
"""
To be used as a context manager to raise an error event if `operation` took more the `soft_timeout`
Example:
>>> with SoftTimeoutEvent(soft_timeout=0.1, operation="long-one") as event:
... time.sleep(1) # do that long operation that takes more then `soft_timeout`
would raise event like, with a traceback where it happened:
SoftTimeoutEvent Severity.ERROR) period_type=one-time event_id=2cf14ba9-b2a0-402d-bc4f-ac102e9e51ff,
source=SoftTimeout message=operation 'long-one' took 0:00:01.000, soft_timeout=0:00:00.100
Traceback (most recent call last):
...
File "/home/fruch/projects/scylla-cluster-tests/unit_tests/test_events.py", line 462, in test_soft_timeout
with SoftTimeoutEvent(soft_timeout=0.1, operation="long-one") as event:
"""

def __init__(self, operation: str, soft_timeout: int | float):
super().__init__(source='SoftTimeout')
self.operation = operation
self.soft_timeout = soft_timeout
self.start_time = None

def __enter__(self):
if self.soft_timeout:
self.start_time = time.time()
return self

def __exit__(self, exc_type, exc_val, exc_tb):
if self.start_time and self.soft_timeout:
duration = (time.time() - self.start_time)
if self.soft_timeout < duration:
self.trace = "".join(format_stack(sys._getframe().f_back))
self.message = (f"operation '{self.operation}' took "
f"{str(datetime.timedelta(seconds=duration))[:-3]}, "
f"soft_timeout={str(datetime.timedelta(seconds=self.soft_timeout))[:-3]}")
self.severity = Severity.ERROR
self.publish_or_dump()
else:
self.dont_publish()
def __init__(self, operation: str, duration: int | float, soft_timeout: int | float):
message = f"operation '{operation}' took {duration}s and soft-timeout was set to {soft_timeout}s"
super().__init__(source='SoftTimeout', severity=Severity.ERROR, trace=sys._getframe().f_back, message=message)


class ElasticsearchEvent(InformationalEvent):
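
For context on the new constructor above, here is a hedged sketch of how
`adaptive_timeout` presumably emits the refactored event once a measured
duration exceeds the calculated soft timeout; the `adaptive_timeouts` module
itself is not part of this excerpt, so the surrounding variables and the
300-second value are assumptions:

    import time

    from sdcm.sct_events.system import SoftTimeoutEvent

    soft_timeout = 300  # assumed value calculated by adaptive_timeout
    start = time.time()
    # ... run the timed operation inside the adaptive_timeout context ...
    duration = time.time() - start
    if duration > soft_timeout:
        # Severity.ERROR and the message are now set by the event itself.
        SoftTimeoutEvent(operation="decommission", duration=duration,
                         soft_timeout=soft_timeout).publish_or_dump()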