Skip to content

Commit

Permalink
Add cluster utilization data to status endpoint (#653)
Browse files Browse the repository at this point in the history
  • Loading branch information
jlewitt1 committed May 15, 2024
1 parent 2ea74ff commit e9e449a
Show file tree
Hide file tree
Showing 12 changed files with 720 additions and 104 deletions.
4 changes: 4 additions & 0 deletions runhouse/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,7 @@
TEST_ORG = "test-org"

EMPTY_DEFAULT_ENV_NAME = "_cluster_default_env"

# cluster status constants
DOUBLE_SPACE_UNICODE = "\u00A0\u00A0"
BULLET_UNICODE = "\u2022"
307 changes: 249 additions & 58 deletions runhouse/main.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
import copy
import importlib
import logging
import math
import shlex
import subprocess
import time
Expand All @@ -17,8 +20,10 @@

import runhouse.rns.login

from runhouse import __version__, cluster, configs
from runhouse import __version__, cluster, Cluster, configs
from runhouse.constants import (
BULLET_UNICODE,
DOUBLE_SPACE_UNICODE,
RAY_KILL_CMD,
RAY_START_CMD,
SERVER_LOGFILE,
Expand All @@ -27,7 +32,7 @@
START_NOHUP_CMD,
START_SCREEN_CMD,
)
from runhouse.globals import obj_store
from runhouse.globals import obj_store, rns_client
from runhouse.resources.hardware.ray_utils import (
check_for_existing_ray_instance,
kill_actors,
Expand Down Expand Up @@ -128,81 +133,243 @@ def ssh(cluster_name: str, up: bool = typer.Option(False, help="Start the cluste
c.ssh()


def _print_status(config):
###############################
# Status helping functions
###############################


def _adjust_resource_type(resource_type: str):
"""
Prints the status of the cluster to the console
:param config: cluster's config
:return: cluster's config
status helping function. transforms a str form runhouse.resources.{X.Y...}.resource_type to runhouse.resource_type
"""
envs = config["envs"]
config.pop("envs", [])

# print headlines
daemon_headline_txt = (
"\N{smiling face with horns} Runhouse Daemon is running \N{Runner}"
)
try:
resource_type = resource_type.split(".")[-1]
getattr(importlib.import_module("runhouse"), resource_type)
return f"runhouse.{resource_type}"
except AttributeError:
return resource_type

console.print(daemon_headline_txt, style="bold royal_blue1")
if "name" in config.keys():
console.print(config["name"])

first_info_to_print = ["den_auth", "server_connection_type", "server_port"]
def _resource_name_to_rns(name: str):
"""
If possible, transform the resource name to a rns address.
If not, return the name as is (it is the key in the object store).
"""
resource_config = rns_client.load_config(name)
if resource_config and resource_config.get("resource_type") != "env":
return resource_config.get("name")
else:
return name

if config.get("default_env") and isinstance(config["default_env"], Dict):
config["default_env"] = config["default_env"]["name"]

for info in config:
if info in first_info_to_print:
console.print(f"\u2022 {info}: {config[info]}")
first_info_to_print.append("name")
def _print_cluster_config(cluster_config: Dict):
"""
Helping function to the `_print_status` which prints the relevant info from the cluster config.
"""
if "name" in cluster_config.keys():
console.print(cluster_config.get("name"))

top_level_config = [
"server_port",
"server_pid",
"den_auth",
"server_connection_type",
]

backend_config = [
"resource_subtype",
"use_local_telemetry",
"domain",
"server_host",
"ips",
"resource_subtype",
]

if cluster_config.get("resource_subtype") != "Cluster":
backend_config.append("autostop_mins")

if cluster_config.get("default_env") and isinstance(
cluster_config.get("default_env"), Dict
):
cluster_config["default_env"] = cluster_config["default_env"]["name"]

for key in top_level_config:
console.print(f"{BULLET_UNICODE} {key}: {cluster_config[key]}")

console.print("\u2022 backend config:")
for info in config:
if info not in first_info_to_print:
console.print(f"\t\u2022 {info}: {config[info]}")
for key in backend_config:
if key == "autostop_mins" and cluster_config[key] == -1:
console.print(
f"{DOUBLE_SPACE_UNICODE}{BULLET_UNICODE} {key}: autostop disabled"
)
else:
console.print(
f"{DOUBLE_SPACE_UNICODE}{BULLET_UNICODE} {key}: {cluster_config[key]}"
)

# print the environments in the cluster, and the resources associated with each environment.

def _print_envs_info(envs: Dict, envs_info: Dict, current_cluster: Cluster):
"""
Prints info about the envs in the current_cluster.
Prints the resources in each env, and the CPU and GPU usage of the env (if exists).
:param envs: Dict of envs in each cluster, and the resources associated with them.
:param envs_info: Dict of cpu and gpu info of the envs.
:param current_cluster: The cluster whose status we are printing.
"""
# Print headline
envs_in_cluster_headline = "Serving 🍦 :"
console.print(envs_in_cluster_headline, style="bold")

if len(envs) == 0:
console.print("This cluster has no environment nor resources.")

for env_name in envs:
first_envs_to_print = []

# First: if the default env does not have resources, print it.
default_env_name = current_cluster.default_env.name
if len(envs[default_env_name]) <= 1:
# case where the default env doesn't hve any other resources, apart from the default env itself.
console.print(
f"{BULLET_UNICODE} {default_env_name} (runhouse.Env)",
style="italic bold",
)
console.print(
f"{DOUBLE_SPACE_UNICODE}This environment has only python packages installed, if such provided. No "
"resources were found."
)

else:
# case where the default env have other resources. We make sure that our of all the envs which have reosirces,
# the default_env will be printed first.
first_envs_to_print = [default_env_name]

# Make sure to print envs with no resources first.
# (the only resource they have is a runhouse.env, which is the env itself).
first_envs_to_print = first_envs_to_print + [
env_name
for env_name in envs
if (len(envs[env_name]) <= 1 and env_name != default_env_name)
]

# Now, print the envs.
# If the env have packages installed, that means that it contains an env resource. In that case:
# * If the env contains only itself, we will print that the env contains only the installed packages.
# * Else, we will print the resources (rh.function, th.module) associated with the env.

envs_to_print = first_envs_to_print + [
env_name
for env_name in envs
if env_name not in first_envs_to_print + [default_env_name]
]

for env_name in envs_to_print:
resources_in_env = envs[env_name]
if len(resources_in_env) == 0:
console.print(f"{env_name} (Env):", style="italic underline")
console.print("This environment has no resources.")
env_process_info = envs_info[env_name]
current_env = [
resource for resource in resources_in_env if resource["name"] == env_name
]

# sometimes the env itself is not a resource (key) inside the env's servlet.
if len(current_env) == 0:
env_name_print = _resource_name_to_rns(env_name)
env_type = "runhouse.Env"
else:
current_env = current_env[0]
env_name_print = _resource_name_to_rns(current_env["name"])
env_type = _adjust_resource_type(current_env["resource_type"])

env_name_txt = f"{BULLET_UNICODE} {env_name_print} ({env_type}) | pid: {env_process_info['pid']} | node: {env_process_info['node_name']}"
console.print(env_name_txt, style="italic bold")

# Print CPU info
env_cpu_info = env_process_info.get("env_memory_usage")
if env_cpu_info:

# convert bytes to GB
memory_usage_gb = round(
int(env_cpu_info["memory_size_bytes"]) / (1024**3),
2,
)
total_cluster_memory = math.ceil(
int(env_cpu_info["total_cluster_memory"]) / (1024**3)
)
cpu_memory_usage_percent = round(
float(env_cpu_info["memory_percent_from_cluster"]),
2,
)
cpu_usage_percent = round(float(env_cpu_info["cpu_usage_percent"]), 2)

cpu_usage_summery = f"{DOUBLE_SPACE_UNICODE}CPU: {cpu_usage_percent}% | Memory: {memory_usage_gb} / {total_cluster_memory} Gb ({cpu_memory_usage_percent}%)"

else:
current_env = [
resource
for resource in resources_in_env
if resource["name"] == env_name
]

# sometimes the env itself is not a resource (key) inside the env's servlet.
if len(current_env) == 0:
env_name_txt = f"{env_name} (Env):"
else:
current_env = current_env[0]
env_name_txt = (
f"{current_env['name']} ({current_env['resource_type']}):"
)
cpu_usage_summery = (
f"{DOUBLE_SPACE_UNICODE}CPU: This process did not use CPU memory."
)

console.print(
env_name_txt,
style="italic underline",
console.print(cpu_usage_summery)

# Print GPU info
env_gpu_info = env_process_info.get("env_gpu_usage")

# sometimes the cluster has no GPU, therefore the env_gpu_info is an empty dictionary.
if env_gpu_info:
# get the gpu usage info, and convert it to GB.
total_gpu_memory = math.ceil(
float(env_gpu_info.get("total_gpu_memory")) / (1024**3)
)
gpu_util_percent = round(float(env_gpu_info.get("gpu_util_percent")), 2)
used_gpu_memory = round(
float(env_gpu_info.get("used_gpu_memory")) / (1024**3), 2
)
gpu_memory_usage_percent = round(
float(used_gpu_memory / total_gpu_memory) * 100, 2
)
gpu_usage_summery = f"{DOUBLE_SPACE_UNICODE}GPU: {gpu_util_percent}% | Memory: {used_gpu_memory} / {total_gpu_memory} Gb ({gpu_memory_usage_percent}%)"
console.print(gpu_usage_summery)

resources_in_env = [
resource for resource in resources_in_env if resource is not current_env
]

resources_in_env = [
resource for resource in resources_in_env if resource is not current_env
]
if len(resources_in_env) == 0:
# No resources were found in the env, only the associated installed python reqs were installed.
console.print(
f"{DOUBLE_SPACE_UNICODE}This environment has only python packages installed, if such provided. No resources were "
"found."
)

else:
for resource in resources_in_env:
resource_name = resource["name"]
resource_type = resource["resource_type"]
console.print(f"\u2022{resource_name} ({resource_type})")
resource_name = _resource_name_to_rns(resource["name"])
resource_type = _adjust_resource_type(resource["resource_type"])
console.print(
f"{DOUBLE_SPACE_UNICODE}{BULLET_UNICODE} {resource_name} ({resource_type})"
)


def _print_status(config: dict, current_cluster: Cluster):
"""
Prints the status of the cluster to the console
:param config: cluster's config
:return: cluster's config
"""

cluster_config = config.get("cluster_config")
envs = cluster_config.pop("envs", [])
envs_info = config.pop("env_servlet_actors", [])

# print headline
daemon_headline_txt = (
"\N{smiling face with horns} Runhouse Daemon is running \N{Runner}"
)
console.print(daemon_headline_txt, style="bold royal_blue1")

# Print relevant info from cluster config.
_print_cluster_config(cluster_config)

# print the environments in the cluster, and the resources associated with each environment.
_print_envs_info(envs, envs_info, current_cluster)

return config

Expand All @@ -216,6 +383,8 @@ def status(
):
"""Load the status of the Runhouse daemon running on a cluster."""

cluster_or_local = rh.here

if cluster_name:
current_cluster = cluster(name=cluster_name)
if not current_cluster.is_up():
Expand All @@ -232,20 +401,42 @@ def status(
f"`runhouse ssh {cluster_name}` or `sky status -r` for on-demand clusters."
)
raise typer.Exit(1)
cluster_status = current_cluster.status()
else:
current_cluster = rh.here
if not current_cluster or current_cluster == "file":
if not cluster_or_local or cluster_or_local == "file":
console.print(
"\N{smiling face with horns} Runhouse Daemon is not running... \N{No Entry} \N{Runner}. "
"Start it with `runhouse restart` or specify a remote "
"cluster to poll with `runhouse status <cluster_name>`."
)
raise typer.Exit(1)

if cluster_or_local != "file":
# If we are on the cluster load status directly from the object store
cluster_status: dict = obj_store.status()
cluster_config = copy.deepcopy(cluster_status.get("cluster_config"))
current_cluster: Cluster = Cluster.from_config(cluster_config)
return _print_status(cluster_status, current_cluster)

return _print_status(cluster_status)
if cluster_name is None:
# If running outside the cluster must specify a cluster name
console.print("Missing argument `cluster_name`.")
return

try:
current_cluster: Cluster = Cluster.from_name(name=cluster_name)
cluster_status: dict = current_cluster.status(
resource_address=current_cluster.rns_address
)

except ValueError:
console.print("Failed to load status for cluster.")
return
except requests.exceptions.ConnectionError:
console.print(
"\N{smiling face with horns} Runhouse Daemon is not running... \N{No Entry} \N{Runner}"
)
return
return _print_status(cluster_status, current_cluster)


def load_cluster(cluster_name: str):
Expand Down

0 comments on commit e9e449a

Please sign in to comment.