Skip to content

Commit

Permalink
disable periodic status check, enable changing interval size between …
Browse files Browse the repository at this point in the history
…cluster checks
  • Loading branch information
Alexandra Belousov authored and Alexandra Belousov committed May 26, 2024
1 parent edd1839 commit 8f27a55
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 5 deletions.
56 changes: 55 additions & 1 deletion runhouse/resources/hardware/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
DEFAULT_HTTPS_PORT,
DEFAULT_RAY_PORT,
DEFAULT_SERVER_PORT,
DEFAULT_STATUS_CHECK_INTERVAL,
EMPTY_DEFAULT_ENV_NAME,
LOCALHOST,
RESERVED_SYSTEM_NAMES,
Expand Down Expand Up @@ -150,9 +151,21 @@ def default_env(self, env):
"Run `cluster.restart_server()` to restart the Runhouse server on the new default env."
)

def save_config_to_cluster(self, node: str = None):
def save_config_to_cluster(
self,
node: str = None,
status_check_interval: int = DEFAULT_STATUS_CHECK_INTERVAL,
):
config = self.config(condensed=False)

# popping creds, because we don't want the secret creds to be saved on the cluster.
config.pop("creds")

# if the cluster has den authorization, the cluster status will be checked periodically.
# Saving the time interval between consecutive status checks to cluster_config.
if self.den_auth:
config["status_check_interval"] = status_check_interval

json_config = f"{json.dumps(config)}"

self.run(
Expand Down Expand Up @@ -1645,3 +1658,44 @@ def _check_for_child_configs(cls, config: dict):
config["default_env"] = default_env

return config

def _is_den_authed_and_saved_to_den(self, change_type: str):
    """
    Check that the cluster has den_auth enabled and that it is saved to Den.
    An error is logged describing whichever precondition is not met.
    :param change_type: the change the user wants to make to the cluster check job: disable it or change the
    interval size between consecutive runs. Used only to phrase the logged error.
    :return: True if cluster has den_auth and it is saved in Den. False otherwise.
    """
    if self.den_auth:
        # Only look the cluster up in Den once we know den_auth is on.
        if rns_client.exists(self.name):
            return True
        logger.error(
            f"The cluster must be saved to Den to {change_type} periodic status checks."
        )
        return False

    missing_auth_msg = (
        f"Cluster must have Den authorization to {change_type} periodic status checks. "
        f"Make sure you have a Den account, and you've created your cluster with den_auth = True."
    )
    logger.error(missing_auth_msg)
    return False

def _disable_periodic_status_check(self):
    """
    Stop the periodic sending of the cluster status to Den.
    Does nothing (beyond the error logged by the precondition check) when the cluster
    is not den-authed or is not saved to Den.
    """
    can_disable = self._is_den_authed_and_saved_to_den(change_type="disable")
    if can_disable:
        # An interval of -1 is the value the status-sending loop treats as "disabled".
        self.save_config_to_cluster(status_check_interval=-1)

def _change_periodic_status_check_interval(self, new_interval: int):
    """
    Change the interval size between consecutive status check runs.
    Nothing is changed when the interval is not positive, or when the cluster is not
    den-authed / not saved to Den; an error is logged in each of those cases.
    :param new_interval: int, the new interval size. Must be positive; to stop the checks
    use _disable_periodic_status_check() instead.
    """
    # A zero or negative interval would make the status loop sleep for no time at all between
    # runs (or, for -1, silently disable it), so refuse it before touching the cluster config.
    if new_interval <= 0:
        logger.error(
            f"The status check interval must be a positive number, got {new_interval}. "
            f"To stop the periodic status checks use _disable_periodic_status_check()."
        )
        return

    # The helper appends "periodic status checks" to change_type when building its error
    # message, so the phrase must not be repeated here.
    if not self._is_den_authed_and_saved_to_den(
        change_type="change the interval size of"
    ):
        return
    self.save_config_to_cluster(status_check_interval=new_interval)
36 changes: 32 additions & 4 deletions runhouse/servers/cluster_servlet.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import logging
import threading
import time
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Union

import requests
Expand Down Expand Up @@ -118,6 +119,22 @@ def update_autostop(self):
async def aget_cluster_config(self) -> Dict[str, Any]:
    """Return the cluster config currently held by this servlet."""
    current_config = self.cluster_config
    return current_config

async def aupdate_status_check_interval_in_cluster_config(self):
    """
    Sync the servlet's cluster config with the config file saved on the cluster.

    Reads ``~/.rh/cluster_config.json`` and compares its ``status_check_interval`` with the one
    held in ``self.cluster_config``. When the two differ, the servlet's config is replaced by
    the on-disk config.

    :return: True if the servlet's config was replaced, False if the intervals already matched.
    """
    cluster_path = Path("~/.rh/cluster_config.json").expanduser()
    with open(cluster_path) as cluster_local_config:
        local_config = json.load(cluster_local_config)

    local_interval = local_config.get("status_check_interval")
    servlet_interval = self.cluster_config.get("status_check_interval")
    if local_interval == servlet_interval:
        return False

    await self.aset_cluster_config(local_config)

    # The on-disk config may carry no interval at all (the key is absent -> None), and
    # comparing None with an int raises TypeError. Only a positive value is an actual
    # interval worth reporting.
    if local_interval is not None and local_interval > 0:
        logger.info(
            f"Updated cluster_config with new status check interval: {round(local_interval/60, 2)} minutes."
        )
    return True

async def aset_cluster_config(self, cluster_config: Dict[str, Any]):
self.cluster_config = cluster_config

Expand Down Expand Up @@ -266,10 +283,22 @@ async def asend_status_info_to_den(self):
# Delay the start of post_status_thread, so we'll finish the cluster startup properly
await asyncio.sleep(STATUS_CHECK_DELAY)
while True:
logger.info("Sending cluster status to Den.")
logger.info("Trying to send cluster status to Den.")
try:
interval_size = DEFAULT_STATUS_CHECK_INTERVAL
is_config_updated = (
await self.aupdate_status_check_interval_in_cluster_config()
)
interval_size = (await self.aget_cluster_config()).get(
"status_check_interval", DEFAULT_STATUS_CHECK_INTERVAL
)
if interval_size == -1:
if is_config_updated:
logger.info(
f"Disabled periodic cluster status check. For enabling it, please run "
f"cluster.restart_server(). If you want to set up an interval size that is not the "
f"default value {round(DEFAULT_STATUS_CHECK_INTERVAL/60,2)} please run "
f"cluster._change_periodic_status_check_interval(interval_size) after restarting the server."
)
break
status: ResourceStatusData = await self.astatus()
status_data = {
Expand Down Expand Up @@ -303,12 +332,11 @@ async def asend_status_info_to_den(self):
logger.warning(
f"Temporarily increasing the interval between two consecutive status checks. "
f"Next status check will be in {round(INCREASED_STATUS_CHECK_INTERVAL / 60, 2)} minutes. "
f"For changing the interval size, please restart the server with a new interval size value. "
f"For changing the interval size, please run cluster._change_periodic_status_check_interval(new_interval). "
f"If a value is not provided, interval size will be set to {DEFAULT_STATUS_CHECK_INTERVAL}"
)
await asyncio.sleep(INCREASED_STATUS_CHECK_INTERVAL)
finally:

await asyncio.sleep(interval_size)

def send_status_info_to_den(self):
Expand Down

0 comments on commit 8f27a55

Please sign in to comment.