From e9e449aa1a5ccaabde8357236d532ac314d57a44 Mon Sep 17 00:00:00 2001 From: Josh Lewittes Date: Wed, 15 May 2024 17:32:22 +0300 Subject: [PATCH] Add cluster utilization data to `status` endpoint (#653) --- runhouse/constants.py | 4 + runhouse/main.py | 307 ++++++++++++++---- runhouse/resources/hardware/cluster.py | 11 +- runhouse/servers/env_servlet.py | 4 +- runhouse/servers/http/http_server.py | 15 +- runhouse/servers/obj_store.py | 285 +++++++++++++++- runhouse/utils.py | 7 + tests/conftest.py | 2 + tests/fixtures/on_demand_cluster_fixtures.py | 32 +- .../test_clusters/test_cluster.py | 154 ++++++++- .../test_clusters/test_cluster_default_env.py | 2 +- .../test_clusters/test_on_demand_cluster.py | 1 + 12 files changed, 720 insertions(+), 104 deletions(-) diff --git a/runhouse/constants.py b/runhouse/constants.py index 4fba40669..7d48c3501 100644 --- a/runhouse/constants.py +++ b/runhouse/constants.py @@ -58,3 +58,7 @@ TEST_ORG = "test-org" EMPTY_DEFAULT_ENV_NAME = "_cluster_default_env" + +# cluster status constants +DOUBLE_SPACE_UNICODE = "\u00A0\u00A0" +BULLET_UNICODE = "\u2022" diff --git a/runhouse/main.py b/runhouse/main.py index 028366b8f..fc7440991 100644 --- a/runhouse/main.py +++ b/runhouse/main.py @@ -1,4 +1,7 @@ +import copy +import importlib import logging +import math import shlex import subprocess import time @@ -17,8 +20,10 @@ import runhouse.rns.login -from runhouse import __version__, cluster, configs +from runhouse import __version__, cluster, Cluster, configs from runhouse.constants import ( + BULLET_UNICODE, + DOUBLE_SPACE_UNICODE, RAY_KILL_CMD, RAY_START_CMD, SERVER_LOGFILE, @@ -27,7 +32,7 @@ START_NOHUP_CMD, START_SCREEN_CMD, ) -from runhouse.globals import obj_store +from runhouse.globals import obj_store, rns_client from runhouse.resources.hardware.ray_utils import ( check_for_existing_ray_instance, kill_actors, @@ -128,81 +133,243 @@ def ssh(cluster_name: str, up: bool = typer.Option(False, help="Start the cluste c.ssh() -def 
_print_status(config): +############################### +# Status helping functions +############################### + + +def _adjust_resource_type(resource_type: str): """ - Prints the status of the cluster to the console - :param config: cluster's config - :return: cluster's config + status helping function. transforms a str form runhouse.resources.{X.Y...}.resource_type to runhouse.resource_type """ - envs = config["envs"] - config.pop("envs", []) - - # print headlines - daemon_headline_txt = ( - "\N{smiling face with horns} Runhouse Daemon is running \N{Runner}" - ) + try: + resource_type = resource_type.split(".")[-1] + getattr(importlib.import_module("runhouse"), resource_type) + return f"runhouse.{resource_type}" + except AttributeError: + return resource_type - console.print(daemon_headline_txt, style="bold royal_blue1") - if "name" in config.keys(): - console.print(config["name"]) - first_info_to_print = ["den_auth", "server_connection_type", "server_port"] +def _resource_name_to_rns(name: str): + """ + If possible, transform the resource name to a rns address. + If not, return the name as is (it is the key in the object store). + """ + resource_config = rns_client.load_config(name) + if resource_config and resource_config.get("resource_type") != "env": + return resource_config.get("name") + else: + return name - if config.get("default_env") and isinstance(config["default_env"], Dict): - config["default_env"] = config["default_env"]["name"] - for info in config: - if info in first_info_to_print: - console.print(f"\u2022 {info}: {config[info]}") - first_info_to_print.append("name") +def _print_cluster_config(cluster_config: Dict): + """ + Helping function to the `_print_status` which prints the relevant info from the cluster config. 
+ """ + if "name" in cluster_config.keys(): + console.print(cluster_config.get("name")) + + top_level_config = [ + "server_port", + "server_pid", + "den_auth", + "server_connection_type", + ] + + backend_config = [ + "resource_subtype", + "use_local_telemetry", + "domain", + "server_host", + "ips", + "resource_subtype", + ] + + if cluster_config.get("resource_subtype") != "Cluster": + backend_config.append("autostop_mins") + + if cluster_config.get("default_env") and isinstance( + cluster_config.get("default_env"), Dict + ): + cluster_config["default_env"] = cluster_config["default_env"]["name"] + + for key in top_level_config: + console.print(f"{BULLET_UNICODE} {key}: {cluster_config[key]}") console.print("\u2022 backend config:") - for info in config: - if info not in first_info_to_print: - console.print(f"\t\u2022 {info}: {config[info]}") + for key in backend_config: + if key == "autostop_mins" and cluster_config[key] == -1: + console.print( + f"{DOUBLE_SPACE_UNICODE}{BULLET_UNICODE} {key}: autostop disabled" + ) + else: + console.print( + f"{DOUBLE_SPACE_UNICODE}{BULLET_UNICODE} {key}: {cluster_config[key]}" + ) - # print the environments in the cluster, and the resources associated with each environment. + +def _print_envs_info(envs: Dict, envs_info: Dict, current_cluster: Cluster): + """ + Prints info about the envs in the current_cluster. + Prints the resources in each env, and the CPU and GPU usage of the env (if exists). + + :param envs: Dict of envs in each cluster, and the resources associated with them. + :param envs_info: Dict of cpu and gpu info of the envs. + :param current_cluster: The cluster whose status we are printing. + """ + # Print headline envs_in_cluster_headline = "Serving 🍦 :" console.print(envs_in_cluster_headline, style="bold") if len(envs) == 0: console.print("This cluster has no environment nor resources.") - for env_name in envs: + first_envs_to_print = [] + + # First: if the default env does not have resources, print it. 
+ default_env_name = current_cluster.default_env.name + if len(envs[default_env_name]) <= 1: + # case where the default env doesn't hve any other resources, apart from the default env itself. + console.print( + f"{BULLET_UNICODE} {default_env_name} (runhouse.Env)", + style="italic bold", + ) + console.print( + f"{DOUBLE_SPACE_UNICODE}This environment has only python packages installed, if such provided. No " + "resources were found." + ) + + else: + # case where the default env have other resources. We make sure that our of all the envs which have reosirces, + # the default_env will be printed first. + first_envs_to_print = [default_env_name] + + # Make sure to print envs with no resources first. + # (the only resource they have is a runhouse.env, which is the env itself). + first_envs_to_print = first_envs_to_print + [ + env_name + for env_name in envs + if (len(envs[env_name]) <= 1 and env_name != default_env_name) + ] + + # Now, print the envs. + # If the env have packages installed, that means that it contains an env resource. In that case: + # * If the env contains only itself, we will print that the env contains only the installed packages. + # * Else, we will print the resources (rh.function, th.module) associated with the env. + + envs_to_print = first_envs_to_print + [ + env_name + for env_name in envs + if env_name not in first_envs_to_print + [default_env_name] + ] + + for env_name in envs_to_print: resources_in_env = envs[env_name] - if len(resources_in_env) == 0: - console.print(f"{env_name} (Env):", style="italic underline") - console.print("This environment has no resources.") + env_process_info = envs_info[env_name] + current_env = [ + resource for resource in resources_in_env if resource["name"] == env_name + ] + + # sometimes the env itself is not a resource (key) inside the env's servlet. 
+ if len(current_env) == 0: + env_name_print = _resource_name_to_rns(env_name) + env_type = "runhouse.Env" + else: + current_env = current_env[0] + env_name_print = _resource_name_to_rns(current_env["name"]) + env_type = _adjust_resource_type(current_env["resource_type"]) + + env_name_txt = f"{BULLET_UNICODE} {env_name_print} ({env_type}) | pid: {env_process_info['pid']} | node: {env_process_info['node_name']}" + console.print(env_name_txt, style="italic bold") + + # Print CPU info + env_cpu_info = env_process_info.get("env_memory_usage") + if env_cpu_info: + + # convert bytes to GB + memory_usage_gb = round( + int(env_cpu_info["memory_size_bytes"]) / (1024**3), + 2, + ) + total_cluster_memory = math.ceil( + int(env_cpu_info["total_cluster_memory"]) / (1024**3) + ) + cpu_memory_usage_percent = round( + float(env_cpu_info["memory_percent_from_cluster"]), + 2, + ) + cpu_usage_percent = round(float(env_cpu_info["cpu_usage_percent"]), 2) + + cpu_usage_summery = f"{DOUBLE_SPACE_UNICODE}CPU: {cpu_usage_percent}% | Memory: {memory_usage_gb} / {total_cluster_memory} Gb ({cpu_memory_usage_percent}%)" else: - current_env = [ - resource - for resource in resources_in_env - if resource["name"] == env_name - ] - - # sometimes the env itself is not a resource (key) inside the env's servlet. - if len(current_env) == 0: - env_name_txt = f"{env_name} (Env):" - else: - current_env = current_env[0] - env_name_txt = ( - f"{current_env['name']} ({current_env['resource_type']}):" - ) + cpu_usage_summery = ( + f"{DOUBLE_SPACE_UNICODE}CPU: This process did not use CPU memory." + ) - console.print( - env_name_txt, - style="italic underline", + console.print(cpu_usage_summery) + + # Print GPU info + env_gpu_info = env_process_info.get("env_gpu_usage") + + # sometimes the cluster has no GPU, therefore the env_gpu_info is an empty dictionary. + if env_gpu_info: + # get the gpu usage info, and convert it to GB. 
+ total_gpu_memory = math.ceil( + float(env_gpu_info.get("total_gpu_memory")) / (1024**3) + ) + gpu_util_percent = round(float(env_gpu_info.get("gpu_util_percent")), 2) + used_gpu_memory = round( + float(env_gpu_info.get("used_gpu_memory")) / (1024**3), 2 ) + gpu_memory_usage_percent = round( + float(used_gpu_memory / total_gpu_memory) * 100, 2 + ) + gpu_usage_summery = f"{DOUBLE_SPACE_UNICODE}GPU: {gpu_util_percent}% | Memory: {used_gpu_memory} / {total_gpu_memory} Gb ({gpu_memory_usage_percent}%)" + console.print(gpu_usage_summery) + + resources_in_env = [ + resource for resource in resources_in_env if resource is not current_env + ] - resources_in_env = [ - resource for resource in resources_in_env if resource is not current_env - ] + if len(resources_in_env) == 0: + # No resources were found in the env, only the associated installed python reqs were installed. + console.print( + f"{DOUBLE_SPACE_UNICODE}This environment has only python packages installed, if such provided. No resources were " + "found." + ) + else: for resource in resources_in_env: - resource_name = resource["name"] - resource_type = resource["resource_type"] - console.print(f"\u2022{resource_name} ({resource_type})") + resource_name = _resource_name_to_rns(resource["name"]) + resource_type = _adjust_resource_type(resource["resource_type"]) + console.print( + f"{DOUBLE_SPACE_UNICODE}{BULLET_UNICODE} {resource_name} ({resource_type})" + ) + + +def _print_status(config: dict, current_cluster: Cluster): + """ + Prints the status of the cluster to the console + :param config: cluster's config + :return: cluster's config + """ + + cluster_config = config.get("cluster_config") + envs = cluster_config.pop("envs", []) + envs_info = config.pop("env_servlet_actors", []) + + # print headline + daemon_headline_txt = ( + "\N{smiling face with horns} Runhouse Daemon is running \N{Runner}" + ) + console.print(daemon_headline_txt, style="bold royal_blue1") + + # Print relevant info from cluster config. 
+ _print_cluster_config(cluster_config) + + # print the environments in the cluster, and the resources associated with each environment. + _print_envs_info(envs, envs_info, current_cluster) return config @@ -216,6 +383,8 @@ def status( ): """Load the status of the Runhouse daemon running on a cluster.""" + cluster_or_local = rh.here + if cluster_name: current_cluster = cluster(name=cluster_name) if not current_cluster.is_up(): @@ -232,20 +401,42 @@ def status( f"`runhouse ssh {cluster_name}` or `sky status -r` for on-demand clusters." ) raise typer.Exit(1) - cluster_status = current_cluster.status() else: - current_cluster = rh.here - if not current_cluster or current_cluster == "file": + if not cluster_or_local or cluster_or_local == "file": console.print( "\N{smiling face with horns} Runhouse Daemon is not running... \N{No Entry} \N{Runner}. " "Start it with `runhouse restart` or specify a remote " "cluster to poll with `runhouse status `." ) raise typer.Exit(1) + + if cluster_or_local != "file": # If we are on the cluster load status directly from the object store cluster_status: dict = obj_store.status() + cluster_config = copy.deepcopy(cluster_status.get("cluster_config")) + current_cluster: Cluster = Cluster.from_config(cluster_config) + return _print_status(cluster_status, current_cluster) - return _print_status(cluster_status) + if cluster_name is None: + # If running outside the cluster must specify a cluster name + console.print("Missing argument `cluster_name`.") + return + + try: + current_cluster: Cluster = Cluster.from_name(name=cluster_name) + cluster_status: dict = current_cluster.status( + resource_address=current_cluster.rns_address + ) + + except ValueError: + console.print("Failed to load status for cluster.") + return + except requests.exceptions.ConnectionError: + console.print( + "\N{smiling face with horns} Runhouse Daemon is not running... 
\N{No Entry} \N{Runner}" + ) + return + return _print_status(cluster_status, current_cluster) def load_cluster(cluster_name: str): diff --git a/runhouse/resources/hardware/cluster.py b/runhouse/resources/hardware/cluster.py index e41231f88..9bba7e308 100644 --- a/runhouse/resources/hardware/cluster.py +++ b/runhouse/resources/hardware/cluster.py @@ -697,10 +697,13 @@ def status(self, resource_address: str = None): try: self.check_server() if self.on_this_cluster(): - return obj_store.status() - return self.client.status( - resource_address=resource_address or self.rns_address - ) + status = obj_store.status() + else: + status = self.client.status( + resource_address=resource_address or self.rns_address + ) + return status + except ValueError as e: raise e diff --git a/runhouse/servers/env_servlet.py b/runhouse/servers/env_servlet.py index c7f3cb809..81a386289 100644 --- a/runhouse/servers/env_servlet.py +++ b/runhouse/servers/env_servlet.py @@ -177,5 +177,5 @@ async def adelete_local(self, key: Any): async def aclear_local(self): return await obj_store.aclear_local() - async def astatus_local(self): - return obj_store.status_local() + async def astatus_local(self, env_servlet_pid): + return obj_store.status_local(env_servlet_pid) diff --git a/runhouse/servers/http/http_server.py b/runhouse/servers/http/http_server.py index e3ba3ee6a..02ea8aad2 100644 --- a/runhouse/servers/http/http_server.py +++ b/runhouse/servers/http/http_server.py @@ -713,8 +713,9 @@ async def get_keys(request: Request, env_name: Optional[str] = None): @staticmethod @app.get("/status") @validate_cluster_access - async def get_status(request: Request): - return await obj_store.astatus() + def get_status(request: Request): + + return obj_store.status() @staticmethod def _collect_cluster_stats(): @@ -1082,9 +1083,9 @@ async def main(): f"cluster_config.json: {cluster_config.get('ips', [None])[0]}. Prioritizing CLI provided certs_address." 
) - certs_address = parse_args.certs_address or cluster_config.get("ips", [None])[0] - if certs_address is not None: - cluster_config["ips"] = [certs_address] + certs_addresses = parse_args.certs_address or cluster_config.get("ips", [None]) + if certs_addresses is not None: + cluster_config["ips"] = certs_addresses else: cluster_config["ips"] = ["0.0.0.0"] @@ -1173,14 +1174,14 @@ async def main(): # proxy to forward requests from port 80 (HTTP) or 443 (HTTPS) to the app's port. if use_caddy: logger.debug("Using Caddy as a reverse proxy") - if certs_address is None and domain is None: + if certs_addresses is None and domain is None: raise ValueError( "Must provide the server address or domain to configure Caddy. No address or domain found in the " "server start command (--certs-address or --domain) or in the cluster config YAML saved on the cluster." ) cc = CaddyConfig( - address=certs_address, + address=certs_addresses, domain=domain, rh_server_port=daemon_port, ssl_key_path=parsed_ssl_keyfile, diff --git a/runhouse/servers/obj_store.py b/runhouse/servers/obj_store.py index 5bfe4dd48..e7b64f8c2 100644 --- a/runhouse/servers/obj_store.py +++ b/runhouse/servers/obj_store.py @@ -1415,7 +1415,6 @@ async def aput_resource( # actually create the corresponding env servlet. resource_config, _, _ = tuple(deserialize_data(serialized_data, serialization)) if resource_config["resource_type"] == "env": - # Note that the passed in `env_name` and the `env_name_to_create` here are # distinct. The `env_name` is the name of the env servlet where we want to store # the resource itself. 
The `env_name_to_create` is the name of the env servlet @@ -1499,11 +1498,9 @@ async def aput_resource_local( ############################################## # Cluster info methods ############################################## - def status_local(self): - # The objects in env can be of any type, and not only runhouse resources, - # therefore we need to distinguish them when creating the list of the resources in each env. + def _get_objects_in_env(self): + objects_in_env_modified = [] if self.has_local_storage: - resources_in_env_modified = [] for k, v in self._kv_store.items(): cls = type(v) py_module = cls.__module__ @@ -1513,28 +1510,288 @@ def status_local(self): else (py_module + "." + cls.__qualname__) ) if isinstance(v, runhouse.Resource): - resources_in_env_modified.append( + objects_in_env_modified.append( {"name": k, "resource_type": cls_name} ) else: - resources_in_env_modified.append( + objects_in_env_modified.append( {"name": k, "resource_type": cls_name} ) - return resources_in_env_modified - else: - return [] + return objects_in_env_modified + + def _parse_env_actor_info(self, env_actor: list, servlet_property: str): + for line_of_info in env_actor: + # checking if it is the columns names line. + if "NAME" in line_of_info: + env_properties = line_of_info.split() + # checking if this is the line containing the unique property of the servlet + if servlet_property in line_of_info: + env_raw_info = line_of_info.split()[1:] + # removing element in index 0, this is the row index in the table, + # it does not contain unique info about the env. 
+ env_actor_info = { + env_properties[i]: env_raw_info[i] for i in range(len(env_properties)) + } + return env_actor_info + + def _get_env_cpu_usage( + self, + env_name: str, + cluster_config: Dict, + ): + import subprocess + + import psutil + + from runhouse.utils import string_to_dict + + if env_name: + env_actor = ( + subprocess.check_output( + [ + "ray", + "list", + "actors", + "-f", + "state=ALIVE", + "-f", + f"name={env_name}", + ] + ) + .decode("utf-8") + .split("\n") + ) + + total_memory = psutil.virtual_memory().total + node_name = ( + "" # will be set later. if not set here, an error will be raised. + ) + + # getting in "real" info about the env from env_actor. + # The info about the env is described in a table. + # The columns are different properties of the env, and the rows are the envs (=ray actors) + env_actor_info = self._parse_env_actor_info(env_actor, env_name) + + env_node_info = ( + subprocess.check_output( + ["ray", "get", "nodes", env_actor_info.get("NODE_ID")] + ) + .decode("utf-8") + .split("\n") + ) + + env_node_info = { + string_to_dict(node_property)[0]: string_to_dict(node_property)[1] + for node_property in env_node_info + if ":" in node_property + } + + node_ip = env_node_info.get("node_ip") + env_servlet_pid = int(env_actor_info.get("PID")) + if not cluster_config.get("resource_subtype") == "Cluster": + stable_internal_external_ips = cluster_config.get( + "stable_internal_external_ips" + ) + for ips_set in stable_internal_external_ips: + internal_ip, external_ip = ips_set[0], ips_set[1] + if internal_ip == node_ip: + # head ip == cluster address == cluster.ips[0] + if ips_set[1] == cluster_config.get("ips")[0]: + node_name = f"head ({external_ip})" + else: + node_name = f"worker_{stable_internal_external_ips.index(ips_set)} ({external_ip}" + else: + # a case it is a BYO cluster, assume that first ip in the ips list is the head. 
+ ips = cluster_config.get("ips") + if len(ips) == 1 or node_ip == ips[0]: + node_name = f"head ({node_ip})" + else: + node_name = f"worker_{ips.index(node_ip)} ({node_ip})" + + try: + env_servlet_process = psutil.Process(pid=env_servlet_pid) + memory_size_bytes = env_servlet_process.memory_full_info().uss + cpu_usage_percent = env_servlet_process.cpu_percent(interval=1) + env_memory_usage = { + "memory_size_bytes": memory_size_bytes, + "cpu_usage_percent": cpu_usage_percent, + "memory_percent_from_cluster": (memory_size_bytes / total_memory) + * 100, + "total_cluster_memory": total_memory, + "env_memory_info": psutil.virtual_memory(), + } + except psutil.NoSuchProcess: + env_memory_usage = {} + + return ( + env_memory_usage, + node_name, + total_memory, + env_servlet_pid, + env_actor_info, + node_ip, + ) + + def _get_env_gpu_usage(self, env_servlet_pid): + import subprocess + + import runhouse as rh + + # check it the cluster uses GPU or not + if rh.Package._detect_cuda_version_or_cpu() == "cpu": + return {} + + try: + gpu_general_info = ( + subprocess.run( + [ + "nvidia-smi", + "--query-gpu=utilization.gpu,memory.total,count", + "--format=csv,noheader,nounits", + ], + stdout=subprocess.PIPE, + ) + .stdout.decode("utf-8") + .strip() + .split(", ") + ) + gpu_util_percent = float(gpu_general_info[0]) + total_gpu_memory = int(gpu_general_info[1]) * (1024**2) # in bytes + num_of_gpus = int(gpu_general_info[2]) + used_gpu_memory = 0 # in bytes + + env_gpu_usage = ( + subprocess.run( + [ + "nvidia-smi", + "--query-compute-apps=pid,gpu_uuid,used_memory", + "--format=csv,nounits", + ], + stdout=subprocess.PIPE, + ) + .stdout.decode("utf-8") + .strip() + .split("\n") + ) + for i in range(1, len(env_gpu_usage)): + single_env_gpu_info = env_gpu_usage[i].strip().split(", ") + if int(single_env_gpu_info[0]) == env_servlet_pid: + used_gpu_memory = used_gpu_memory + int(single_env_gpu_info[-1]) * ( + 1024**2 + ) + if used_gpu_memory > 0: + env_gpu_usage = { + "used_gpu_memory": 
used_gpu_memory, + "gpu_util_percent": gpu_util_percent / num_of_gpus, + "total_gpu_memory": total_gpu_memory, + } + else: + env_gpu_usage = {} + except subprocess.CalledProcessError: + env_gpu_usage = {} + + return env_gpu_usage + + def status_local(self, env_servlet_pid): + + # The objects in env can be of any type, and not only runhouse resources, + # therefore we need to distinguish them when creating the list of the resources in each env. + objects_in_env_modified = self._get_objects_in_env() + + # Try loading GPU data (if relevant) + env_gpu_usage = self._get_env_gpu_usage(env_servlet_pid) + + env_utilization_data = { + "env_gpu_usage": env_gpu_usage, + } + + return objects_in_env_modified, env_utilization_data + + def _get_server_pid(self): + import subprocess + + cli_list_cluster_actor = ( + subprocess.check_output( + [ + "ray", + "list", + "actors", + "-f", + "state=ALIVE", + "-f" "class_name=ClusterServlet", + ] + ) + .decode("utf-8") + .split("\n") + ) + # The info about the actor is at index 9 of 'cli_list_cluster_actor' list. + # The info about the actor is in a form of a table, where the PID is at index 7 of the table record + # which describes the cluster_servlet. 
+ server_pid = self._parse_env_actor_info( + cli_list_cluster_actor, "ClusterServlet" + ).get("PID") + return server_pid async def astatus(self): + import psutil + config_cluster = self.get_cluster_config() + + # poping out creds because we don't want to show them in the status config_cluster.pop("creds", None) + + # getting cluster servlets (envs) and their related objects cluster_servlets = {} + cluster_envs_env_utilization_data = {} for env in await self.aget_all_initialized_env_servlet_names(): - resources_in_env_modified = await self.acall_env_servlet_method( - env, "astatus_local" + ( + env_memory_usage, + node_name, + total_memory, + env_servlet_pid, + env_actor_info, + node_ip, + ) = self._get_env_cpu_usage(env_name=env, cluster_config=config_cluster) + + ( + objects_in_env_modified, + env_utilization_data, + ) = await self.acall_actor_method( + self.get_env_servlet(env), + method="astatus_local", + env_servlet_pid=env_servlet_pid, ) - cluster_servlets[env] = resources_in_env_modified + env_utilization_data.update( + { + "node_id": env_actor_info.get("NODE_ID"), + "node_ip": node_ip, + "node_name": node_name, + "pid": env_servlet_pid, + "actor_id": env_actor_info.get("ACTOR_ID"), + "env_memory_usage": env_memory_usage, + } + ) + cluster_servlets[env] = objects_in_env_modified + cluster_envs_env_utilization_data[env] = env_utilization_data config_cluster["envs"] = cluster_servlets - return config_cluster + config_cluster["server_pid"] = self._get_server_pid() + + # TODO: decide if we need this info at all: cpu_usage, memory_usage, disk_usage + cpu_usage = psutil.cpu_percent(interval=1) + + # Fields: `available`, `percent`, `used`, `free`, `active`, `inactive`, `buffers`, `cached`, `shared`, `slab` + memory_usage = psutil.virtual_memory()._asdict() + + # Fields: `total`, `used`, `free`, `percent` + disk_usage = psutil.disk_usage("/")._asdict() + + return { + "cluster_config": config_cluster, + "env_servlet_actors": cluster_envs_env_utilization_data, + 
"system_cpu_usage": cpu_usage, + "system_memory_usage": memory_usage, + "system_disk_usage": disk_usage, + } def status(self): return sync_function(self.astatus)() diff --git a/runhouse/utils.py b/runhouse/utils.py index 3be33f175..c7a022c62 100644 --- a/runhouse/utils.py +++ b/runhouse/utils.py @@ -37,3 +37,10 @@ def wrapper(*args, **kwargs): return future.result() return wrapper + + +def string_to_dict(dict_as_string): + parts = dict_as_string.split(":") + key = parts[0].strip() + value = parts[1].strip() + return key, value diff --git a/tests/conftest.py b/tests/conftest.py index ea6498cb1..006d46298 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -239,6 +239,7 @@ def event_loop(): a10g_gpu_cluster, # noqa: F401 k80_gpu_cluster, # noqa: F401 multinode_cpu_cluster, # noqa: F401 + multinode_gpu_cluster, # noqa: F401 ondemand_aws_cluster, # noqa: F401 ondemand_aws_https_cluster_with_auth, # noqa: F401 ondemand_cluster, # noqa: F401 @@ -408,5 +409,6 @@ def event_loop(): "password_cluster", "multinode_cpu_cluster", "static_cpu_cluster", + "multinode_gpu_cluster", # for testing cluster status on multinode gpu. 
] } diff --git a/tests/fixtures/on_demand_cluster_fixtures.py b/tests/fixtures/on_demand_cluster_fixtures.py index 39c8435dd..8ad7f28de 100644 --- a/tests/fixtures/on_demand_cluster_fixtures.py +++ b/tests/fixtures/on_demand_cluster_fixtures.py @@ -9,23 +9,28 @@ from tests.conftest import init_args from tests.utils import test_env +NUM_OF_INSTANCES = 2 + @pytest.fixture() def restart_server(request): return request.config.getoption("--restart-server") -def setup_test_cluster(args, request): +def setup_test_cluster(args, request, create_env=True): cluster = rh.ondemand_cluster(**args) init_args[id(cluster)] = args - if not cluster.is_up(): cluster.up() elif request.config.getoption("--restart-server"): cluster.restart_server() cluster.save() - if cluster.default_env.name == EMPTY_DEFAULT_ENV_NAME: + + # checking if to create_env or not for the status tests. Default val of create_env is True, + # meaning env will be created if the other part of the condition match. + # Therefore it will not affect all other tests (which don't tests status) + if create_env and cluster.default_env.name == EMPTY_DEFAULT_ENV_NAME: test_env().to(cluster) return cluster @@ -116,10 +121,27 @@ def a10g_gpu_cluster(request): def multinode_cpu_cluster(request): args = { "name": "rh-cpu-multinode", - "num_instances": 2, + "num_instances": NUM_OF_INSTANCES, "instance_type": "CPU:2+", } - cluster = setup_test_cluster(args, request) + create_env = False + cluster = setup_test_cluster(args, request, create_env) + env = rh.env(name="worker_env", compute={"CPU": 2}).to(cluster) + assert env + return cluster + + +@pytest.fixture(scope="session") +def multinode_gpu_cluster(request): + args = { + "name": "rh-gpu-multinode", + "num_instances": NUM_OF_INSTANCES, + "instance_type": "g5.xlarge", + } + create_env = False + cluster = setup_test_cluster(args, request, create_env) + env = rh.env(name="worker_env", compute={"GPU": 1, "CPU": 4}).to(cluster) + assert env return cluster diff --git 
a/tests/test_resources/test_clusters/test_cluster.py b/tests/test_resources/test_clusters/test_cluster.py index 1e683ad1a..2207aeb65 100644 --- a/tests/test_resources/test_clusters/test_cluster.py +++ b/tests/test_resources/test_clusters/test_cluster.py @@ -85,7 +85,7 @@ class TestCluster(tests.test_resources.test_resource.TestResource): "docker_cluster_pk_tls_den_auth", # Represents public app use case "docker_cluster_pk_http_exposed", # Represents within VPC use case "docker_cluster_pwd_ssh_no_auth", - ] + ], } MINIMAL = {"cluster": ["static_cpu_cluster"]} RELEASE = { @@ -105,6 +105,8 @@ class TestCluster(tests.test_resources.test_resource.TestResource): ] } + GPU_CLUSTER_NAMES = ["rh-v100", "rh-k80", "rh-a10x", "rh-gpu-multinode"] + @pytest.mark.level("unit") def test_cluster_factory_and_properties(self, cluster): assert isinstance(cluster, rh.Cluster) @@ -190,7 +192,26 @@ def test_cluster_endpoint(self, cluster): headers=rh.globals.rns_client.request_headers(), ) assert r.status_code == 200 - assert r.json()["resource_type"] == "cluster" + assert r.json().get("cluster_config")["resource_type"] == "cluster" + + @pytest.mark.level("local") + def test_load_cluster_status(self, cluster): + endpoint = cluster.endpoint() + verify = cluster.client.verify + r = requests.get( + f"{endpoint}/status", + verify=verify, + headers=rh.globals.rns_client.request_headers(), + ) + + assert r.status_code == 200 + status_data = r.json() + assert status_data["cluster_config"]["resource_type"] == "cluster" + assert status_data["env_servlet_actors"] + assert status_data["system_cpu_usage"] + assert status_data["system_memory_usage"] + assert status_data["system_disk_usage"] + assert not status_data.get("system_gpu_data") @pytest.mark.level("local") def test_cluster_objects(self, cluster): @@ -248,18 +269,83 @@ def test_rh_here_objects(self, cluster): @pytest.mark.level("local") def test_rh_status_pythonic(self, cluster): - cluster.put(key="status_key1", obj="status_value1", 
env="numpy_env") - res = cluster.status() + if "multinode" in cluster.name: + cluster.put(key="status_key1", obj="status_value1", env="worker_env") + else: + cluster.put(key="status_key1", obj="status_value1", env="numpy_env") + cluster_data = cluster.status() + res = cluster_data.get("cluster_config") + + # test cluster config info assert res.get("creds") is None assert res.get("server_port") == (cluster.server_port or DEFAULT_SERVER_PORT) assert res.get("server_connection_type") == cluster.server_connection_type assert res.get("den_auth") == cluster.den_auth assert res.get("resource_type") == cluster.RESOURCE_TYPE assert res.get("ips") == cluster.ips - assert "numpy_env" in res.get("envs") - assert {"name": "status_key1", "resource_type": "str"} in res.get("envs")[ - "numpy_env" + + if "multinode" in cluster.name: + assert "worker_env" in res.get("envs") + assert {"name": "status_key1", "resource_type": "str"} in res.get("envs")[ + "worker_env" + ] + else: + assert "numpy_env" in res.get("envs") + assert {"name": "status_key1", "resource_type": "str"} in res.get("envs")[ + "numpy_env" + ] + + # test memory usage info + expected_env_servlet_keys = [ + "actor_id", + "env_gpu_usage", + "env_memory_usage", + "node_id", + "node_ip", + "node_name", + "pid", ] + envs_names = list(res.get("envs").keys()) + envs_names.sort() + assert "env_servlet_actors" in cluster_data.keys() + env_servlets_info = cluster_data.get("env_servlet_actors") + env_actors_keys = list(env_servlets_info.keys()) + env_actors_keys.sort() + assert envs_names == env_actors_keys + for env_name in envs_names: + env_servlet_info = env_servlets_info.get(env_name) + env_servlet_info_keys = list(env_servlet_info.keys()) + env_servlet_info_keys.sort() + assert env_servlet_info_keys == expected_env_servlet_keys + + if cluster.name in self.GPU_CLUSTER_NAMES and env_name == "sd_env": + assert env_servlet_info.get("env_gpu_usage") + + @pytest.mark.level("maximal") + def test_rh_status_pythonic_gpu(self, 
cluster): + if cluster.name in self.GPU_CLUSTER_NAMES: + from tests.test_tutorials import sd_generate + + env_sd = rh.env( + reqs=["pytest", "diffusers", "torch", "transformers"], + name="sd_env", + compute={"GPU": 1, "CPU": 4}, + ).to(system=cluster, force_install=True) + + assert env_sd + + generate_gpu = rh.function(fn=sd_generate).to(system=cluster, env=env_sd) + + images = generate_gpu( + prompt="A hot dog made of matcha powder.", num_images=4, steps=50 + ) + + assert images + + self.test_rh_status_pythonic(cluster) + + else: + pytest.skip(f"{cluster.name} is not a GPU cluster, skipping") @pytest.mark.level("local") def test_rh_status_cli_in_cluster(self, cluster): @@ -282,17 +368,58 @@ def test_rh_status_cli_in_cluster(self, cluster): in status_output_string ) assert f"den_auth: {str(cluster.den_auth)}" in status_output_string - assert f"resource_type: {cluster.RESOURCE_TYPE.lower()}" in status_output_string + assert ( + f"resource_subtype: {cluster.config().get('resource_subtype')}" + in status_output_string + ) assert f"ips: {str(cluster.ips)}" in status_output_string assert "Serving " in status_output_string - assert f"{default_env_name}" in status_output_string + assert f"{default_env_name} (runhouse.Env)" in status_output_string assert "status_key2 (str)" in status_output_string assert "creds" not in status_output_string + # checking the memory info is printed correctly + assert "CPU: " in status_output_string + assert status_output_string.count("CPU: ") >= 1 + assert "pid: " in status_output_string + assert status_output_string.count("pid: ") >= 1 + assert "node: " in status_output_string + assert status_output_string.count("node: ") >= 1 + + # if it is a GPU cluster, check GPU print as well + if cluster.name in self.GPU_CLUSTER_NAMES: + assert "GPU: " in status_output_string + assert status_output_string.count("GPU: ") >= 1 + + @pytest.mark.level("maximal") + def test_rh_status_cli_in_gpu_cluster(self, cluster): + if cluster.name in 
self.GPU_CLUSTER_NAMES: + from tests.test_tutorials import sd_generate + + env_sd = rh.env( + reqs=["pytest", "diffusers", "torch", "transformers"], + name="sd_env", + compute={"GPU": 1}, + ).to(system=cluster, force_install=True) + + assert env_sd + generate_gpu = rh.function(fn=sd_generate).to(system=cluster, env=env_sd) + images = generate_gpu( + prompt="A hot dog made of matcha powder.", num_images=4, steps=50 + ) + assert images + + self.test_rh_status_cli_in_cluster(cluster) + + else: + pytest.skip(f"{cluster.name} is not a GPU cluster, skipping") + @pytest.mark.skip("Restarting the server mid-test causes some errors, need to fix") @pytest.mark.level("local") + # TODO: once fixed, extend these tests for gpu clusters as well. def test_rh_status_cli_not_in_cluster(self, cluster): - # TODO -- check this base_env + default_env_name = cluster.default_env.name + cluster.put(key="status_key3", obj="status_value3") res = str( subprocess.check_output(["runhouse", "status", f"{cluster.name}"]), "utf-8" @@ -301,15 +428,16 @@ def test_rh_status_cli_not_in_cluster(self, cluster): assert f"server_port: {cluster.server_port}" in res assert f"server_connection_type: {cluster.server_connection_type}" in res assert f"den_auth: {str(cluster.den_auth)}" in res - assert f"resource_type: {cluster.RESOURCE_TYPE.lower()}" in res + assert f"resource_subtype: {cluster.RESOURCE_TYPE.capitalize()}" in res assert f"ips: {str(cluster.ips)}" in res assert "Serving 🍦 :" in res - assert "base_env (runhouse.resources.envs.env.Env):" in res + assert f"{default_env_name} (runhouse.Env)" in res assert "status_key3 (str)" in res assert "ssh_certs" not in res @pytest.mark.skip("Restarting the server mid-test causes some errors, need to fix") @pytest.mark.level("local") + # TODO: once fixed, extend these tests for gpu clusters as well. 
def test_rh_status_stopped(self, cluster): try: cluster_name = cluster.name @@ -457,7 +585,7 @@ def test_changing_name_and_saving_in_between(self, cluster): # If save did not update the name, this will attempt to create a connection # when the cluster is used remotely. However, if you update the name, `on_this_cluster` will - # work correclty and then the remote function will just call the object store when it calls .keys() + # work correctly and then the remote function will just call the object store when it calls .keys() assert cluster.keys() == cluster_keys_remote(cluster) # Restore the state? diff --git a/tests/test_resources/test_clusters/test_cluster_default_env.py b/tests/test_resources/test_clusters/test_cluster_default_env.py index 27a261949..f89d5d7a2 100644 --- a/tests/test_resources/test_clusters/test_cluster_default_env.py +++ b/tests/test_resources/test_clusters/test_cluster_default_env.py @@ -28,7 +28,7 @@ class TestCluster(tests.test_resources.test_clusters.test_cluster.TestCluster): @pytest.mark.level("local") def test_default_env_in_status(self, cluster): res = cluster.status() - assert cluster.default_env.name in res.get("envs") + assert cluster.default_env.name in res.get("cluster_config").get("envs") @pytest.mark.level("local") def test_put_in_default_env(self, cluster): diff --git a/tests/test_resources/test_clusters/test_on_demand_cluster.py b/tests/test_resources/test_clusters/test_on_demand_cluster.py index 1ad1e838e..fa52ad320 100644 --- a/tests/test_resources/test_clusters/test_on_demand_cluster.py +++ b/tests/test_resources/test_clusters/test_on_demand_cluster.py @@ -33,5 +33,6 @@ class TestOnDemandCluster(tests.test_resources.test_clusters.test_cluster.TestCl "static_cpu_cluster", "password_cluster", "multinode_cpu_cluster", + "multinode_gpu_cluster", ] }