diff --git a/poetry.lock b/poetry.lock index b663c41..4bd1f1e 100644 --- a/poetry.lock +++ b/poetry.lock @@ -549,7 +549,7 @@ from = ["pytablereader (>=0.30.0,<2)"] html = ["dominate (>=2.1.5,<3)"] logging = ["loguru (>=0.4.1,<1)"] sqlite = ["SimpleSQLite (>=1.1.3,<2)"] -test = ["sqliteschema", "dominate (>=2.1.5,<3)", "pytest-discord (>=0.0.7)", "toml (>=0.9.3,<1)", "SimpleSQLite (>=1.1.3,<2)", "pytablereader (>=0.30.0,<2)", "pytablewriter-altrow-theme (>=0.0.2,<1)", "pytest (>=6.0.1)", "pytest-md-report (>=0.1)", "loguru (>=0.4.1,<1)", "PyYAML (>=3.11,<6)", "elasticsearch (>=7.0.5,<8)", "simplejson (>=3.8.1,<4)", "tablib", "XlsxWriter (>=0.9.6,<2)", "pytablereader[sqlite,excel] (>=0.29)", "xlwt"] +test = ["sqliteschema", "dominate (>=2.1.5,<3)", "pytest-discord (>=0.0.7)", "toml (>=0.9.3,<1)", "SimpleSQLite (>=1.1.3,<2)", "pytablereader (>=0.30.0,<2)", "pytablewriter-altrow-theme (>=0.0.2,<1)", "pytest (>=6.0.1)", "pytest-md-report (>=0.1)", "loguru (>=0.4.1,<1)", "PyYAML (>=3.11,<6)", "elasticsearch (>=7.0.5,<8)", "simplejson (>=3.8.1,<4)", "tablib", "XlsxWriter (>=0.9.6,<2)", "pytablereader[excel,sqlite] (>=0.29)", "xlwt"] theme = ["pytablewriter-altrow-theme (>=0.0.2,<1)"] toml = ["toml (>=0.9.3,<1)"] yaml = ["PyYAML (>=3.11,<6)"] @@ -737,6 +737,22 @@ category = "main" optional = false python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*" +[[package]] +name = "sshtunnel" +version = "0.4.0" +description = "Pure python SSH tunnels" +category = "main" +optional = false +python-versions = "*" + +[package.dependencies] +paramiko = ">=2.7.2" + +[package.extras] +build_sphinx = ["sphinx", "sphinxcontrib-napoleon"] +dev = ["check-manifest"] +test = ["tox (>=1.8.1)"] + [[package]] name = "tabledata" version = "1.1.4" @@ -874,7 +890,7 @@ testing = ["pytest (>=4.6)", "pytest-checkdocs (>=1.2.3)", "pytest-flake8", "pyt [metadata] lock-version = "1.1" python-versions = "^3.6.1" -content-hash = "eb458dd6547915bf61c597fa42053f236fb10117bc88ce7226d22d0abd62d1df" +content-hash = "f0ff6922116276ce9ff89cf7a6b5e54573f670ce22f870dcfac3a2e5860e5af5" [metadata.files] appdirs = [ @@ -1357,6 +1373,11 @@ smmap = [ {file = "smmap-3.0.5-py2.py3-none-any.whl", hash = "sha256:7bfcf367828031dc893530a29cb35eb8c8f2d7c8f2d0989354d75d24c8573714"}, {file = "smmap-3.0.5.tar.gz", hash = "sha256:84c2751ef3072d4f6b2785ec7ee40244c6f45eb934d9e543e2c51f1bd3d54c50"}, ] +sshtunnel = [ + {file = "sshtunnel-0.4.0-py2.py3-none-any.whl", hash = "sha256:98e54c26f726ab8bd42b47a3a21fca5c3e60f58956f0f70de2fb8ab0046d0606"}, + {file = "sshtunnel-0.4.0-py3.8.egg", hash = "sha256:4a07cf989712e8ba76a370584bec922d14775054b3ff18e5d075507a298dc3ed"}, + {file = "sshtunnel-0.4.0.tar.gz", hash = "sha256:e7cb0ea774db81bf91844db22de72a40aae8f7b0f9bb9ba0f666d474ef6bf9fc"}, +] tabledata = [ {file = "tabledata-1.1.4-py3-none-any.whl", hash = "sha256:c0902a7f90130b362a6e29b2faaca09bc468efec0f53e8b01a3e5ab607554ea9"}, {file = "tabledata-1.1.4.tar.gz", hash = "sha256:76984fd93d9cb05258a03941ecd9d95d549379b35294b3dfc21c844ed06a67b2"}, diff --git a/pyproject.toml b/pyproject.toml index 8598627..c795080 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "redisbench-admin" -version = "0.2.2" +version = "0.2.3" description = "Redis benchmark run helper. A wrapper around Redis and Redis Modules benchmark tools ( ftsb_redisearch, memtier_benchmark, redis-benchmark, aibench, etc... )." 
authors = ["filipecosta90 "] readme = "README.md" @@ -27,6 +27,7 @@ GitPython = "^3.1.12" PyYAML = "^5.4.0" wget = "^3.2" pytablewriter = "^0.60.0" +sshtunnel = "^0.4.0" [tool.poetry.dev-dependencies] pytest = "^4.6" diff --git a/redisbench_admin/cli.py b/redisbench_admin/cli.py index 81e9e46..3da37c4 100644 --- a/redisbench_admin/cli.py +++ b/redisbench_admin/cli.py @@ -19,6 +19,8 @@ from redisbench_admin.run_local.run_local import run_local_command_logic from redisbench_admin.run_remote.args import create_run_remote_arguments, LOG_LEVEL from redisbench_admin.run_remote.run_remote import run_remote_command_logic +from redisbench_admin.watchdog.args import create_watchdog_arguments +from redisbench_admin.watchdog.watchdog import watchdog_command_logic def populate_with_poetry_data(): @@ -73,6 +75,8 @@ def main(): parser = create_extract_arguments(parser) elif requested_tool == "export": parser = create_export_arguments(parser) + elif requested_tool == "watchdog": + parser = create_watchdog_arguments(parser) elif requested_tool == "--version": print_version(project_name, project_version) sys.exit(0) @@ -80,7 +84,13 @@ def main(): print_help(project_name, project_version) sys.exit(0) else: - valid_tool_options = ["run-local", "run-remote", "export", "extract"] + valid_tool_options = [ + "run-local", + "run-remote", + "export", + "extract", + "watchdog", + ] print_invalid_tool_option(requested_tool, valid_tool_options) sys.exit(1) @@ -94,6 +104,8 @@ def main(): export_command_logic(args) if requested_tool == "extract": extract_command_logic(args) + if requested_tool == "watchdog": + watchdog_command_logic(args) def print_invalid_tool_option(requested_tool, valid_tool_options): diff --git a/redisbench_admin/compare/__init__.py b/redisbench_admin/compare/__init__.py new file mode 100644 index 0000000..4a70c9e --- /dev/null +++ b/redisbench_admin/compare/__init__.py @@ -0,0 +1,5 @@ +# Apache License Version 2.0 +# +# Copyright (c) 2021., Redis Labs Modules +# All rights reserved. +# diff --git a/redisbench_admin/export/common/common.py b/redisbench_admin/export/common/common.py index e9ab6fc..fef242c 100644 --- a/redisbench_admin/export/common/common.py +++ b/redisbench_admin/export/common/common.py @@ -72,10 +72,16 @@ def split_key_metrics_by_step(key_metrics_specs): return key_metrics_by_step -def get_or_none(dictionary, prop: str): +def get_or_none(dictionary, prop: str, inner_prop=None): result = None if prop in dictionary: result = dictionary[prop] + if inner_prop is not None: + if inner_prop in result: + result = result[inner_prop] + else: + None + return result diff --git a/redisbench_admin/profilers/perf.py b/redisbench_admin/profilers/perf.py index 8ae7b55..bd4377e 100644 --- a/redisbench_admin/profilers/perf.py +++ b/redisbench_admin/profilers/perf.py @@ -10,6 +10,7 @@ import subprocess import time + from redisbench_admin.profilers.pprof import ( PPROF_FORMAT_TEXT, run_pprof, @@ -19,6 +20,18 @@ from redisbench_admin.utils.utils import whereis +PERF_CALLGRAPH_MODE_DEFAULT = "lbr" +LINUX_PERF_SETTINGS_MESSAGE = ( + "If running in non-root user please confirm that you have:\n" + + " - access to Kernel address maps." + + " Check if `0` ( disabled ) appears from the output of `cat /proc/sys/kernel/kptr_restrict`\n" + + ' If not then fix via: `sudo sh -c " echo 0 > /proc/sys/kernel/kptr_restrict"`\n' + + " - permission to collect stats." 
+ + " Check if `-1` appears from the output of `cat /proc/sys/kernel/perf_event_paranoid`\n" + + " If not then fix via: `sudo sh -c 'echo -1 > /proc/sys/kernel/perf_event_paranoid'`\n" +) + + class Perf: def __init__(self): """ @@ -29,13 +42,11 @@ def __init__(self): if not self.perf: self.perf = "perf" - self.stack_collapser = os.getenv("STACKCOLLAPSE_PATH") - if not self.stack_collapser: - self.stack_collapser = STACKCOLLAPSE_PATH - - self.flamegraph_utity = os.getenv("FLAMEGRAPH_PATH") - if not self.flamegraph_utity: - self.flamegraph_utity = FLAMEGRAPH_PATH + self.stack_collapser = os.getenv("STACKCOLLAPSE_PATH", STACKCOLLAPSE_PATH) + self.flamegraph_utity = os.getenv("FLAMEGRAPH_PATH", FLAMEGRAPH_PATH) + self.callgraph_mode = os.getenv( + "PERF_CALLGRAPH_MODE", PERF_CALLGRAPH_MODE_DEFAULT + ) self.output = None self.profiler_process = None @@ -44,6 +55,7 @@ def __init__(self): self.profiler_process_exit_code = None self.trace_file = None self.collapsed_stack_file = None + self.stack_collapse_file = None self.collapsed_stacks = [] self.pid = None self.started_profile = False @@ -71,7 +83,7 @@ def retrieve_perf_version(self): self.version_minor = m.group(2) return m, self.version_major, self.version_minor - def generate_record_command(self, pid, output, frequency=None, call_graph="lbr"): + def generate_record_command(self, pid, output, frequency=None): self.output = output self.pid = pid cmd = [ @@ -85,7 +97,7 @@ def generate_record_command(self, pid, output, frequency=None, call_graph="lbr") "--output", output, "--call-graph", - call_graph, + self.callgraph_mode, ] if frequency: cmd += ["--freq", "{}".format(frequency)] @@ -172,13 +184,7 @@ def stop_profile(self, **kwargs): "Profiler process exit with error. Exit code: {}\n\n".format( self.profiler_process_exit_code ) - + "If running in non-root user please confirm that you have:\n" - + " - access to Kernel address maps." - + " Check if `0` ( disabled ) appears from the output of `cat /proc/sys/kernel/kptr_restrict`\n" - + ' If not then fix via: `sudo sh -c " echo 0 > /proc/sys/kernel/kptr_restrict"`\n' - + " - permission to collect stats." 
- + " Check if `-1` appears from the output of `cat /proc/sys/kernel/perf_event_paranoid`\n" - + " If not then fix via: `sudo sh -c 'echo -1 > /proc/sys/kernel/perf_event_paranoid'`\n" + + LINUX_PERF_SETTINGS_MESSAGE ) except OSError as e: diff --git a/redisbench_admin/run/common.py b/redisbench_admin/run/common.py index 6fbc572..de3ee5b 100644 --- a/redisbench_admin/run/common.py +++ b/redisbench_admin/run/common.py @@ -6,6 +6,7 @@ import datetime as dt import logging +import os from redisbench_admin.run.redis_benchmark.redis_benchmark import ( prepare_redis_benchmark_command, @@ -28,6 +29,8 @@ extract_perbranch_timeseries_from_results, ) +BENCHMARK_REPETITIONS = int(os.getenv("BENCHMARK_REPETITIONS", 1)) + def prepare_benchmark_parameters( benchmark_config, @@ -221,3 +224,16 @@ def get_start_time_vars(start_time=None): start_time_ms = int((start_time - dt.datetime(1970, 1, 1)).total_seconds() * 1000) start_time_str = start_time.strftime("%Y-%m-%d-%H-%M-%S") return start_time, start_time_ms, start_time_str + + +def execute_init_commands(benchmark_config, r, dbconfig_keyname="dbconfig"): + cmds = None + if dbconfig_keyname in benchmark_config: + for k in benchmark_config[dbconfig_keyname]: + if "init_commands" in k: + cmds = k["init_commands"] + if cmds is not None: + for cmd in cmds: + cmd_split = cmd.split(None, 2) + stdout = r.execute_command(*cmd_split) + print(stdout) diff --git a/redisbench_admin/run_local/run_local.py b/redisbench_admin/run_local/run_local.py index 69cd3a2..c7d92cc 100644 --- a/redisbench_admin/run_local/run_local.py +++ b/redisbench_admin/run_local/run_local.py @@ -23,6 +23,7 @@ from redisbench_admin.run.common import ( prepare_benchmark_parameters, get_start_time_vars, + execute_init_commands, ) from redisbench_admin.run_local.args import PROFILE_FREQ from redisbench_admin.utils.benchmark_config import ( @@ -149,7 +150,6 @@ def run_local_command_logic(args): r = redis.StrictRedis(port=args.port) stdout = r.execute_command("info modules") - print(stdout) ( module_names, _, @@ -525,16 +525,3 @@ def which_local(benchmark_tool, executable, full_path, which_benchmark_tool): which_benchmark_tool = full_path_filename break return which_benchmark_tool - - -def execute_init_commands(benchmark_config, r, dbconfig_keyname="dbconfig"): - cmds = None - if dbconfig_keyname in benchmark_config: - for k in benchmark_config[dbconfig_keyname]: - if "init_commands" in k: - cmds = k["init_commands"] - if cmds is not None: - for cmd in cmds: - cmd_split = cmd.split(None, 2) - stdout = r.execute_command(*cmd_split) - print(stdout) diff --git a/redisbench_admin/run_remote/args.py b/redisbench_admin/run_remote/args.py index 13dd9a2..4541f39 100644 --- a/redisbench_admin/run_remote/args.py +++ b/redisbench_admin/run_remote/args.py @@ -8,16 +8,21 @@ import socket # environment variables -PERFORMANCE_RTS_PUSH = bool(os.getenv("PUSH_RTS", False)) -PERFORMANCE_RTS_AUTH = os.getenv("PERFORMANCE_RTS_AUTH", None) -PERFORMANCE_RTS_HOST = os.getenv("PERFORMANCE_RTS_HOST", 6379) -PERFORMANCE_RTS_PORT = os.getenv("PERFORMANCE_RTS_PORT", None) -TERRAFORM_BIN_PATH = os.getenv("TERRAFORM_BIN_PATH", "terraform") +from redisbench_admin.utils.remote import ( + TERRAFORM_BIN_PATH, + PERFORMANCE_RTS_HOST, + PERFORMANCE_RTS_PORT, + PERFORMANCE_RTS_AUTH, + PERFORMANCE_RTS_PUSH, +) LOG_LEVEL = logging.INFO if os.getenv("VERBOSE", "1") == "0": LOG_LEVEL = logging.WARN +DEFAULT_TRIGGERING_ENV = socket.gethostname() +TRIGGERING_ENV = os.getenv("TRIGGERING_ENV", DEFAULT_TRIGGERING_ENV) + def 
create_run_remote_arguments(parser): parser.add_argument("--module_path", type=str, required=True) @@ -45,7 +50,7 @@ def create_run_remote_arguments(parser): "You can use `--required-module` more than once", ) parser.add_argument("--github_branch", type=str, default=None, nargs="?", const="") - parser.add_argument("--triggering_env", type=str, default=socket.gethostname()) + parser.add_argument("--triggering_env", type=str, default=TRIGGERING_ENV) parser.add_argument("--terraform_bin_path", type=str, default=TERRAFORM_BIN_PATH) parser.add_argument("--setup_name_sufix", type=str, default="") parser.add_argument( diff --git a/redisbench_admin/run_remote/run_remote.py b/redisbench_admin/run_remote/run_remote.py index 3b69fdd..c4f8d7a 100644 --- a/redisbench_admin/run_remote/run_remote.py +++ b/redisbench_admin/run_remote/run_remote.py @@ -10,6 +10,7 @@ import sys import traceback +import paramiko import redis from python_terraform import Terraform from redistimeseries.client import Client @@ -19,6 +20,8 @@ run_remote_benchmark, common_exporter_logic, get_start_time_vars, + execute_init_commands, + BENCHMARK_REPETITIONS, ) from redisbench_admin.run.redis_benchmark.redis_benchmark import ( redis_benchmark_ensure_min_version_remote, @@ -50,10 +53,13 @@ retrieve_tf_connection_vars, get_project_ts_tags, get_overall_dashboard_keynames, + EC2_PRIVATE_PEM, + check_ec2_env, ) +from sshtunnel import SSHTunnelForwarder # internal aux vars -from redisbench_admin.utils.utils import get_ts_metric_name +from redisbench_admin.utils.utils import get_ts_metric_name, wait_for_conn redisbenchmark_go_link = ( "https://s3.amazonaws.com/benchmarks.redislabs/" @@ -67,16 +73,6 @@ private_key = "/tmp/benchmarks.redislabs.pem" min_recommended_benchmark_duration = 60 -# environment variables -PERFORMANCE_RTS_AUTH = os.getenv("PERFORMANCE_RTS_AUTH", None) -PERFORMANCE_RTS_HOST = os.getenv("PERFORMANCE_RTS_HOST", 6379) -PERFORMANCE_RTS_PORT = os.getenv("PERFORMANCE_RTS_PORT", None) -TERRAFORM_BIN_PATH = os.getenv("TERRAFORM_BIN_PATH", "terraform") -EC2_ACCESS_KEY = os.getenv("AWS_ACCESS_KEY_ID", None) -EC2_REGION = os.getenv("AWS_DEFAULT_REGION", None) -EC2_SECRET_KEY = os.getenv("AWS_SECRET_ACCESS_KEY", None) -EC2_PRIVATE_PEM = os.getenv("EC2_PRIVATE_PEM", None) - # noinspection PyBroadException def setup_remote_benchmark_tool_requirements_tsbs( @@ -223,15 +219,7 @@ def run_remote_command_logic(args): local_module_file = args.module_path if args.skip_env_vars_verify is False: - if EC2_ACCESS_KEY is None or EC2_ACCESS_KEY == "": - logging.error("missing required AWS_ACCESS_KEY_ID env variable") - exit(1) - if EC2_REGION is None or EC2_REGION == "": - logging.error("missing required AWS_DEFAULT_REGION env variable") - exit(1) - if EC2_SECRET_KEY is None or EC2_SECRET_KEY == "": - logging.error("missing required AWS_SECRET_ACCESS_KEY env variable") - exit(1) + check_ec2_env() if EC2_PRIVATE_PEM is None or EC2_PRIVATE_PEM == "": logging.error("missing required EC2_PRIVATE_PEM env variable") @@ -302,349 +290,386 @@ def run_remote_command_logic(args): password=args.redistimesies_pass, ) rts.redis.ping() - - for test_name, benchmark_config in benchmark_definitions.items(): - s3_bucket_path = get_test_s3_bucket_path( - s3_bucket_name, test_name, tf_github_org, tf_github_repo - ) - - if "remote" in benchmark_config: - ( - remote_setup, - deployment_type, - remote_id, - ) = fetch_remote_setup_from_config(benchmark_config["remote"]) - logging.info( - "Deploying test {} on AWS using {}".format(test_name, remote_setup) + for 
repetition in range(1, BENCHMARK_REPETITIONS + 1): + for test_name, benchmark_config in benchmark_definitions.items(): + s3_bucket_path = get_test_s3_bucket_path( + s3_bucket_name, test_name, tf_github_org, tf_github_repo ) - tf_setup_name = "{}{}".format(remote_setup, tf_setup_name_sufix) - logging.info("Using full setup name: {}".format(tf_setup_name)) - if remote_id not in remote_envs: - # check if terraform is present - tf = Terraform( - working_dir=remote_setup, - terraform_bin_path=tf_bin_path, - ) - ( - tf_return_code, - username, - server_private_ip, - server_public_ip, - server_plaintext_port, - client_private_ip, - client_public_ip, - ) = setup_remote_environment( - tf, - tf_github_sha, - tf_github_actor, - tf_setup_name, - tf_github_org, - tf_github_repo, - tf_triggering_env, - ) - remote_envs[remote_id] = tf - else: - logging.info("Reusing remote setup {}".format(remote_id)) - tf = remote_envs[remote_id] + + if "remote" in benchmark_config: ( - tf_return_code, - username, - server_private_ip, - server_public_ip, - server_plaintext_port, - client_private_ip, - client_public_ip, - ) = retrieve_tf_connection_vars(None, tf) - commands = [ - "redis-cli -h {} -p {} shutdown".format( - server_private_ip, server_plaintext_port + remote_setup, + deployment_type, + remote_id, + ) = fetch_remote_setup_from_config(benchmark_config["remote"]) + logging.info( + "Repetition {} of {}. Deploying test {} on AWS using {}".format( + repetition, BENCHMARK_REPETITIONS, test_name, remote_setup ) - ] - execute_remote_commands( - server_public_ip, username, private_key, commands ) + tf_setup_name = "{}{}".format(remote_setup, tf_setup_name_sufix) + logging.info("Using full setup name: {}".format(tf_setup_name)) + if remote_id not in remote_envs: + # check if terraform is present + tf = Terraform( + working_dir=remote_setup, + terraform_bin_path=tf_bin_path, + ) + ( + tf_return_code, + username, + server_private_ip, + server_public_ip, + server_plaintext_port, + client_private_ip, + client_public_ip, + ) = setup_remote_environment( + tf, + tf_github_sha, + tf_github_actor, + tf_setup_name, + tf_github_org, + tf_github_repo, + tf_triggering_env, + ) + remote_envs[remote_id] = tf + else: + logging.info("Reusing remote setup {}".format(remote_id)) + tf = remote_envs[remote_id] + ( + tf_return_code, + username, + server_private_ip, + server_public_ip, + server_plaintext_port, + client_private_ip, + client_public_ip, + ) = retrieve_tf_connection_vars(None, tf) - redis_configuration_parameters = extract_redis_configuration_parameters( - benchmark_config, "dbconfig" - ) + local_redis_conn, ssh_tunnel = ssh_tunnel_redisconn( + server_plaintext_port, + server_private_ip, + server_public_ip, + username, + ) + local_redis_conn.shutdown() + ssh_tunnel.close() # Close the tunnel - # setup Redis - spin_up_standalone_remote_redis( - benchmark_config, - server_public_ip, - username, - private_key, - local_module_file, - remote_module_file, - remote_dataset_file, - dirname, - redis_configuration_parameters, - ) - module_names, artifact_versions = extract_artifact_version_remote( - server_public_ip, server_plaintext_port, username, private_key - ) - check_required_modules(module_names, required_modules) + # after we've created the env, even on error we should always teardown + # in case of some unexpected error we fail the test + try: - artifact_version = artifact_versions[0] - ( - 
benchmark_min_tool_version, - benchmark_min_tool_version_major, - benchmark_min_tool_version_minor, - benchmark_min_tool_version_patch, - benchmark_tool, - benchmark_tool_source, - _, - ) = extract_benchmark_tool_settings(benchmark_config) - benchmark_tools_sanity_check(args, benchmark_tool) - # setup the benchmark tool - if benchmark_tool == "redisgraph-benchmark-go": - setup_remote_benchmark_tool_redisgraph_benchmark_go( - client_public_ip, + redis_configuration_parameters = ( + extract_redis_configuration_parameters( + benchmark_config, "dbconfig" + ) + ) + # setup Redis + spin_up_standalone_remote_redis( + benchmark_config, + server_public_ip, username, private_key, - redisbenchmark_go_link, + local_module_file, + remote_module_file, + remote_dataset_file, + dirname, + redis_configuration_parameters, ) - if "ycsb" in benchmark_tool: - setup_remote_benchmark_tool_ycsb_redisearch( - client_public_ip, + dataset_load_start_time = datetime.datetime.now() + local_redis_conn, ssh_tunnel = ssh_tunnel_redisconn( + server_plaintext_port, + server_private_ip, + server_public_ip, username, - private_key, ) - if "tsbs_" in benchmark_tool: + result = wait_for_conn(local_redis_conn) + dataset_load_end_time = datetime.datetime.now() + if result is True: + logging.info("Redis available") + else: + logging.error("Remote redis is not available") + raise Exception("Remote redis is not available. Aborting...") + + dataset_load_duration_seconds = ( + dataset_load_end_time - dataset_load_start_time + ).seconds + logging.info( + "Dataset loading duration {} secs.".format( + dataset_load_duration_seconds + ) + ) + + stdout = local_redis_conn.execute_command("info modules") ( - queries_file_link, - remote_tool_link, - tool_link, - ) = extract_tsbs_extra_links(benchmark_config, benchmark_tool) + module_names, + artifact_versions, + ) = extract_module_semver_from_info_modules_cmd(stdout) - setup_remote_benchmark_tool_requirements_tsbs( - client_public_ip, - username, - private_key, - tool_link, - queries_file_link, - remote_tool_link, - ) + check_required_modules(module_names, required_modules) - if ( - benchmark_min_tool_version is not None - and benchmark_tool == "redis-benchmark" - ): - redis_benchmark_ensure_min_version_remote( - benchmark_tool, + # run initialization commands before benchmark starts + execute_init_commands(benchmark_config, local_redis_conn) + + ssh_tunnel.close() # Close the tunnel + + artifact_version = artifact_versions[0] + ( benchmark_min_tool_version, benchmark_min_tool_version_major, benchmark_min_tool_version_minor, benchmark_min_tool_version_patch, - client_public_ip, - username, - private_key, - ) + benchmark_tool, + benchmark_tool_source, + _, + ) = extract_benchmark_tool_settings(benchmark_config) + benchmark_tools_sanity_check(args, benchmark_tool) + # setup the benchmark tool + if benchmark_tool == "redisgraph-benchmark-go": + setup_remote_benchmark_tool_redisgraph_benchmark_go( + client_public_ip, + username, + private_key, + redisbenchmark_go_link, + ) + if "ycsb" in benchmark_tool: + setup_remote_benchmark_tool_ycsb_redisearch( + client_public_ip, + username, + private_key, + ) + if "tsbs_" in benchmark_tool: + ( + queries_file_link, + remote_tool_link, + tool_link, + ) = extract_tsbs_extra_links(benchmark_config, benchmark_tool) + + setup_remote_benchmark_tool_requirements_tsbs( + client_public_ip, + username, + private_key, + tool_link, + queries_file_link, + remote_tool_link, + ) - command, command_str = prepare_benchmark_parameters( - benchmark_config, - benchmark_tool, 
- server_plaintext_port, - server_private_ip, - remote_results_file, - True, - ) + if ( + benchmark_min_tool_version is not None + and benchmark_tool == "redis-benchmark" + ): + redis_benchmark_ensure_min_version_remote( + benchmark_tool, + benchmark_min_tool_version, + benchmark_min_tool_version_major, + benchmark_min_tool_version_minor, + benchmark_min_tool_version_patch, + client_public_ip, + username, + private_key, + ) - start_time, start_time_ms, start_time_str = get_start_time_vars() - local_benchmark_output_filename = get_run_full_filename( - start_time_str, - deployment_type, - tf_github_org, - tf_github_repo, - tf_github_branch, - test_name, - tf_github_sha, - ) - logging.info( - "Will store benchmark json output to local file {}".format( - local_benchmark_output_filename + command, command_str = prepare_benchmark_parameters( + benchmark_config, + benchmark_tool, + server_plaintext_port, + server_private_ip, + remote_results_file, + True, ) - ) - tmp = None - if benchmark_tool == "redis-benchmark": - tmp = local_benchmark_output_filename - local_benchmark_output_filename = "result.csv" - - benchmark_start_time = datetime.datetime.now() - # run the benchmark - _, stdout, _ = run_remote_benchmark( - client_public_ip, - username, - private_key, - remote_results_file, - local_benchmark_output_filename, - command_str, - ) - benchmark_end_time = datetime.datetime.now() - benchmark_duration_seconds = ( - benchmark_end_time - benchmark_start_time - ).seconds - logging.info( - "Benchmark duration {} secs.".format(benchmark_duration_seconds) - ) - if benchmark_duration_seconds < min_recommended_benchmark_duration: - logging.warning( - "Benchmark duration of {} secs is bellow the considered" - " minimum duration for a stable run ({} secs).".format( - benchmark_duration_seconds, - min_recommended_benchmark_duration, + + start_time, start_time_ms, start_time_str = get_start_time_vars() + local_benchmark_output_filename = get_run_full_filename( + start_time_str, + deployment_type, + tf_github_org, + tf_github_repo, + tf_github_branch, + test_name, + tf_github_sha, + ) + logging.info( + "Will store benchmark json output to local file {}".format( + local_benchmark_output_filename ) ) - - if benchmark_tool == "redis-benchmark": - local_benchmark_output_filename = tmp - with open("result.csv", "r") as txt_file: - stdout = txt_file.read() - - if benchmark_tool == "redis-benchmark" or benchmark_tool == "ycsb": - post_process_benchmark_results( - benchmark_tool, + tmp = None + if benchmark_tool == "redis-benchmark": + tmp = local_benchmark_output_filename + local_benchmark_output_filename = "result.csv" + + benchmark_start_time = datetime.datetime.now() + # run the benchmark + _, stdout, _ = run_remote_benchmark( + client_public_ip, + username, + private_key, + remote_results_file, local_benchmark_output_filename, - start_time_ms, - start_time_str, - stdout, + command_str, ) + benchmark_end_time = datetime.datetime.now() + benchmark_duration_seconds = ( + benchmark_end_time - benchmark_start_time + ).seconds + logging.info( + "Benchmark duration {} secs.".format(benchmark_duration_seconds) + ) + if benchmark_duration_seconds < min_recommended_benchmark_duration: + logging.warning( + "Benchmark duration of {} secs is below the considered" + " minimum duration for a stable run ({} secs).".format( + benchmark_duration_seconds, + min_recommended_benchmark_duration, + ) + ) - with open(local_benchmark_output_filename, "r") as json_file: - results_dict = json.load(json_file) + if benchmark_tool == 
"redis-benchmark": + local_benchmark_output_filename = tmp + with open("result.csv", "r") as txt_file: + stdout = txt_file.read() - # check KPIs - return_code = results_dict_kpi_check( - benchmark_config, results_dict, return_code - ) + if benchmark_tool == "redis-benchmark" or benchmark_tool == "ycsb": + post_process_benchmark_results( + benchmark_tool, + local_benchmark_output_filename, + start_time_ms, + start_time_str, + stdout, + ) - # if the benchmark tool is redisgraph-benchmark-go and - # we still dont have the artifact semver we can extract it from the results dict - if ( - benchmark_tool == "redisgraph-benchmark-go" - and artifact_version is None - ): - artifact_version = extract_redisgraph_version_from_resultdict( - results_dict - ) + with open(local_benchmark_output_filename, "r") as json_file: + results_dict = json.load(json_file) - if artifact_version is None: - artifact_version = "N/A" + # check KPIs + return_code = results_dict_kpi_check( + benchmark_config, results_dict, return_code + ) - if args.upload_results_s3: - logging.info( - "Uploading results to s3. s3 bucket name: {}. s3 bucket path: {}".format( - s3_bucket_name, s3_bucket_path + # if the benchmark tool is redisgraph-benchmark-go and + # we still dont have the artifact semver we can extract it from the results dict + if ( + benchmark_tool == "redisgraph-benchmark-go" + and artifact_version is None + ): + artifact_version = extract_redisgraph_version_from_resultdict( + results_dict ) - ) - artifacts = [local_benchmark_output_filename] - upload_artifacts_to_s3(artifacts, s3_bucket_name, s3_bucket_path) - if args.push_results_redistimeseries: - logging.info("Pushing results to RedisTimeSeries.") - redistimeseries_results_logic( - artifact_version, - benchmark_config, - default_metrics, - deployment_type, - exporter_timemetric_path, - results_dict, - rts, - test_name, - tf_github_branch, - tf_github_org, - tf_github_repo, - tf_triggering_env, - ) - try: - rts.redis.sadd(testcases_setname, test_name) - rts.incrby( - tsname_project_total_success, - 1, - timestamp=start_time_ms, - labels=get_project_ts_tags( - tf_github_org, - tf_github_repo, - deployment_type, - tf_triggering_env, - ), + if artifact_version is None: + artifact_version = "N/A" + + if args.upload_results_s3: + logging.info( + "Uploading results to s3. s3 bucket name: {}. 
s3 bucket path: {}".format( + s3_bucket_name, s3_bucket_path + ) + ) + artifacts = [local_benchmark_output_filename] + upload_artifacts_to_s3( + artifacts, s3_bucket_name, s3_bucket_path ) - metric_name = "benchmark_duration" - tsname_use_case_duration = get_ts_metric_name( - "by.version", + + if args.push_results_redistimeseries: + logging.info("Pushing results to RedisTimeSeries.") + redistimeseries_results_logic( artifact_version, - tf_github_org, - tf_github_repo, + benchmark_config, + default_metrics, deployment_type, + exporter_timemetric_path, + results_dict, + rts, test_name, - tf_triggering_env, - metric_name, - ) - labels = get_project_ts_tags( + tf_github_branch, tf_github_org, tf_github_repo, - deployment_type, tf_triggering_env, ) - labels["version"] = artifact_version - labels["test_name"] = str(test_name) - labels["metric"] = str(metric_name) - logging.info( - "Adding duration {} secs to time-serie named {}".format( - benchmark_duration_seconds, tsname_use_case_duration - ) - ) - rts.add( - tsname_use_case_duration, - start_time_ms, - benchmark_duration_seconds, - labels=labels, - ) - except redis.exceptions.ResponseError as e: - logging.warning( - "Error while updating secondary data structures {}. ".format( - e.__str__() + try: + rts.redis.sadd(testcases_setname, test_name) + rts.incrby( + tsname_project_total_success, + 1, + timestamp=start_time_ms, + labels=get_project_ts_tags( + tf_github_org, + tf_github_repo, + deployment_type, + tf_triggering_env, + ), ) - ) - pass - except: - if args.push_results_redistimeseries: - try: - rts.incrby( - tsname_project_total_failures, - 1, - timestamp=start_time_ms, - labels=get_project_ts_tags( + add_standardized_metric( + "benchmark_duration", + benchmark_duration_seconds, + artifact_version, + deployment_type, + rts, + start_time_ms, + test_name, tf_github_org, tf_github_repo, + tf_triggering_env, + ) + add_standardized_metric( + "dataset_load_duration", + dataset_load_duration_seconds, + artifact_version, deployment_type, + rts, + start_time_ms, + test_name, + tf_github_org, + tf_github_repo, tf_triggering_env, - ), - ) - except redis.exceptions.ResponseError as e: - logging.warning( - "Error while updating secondary data structures {}. ".format( - e.__str__() ) - ) - pass - return_code |= 1 - logging.critical( - "Some unexpected exception was caught " - "during remote work. Failing test...." - ) - logging.critical(sys.exc_info()[0]) - print("-" * 60) - traceback.print_exc(file=sys.stdout) - print("-" * 60) + except redis.exceptions.ResponseError as e: + logging.warning( + "Error while updating secondary data structures {}. ".format( + e.__str__() + ) + ) + pass + + except: + if args.push_results_redistimeseries: + if start_time_ms is None: + _, start_time_ms, _ = get_start_time_vars() + try: + rts.incrby( + tsname_project_total_failures, + 1, + timestamp=start_time_ms, + labels=get_project_ts_tags( + tf_github_org, + tf_github_repo, + deployment_type, + tf_triggering_env, + ), + ) + except redis.exceptions.ResponseError as e: + logging.warning( + "Error while updating secondary data structures {}. ".format( + e.__str__() + ) + ) + pass + return_code |= 1 + logging.critical( + "Some unexpected exception was caught " + "during remote work. Failing test...." + ) + logging.critical(sys.exc_info()[0]) + print("-" * 60) + traceback.print_exc(file=sys.stdout) + print("-" * 60) - else: - logging.info( - "Test {} does not have remote config. 
Skipping test.".format(test_name) - ) + else: + logging.info( + "Test {} does not have remote config. Skipping test.".format( + test_name + ) + ) for remote_setup_name, tf in remote_envs.items(): # tear-down logging.info("Tearing down setup {}".format(remote_setup_name)) @@ -654,6 +679,84 @@ def run_remote_command_logic(args): exit(return_code) +def add_standardized_metric( + metric_name, + metric_value, + artifact_version, + deployment_type, + rts, + start_time_ms, + test_name, + tf_github_org, + tf_github_repo, + tf_triggering_env, +): + tsname_use_case_duration = get_ts_metric_name( + "by.version", + artifact_version, + tf_github_org, + tf_github_repo, + deployment_type, + test_name, + tf_triggering_env, + metric_name, + ) + labels = get_project_ts_tags( + tf_github_org, + tf_github_repo, + deployment_type, + tf_triggering_env, + ) + labels["version"] = artifact_version + labels["test_name"] = str(test_name) + labels["metric"] = str(metric_name) + logging.info( + "Adding metric {}={} to time-serie named {}".format( + metric_name, metric_value, tsname_use_case_duration + ) + ) + try: + logging.info( + "Creating timeseries named {} with labels {}".format( + tsname_use_case_duration, labels + ) + ) + rts.create(tsname_use_case_duration, labels=labels) + except redis.exceptions.ResponseError: + logging.warning( + "Timeseries named {} already exists".format(tsname_use_case_duration) + ) + pass + rts.add( + tsname_use_case_duration, + start_time_ms, + metric_value, + labels=labels, + ) + + +def ssh_tunnel_redisconn( + server_plaintext_port, server_private_ip, server_public_ip, username +): + ssh_pkey = paramiko.RSAKey.from_private_key_file(private_key) + + # Use sshtunnelforwarder tunnel to connect redis via springboard + server = SSHTunnelForwarder( + ssh_address_or_host=(server_public_ip, 22), + ssh_username=username, + ssh_pkey=ssh_pkey, + logger=logging.getLogger(), + remote_bind_address=( + server_private_ip, + server_plaintext_port, + ), # remote redis server + local_bind_address=("0.0.0.0", 10022), # enable local forwarding port + ) + server.start() # start tunnel + r = redis.StrictRedis(host="localhost", port=server.local_bind_port) + return r, server + + def extract_tsbs_extra_links(benchmark_config, benchmark_tool): remote_tool_link = "/tmp/{}".format(benchmark_tool) tool_link = ( diff --git a/redisbench_admin/utils/local.py b/redisbench_admin/utils/local.py index 9fb5d1c..119fe12 100644 --- a/redisbench_admin/utils/local.py +++ b/redisbench_admin/utils/local.py @@ -7,12 +7,13 @@ import logging import os import subprocess -import time from shutil import copyfile import redis import wget +from redisbench_admin.utils.utils import wait_for_conn + def check_dataset_local_requirements( benchmark_config, @@ -68,27 +69,6 @@ def check_if_needs_remote_fetch( return full_path -def wait_for_conn(conn, retries=20, command="PING", should_be=True): - """Wait until a given Redis connection is ready""" - result = False - while retries > 0 and result is False: - try: - if conn.execute_command(command) == should_be: - result = True - except redis.exceptions.BusyLoadingError: - time.sleep(0.1) # give extra 100msec in case of RDB loading - except redis.ConnectionError as err: - str(err) - except redis.ResponseError as err: - err1 = str(err) - if not err1.startswith("DENIED"): - raise - time.sleep(0.1) - retries -= 1 - logging.debug("Waiting for Redis") - return result - - def spin_up_local_redis( dbdir, port, diff --git a/redisbench_admin/utils/remote.py b/redisbench_admin/utils/remote.py index 
8c4593e..f5f67e8 100644 --- a/redisbench_admin/utils/remote.py +++ b/redisbench_admin/utils/remote.py @@ -24,6 +24,17 @@ from redisbench_admin.utils.local import check_dataset_local_requirements from redisbench_admin.utils.utils import get_ts_metric_name +# environment variables +PERFORMANCE_RTS_PUSH = bool(os.getenv("PUSH_RTS", False)) +PERFORMANCE_RTS_AUTH = os.getenv("PERFORMANCE_RTS_AUTH", None) +PERFORMANCE_RTS_HOST = os.getenv("PERFORMANCE_RTS_HOST", "localhost") +PERFORMANCE_RTS_PORT = os.getenv("PERFORMANCE_RTS_PORT", 6379) +TERRAFORM_BIN_PATH = os.getenv("TERRAFORM_BIN_PATH", "terraform") +EC2_ACCESS_KEY = os.getenv("AWS_ACCESS_KEY_ID", None) +EC2_REGION = os.getenv("AWS_DEFAULT_REGION", None) +EC2_SECRET_KEY = os.getenv("AWS_SECRET_ACCESS_KEY", None) +EC2_PRIVATE_PEM = os.getenv("EC2_PRIVATE_PEM", None) + def get_git_root(path): git_repo = git.Repo(path, search_parent_directories=True) @@ -429,18 +440,7 @@ def push_data_to_redistimeseries(rts: client, branch_time_series_dict: dict): datapoint_inserts = 0 if rts is not None: for timeseries_name, time_series in branch_time_series_dict.items(): - try: - logging.info( - "Creating timeseries named {} with labels {}".format( - timeseries_name, time_series["labels"] - ) - ) - rts.create(timeseries_name, labels=time_series["labels"]) - except redis.exceptions.ResponseError: - logging.warning( - "Timeseries named {} already exists".format(timeseries_name) - ) - pass + exporter_create_ts(rts, time_series, timeseries_name) for timestamp, value in time_series["data"].items(): try: rts.add( @@ -461,6 +461,19 @@ def push_data_to_redistimeseries(rts: client, branch_time_series_dict: dict): return datapoint_errors, datapoint_inserts +def exporter_create_ts(rts, time_series, timeseries_name): + try: + logging.info( + "Creating timeseries named {} with labels {}".format( + timeseries_name, time_series["labels"] + ) + ) + rts.create(timeseries_name, labels=time_series["labels"]) + except redis.exceptions.ResponseError: + logging.warning("Timeseries named {} already exists".format(timeseries_name)) + pass + + def extract_redisgraph_version_from_resultdict(results_dict: dict): version = None if "DBSpecificConfigs" in results_dict: @@ -610,3 +623,15 @@ def get_overall_dashboard_keynames(tf_github_org, tf_github_repo, tf_triggering_ tsname_project_total_failures, tsname_project_total_success, ) + + +def check_ec2_env(): + if EC2_ACCESS_KEY is None or EC2_ACCESS_KEY == "": + logging.error("missing required AWS_ACCESS_KEY_ID env variable") + exit(1) + if EC2_REGION is None or EC2_REGION == "": + logging.error("missing required AWS_DEFAULT_REGION env variable") + exit(1) + if EC2_SECRET_KEY is None or EC2_SECRET_KEY == "": + logging.error("missing required AWS_SECRET_ACCESS_KEY env variable") + exit(1) diff --git a/redisbench_admin/utils/utils.py b/redisbench_admin/utils/utils.py index 4cfebfc..217b458 100644 --- a/redisbench_admin/utils/utils.py +++ b/redisbench_admin/utils/utils.py @@ -12,10 +12,12 @@ import os import os.path import tarfile +import time from functools import reduce from zipfile import ZipFile import boto3 +import redis import requests from tqdm import tqdm @@ -222,3 +224,24 @@ def get_ts_metric_name( ) ) return ts_name + + +def wait_for_conn(conn, retries=120, command="PING", should_be=True): + """Wait until a given Redis connection is ready""" + result = False + while retries > 0 and result is False: + try: + if conn.execute_command(command) == should_be: + result = True + except redis.exceptions.BusyLoadingError: + time.sleep(1) # 
give extra 1sec in case of RDB loading + except redis.ConnectionError as err: + logging.error(err) + except redis.ResponseError as err: + err1 = str(err) + if not err1.startswith("DENIED"): + raise + time.sleep(1) + retries -= 1 + logging.debug("Waiting for Redis") + return result diff --git a/redisbench_admin/watchdog/__init__.py b/redisbench_admin/watchdog/__init__.py new file mode 100644 index 0000000..8b39445 --- /dev/null +++ b/redisbench_admin/watchdog/__init__.py @@ -0,0 +1,5 @@ +# BSD 3-Clause License +# +# Copyright (c) 2021., Redis Labs Modules +# All rights reserved. +# diff --git a/redisbench_admin/watchdog/args.py b/redisbench_admin/watchdog/args.py new file mode 100644 index 0000000..3af0f0e --- /dev/null +++ b/redisbench_admin/watchdog/args.py @@ -0,0 +1,35 @@ +# Apache License Version 2.0 +# +# Copyright (c) 2021., Redis Labs Modules +# All rights reserved. +# +from redisbench_admin.utils.remote import ( + PERFORMANCE_RTS_HOST, + PERFORMANCE_RTS_PORT, + PERFORMANCE_RTS_AUTH, +) + + +def create_watchdog_arguments(parser): + parser.add_argument( + "--steps", + type=str, + default="dangling,count-active", + help="comma separated list of steps to be run", + ) + parser.add_argument( + "--exporter", + type=str, + default="redistimeseries", + help="exporter to be used ( either csv or redistimeseries )", + ) + parser.add_argument( + "--update-interval", + type=int, + default=60, + help="watchdog update interval in seconds", + ) + parser.add_argument("--redistimesies_host", type=str, default=PERFORMANCE_RTS_HOST) + parser.add_argument("--redistimesies_port", type=int, default=PERFORMANCE_RTS_PORT) + parser.add_argument("--redistimesies_pass", type=str, default=PERFORMANCE_RTS_AUTH) + return parser diff --git a/redisbench_admin/watchdog/watchdog.py b/redisbench_admin/watchdog/watchdog.py new file mode 100644 index 0000000..620cdb5 --- /dev/null +++ b/redisbench_admin/watchdog/watchdog.py @@ -0,0 +1,141 @@ +# BSD 3-Clause License +# +# Copyright (c) 2021., Redis Labs Modules +# All rights reserved. 
+# + + +import datetime +import logging +import time + +import boto3 +from redistimeseries.client import Client + +from redisbench_admin.run.common import get_start_time_vars +from redisbench_admin.utils.remote import ( + EC2_REGION, + EC2_SECRET_KEY, + EC2_ACCESS_KEY, + check_ec2_env, +) + +terminate_after_secs = 45 * 60 +dry_run = True +ci_machines_prefix = "/tmp/" + + +def get_ci_ec2_instances_by_state(ec2_client, ci_machines_prefix, requested_state): + count = 0 + state_instances = [] + response = ec2_client.describe_instances() + for group in response["Reservations"]: + instances = group["Instances"] + for instance in instances: + state = instance["State"]["Name"] + for tag_dict in instance.get("Tags", []): + key = tag_dict["Key"] + key_v = tag_dict["Value"] + if key == "Name": + if ci_machines_prefix in key_v: + if state == requested_state: + count = count + 1 + state_instances.append(instance) + return count, state_instances + + +def watchdog_dangling_ec2_instances( + ec2_client, terminate_after_secs, ci_machines_prefix, dry_run +): + current_datetime = datetime.datetime.now(datetime.timezone.utc) + total_instances = 0 + response = ec2_client.describe_instances() + for group in response["Reservations"]: + instances = group["Instances"] + for instance in instances: + launch_time = instance["LaunchTime"] + instance_id = instance["InstanceId"] + state = instance["State"]["Name"] + if state != "terminated": + for tag_dict in instance.get("Tags", []): + key = tag_dict["Key"] + key_v = tag_dict["Value"] + if key == "Name": + if ci_machines_prefix in key_v: + total_instances = total_instances + 1 + elapsed = current_datetime - launch_time + will_terminate = False + if elapsed.total_seconds() > terminate_after_secs: + will_terminate = True + + logging.info( + "Temporary machine {} {}. terminate? {}".format( + key_v, elapsed, will_terminate + ) + ) + if will_terminate and not dry_run: + logging.warning( + "Requesting to terminate instance with id {} given it ".format( + instance_id + ) + + "surpassed the maximum allowed ci duration" + ) + response = ec2_client.terminate_instances( + InstanceIds=[ + instance_id, + ] + ) + logging.info( + "Request to terminate instance with id {} reply: {}".format( + instance_id, response + ) + ) + logging.info("Detected a total of {} ci.benchmark VMs".format(total_instances)) + + +def watchdog_command_logic(args): + cloud = "aws" + prefix = "ci.benchmarks.redislabs/{}/{}".format(cloud, EC2_REGION) + tsname_overall_running = "{}/state-running".format(prefix) + check_ec2_env() + boto3.setup_default_session( + region_name=EC2_REGION, + aws_access_key_id=EC2_ACCESS_KEY, + aws_secret_access_key=EC2_SECRET_KEY, + ) + logging.info("Checking connection to RedisTimeSeries.") + rts = Client( + host=args.redistimesies_host, + port=args.redistimesies_port, + password=args.redistimesies_pass, + ) + rts.redis.ping() + ec2_client = boto3.client("ec2") + update_interval = args.update_interval + logging.info( + "Entering watchdog loop. 
Ticking every {} secs".format(update_interval) + ) + while True: + starttime, start_time_ms, _ = get_start_time_vars() + + watchdog_dangling_ec2_instances( + ec2_client, terminate_after_secs, ci_machines_prefix, dry_run + ) + + running_count, _ = get_ci_ec2_instances_by_state( + ec2_client, ci_machines_prefix, "running" + ) + + rts.add( + tsname_overall_running, + start_time_ms, + running_count, + labels={"cloud": cloud, "region": EC2_REGION}, + ) + + sleep_time_secs = float(update_interval) - ( + (datetime.datetime.now() - starttime).total_seconds() + % float(update_interval) + ) + logging.info("Sleeping for {} secs".format(sleep_time_secs)) + time.sleep(sleep_time_secs)
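Note on the new sshtunnel dependency: ssh_tunnel_redisconn() above is the core of the new connectivity flow. A minimal standalone sketch of the same sshtunnel + paramiko + redis pattern follows; the IP addresses, SSH user, and key path are illustrative assumptions, not values taken from this patch:

import logging

import paramiko
import redis
from sshtunnel import SSHTunnelForwarder

# Illustrative values: substitute the real public (bastion) IP, the private IP
# of the Redis host, the SSH user, and your own private key file.
ssh_pkey = paramiko.RSAKey.from_private_key_file("/tmp/benchmarks.redislabs.pem")
tunnel = SSHTunnelForwarder(
    ssh_address_or_host=("198.51.100.10", 22),  # public IP of the DB host (example)
    ssh_username="ubuntu",  # assumed SSH user
    ssh_pkey=ssh_pkey,
    logger=logging.getLogger(),
    remote_bind_address=("10.0.0.5", 6379),  # remote redis server (example)
    local_bind_address=("0.0.0.0", 10022),  # local forwarding port
)
tunnel.start()  # open the tunnel before connecting
r = redis.StrictRedis(host="localhost", port=tunnel.local_bind_port)
print(r.ping())  # True once the forwarded connection is up
tunnel.close()  # always close the tunnel when done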
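Note on BENCHMARK_REPETITIONS: the loop in run_remote.py is meant to run every test N times, so the upper bound must be inclusive (range(1, N + 1) yields 1..N, while range(1, N) would run nothing at the default of 1). A short sketch of the intended semantics, assuming only the default documented above:

import os

# Parsed once at import time in redisbench_admin/run/common.py; unset means one pass.
BENCHMARK_REPETITIONS = int(os.getenv("BENCHMARK_REPETITIONS", 1))

for repetition in range(1, BENCHMARK_REPETITIONS + 1):
    # repetition runs 1..N inclusive
    print("Repetition {} of {}".format(repetition, BENCHMARK_REPETITIONS))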
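Note on the new watchdog sub-command: it is registered in cli.py alongside run-local, run-remote, export, and extract, so, assuming the console script keeps the redisbench-admin name from pyproject.toml, a typical invocation would be (flags per watchdog/args.py, values are examples):

redisbench-admin watchdog --steps dangling,count-active --update-interval 60 --redistimesies_host localhost --redistimesies_port 6379

It requires the same AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, and AWS_DEFAULT_REGION environment variables that check_ec2_env() validates.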