
Auto stop for cluster #653

Merged on Mar 31, 2022 (40 commits)
f9d43c9
refactorize skylet
Michaelvll Mar 25, 2022
af060a4
implement autostop event without cluster stopping
Michaelvll Mar 25, 2022
dfce233
wip
Michaelvll Mar 26, 2022
ba3af68
Remove autostop from yaml file
Michaelvll Mar 26, 2022
fbdd296
fix naming
Michaelvll Mar 26, 2022
b33ba85
fix config
Michaelvll Mar 26, 2022
9b6cde2
fix skylet
Michaelvll Mar 27, 2022
8793f57
add autostop to status
Michaelvll Mar 27, 2022
d483c76
Merge branch 'master' of github.com:concretevitamin/sky-experiments i…
Michaelvll Mar 27, 2022
dc22d74
fix state and name match
Michaelvll Mar 27, 2022
a8d32b4
Replace min_workers/max_workers for gcp
Michaelvll Mar 27, 2022
5f883be
using ray up / ray down process
Michaelvll Mar 27, 2022
ddf0ba1
fix stopping
Michaelvll Mar 27, 2022
3524f04
set autostop in globle user state
Michaelvll Mar 27, 2022
b481ebe
update sky status
Michaelvll Mar 28, 2022
a66913c
format
Michaelvll Mar 28, 2022
f968bf2
Add refresh to sky status
Michaelvll Mar 28, 2022
dec91cd
Merge branch 'master' of github.com:concretevitamin/sky-experiments i…
Michaelvll Mar 28, 2022
42844d9
Merge branch 'master' of github.com:concretevitamin/sky-experiments i…
Michaelvll Mar 29, 2022
492c39d
Merge branch 'master' of github.com:concretevitamin/sky-experiments i…
Michaelvll Mar 29, 2022
9a207ae
address comments
Michaelvll Mar 29, 2022
6d3afba
comment
Michaelvll Mar 29, 2022
642f0a8
address comments
Michaelvll Mar 29, 2022
70a3aeb
Merge branch 'master' of github.com:concretevitamin/sky-experiments i…
Michaelvll Mar 29, 2022
905ba07
Fix logging
Michaelvll Mar 29, 2022
a34a810
update help
Michaelvll Mar 29, 2022
13f757a
remove ssh config and bring cursor back
Michaelvll Mar 30, 2022
af40dc4
Fix exec on stopped instance
Michaelvll Mar 30, 2022
15466be
address comment
Michaelvll Mar 30, 2022
5d20eef
format
Michaelvll Mar 30, 2022
af404a6
fix
Michaelvll Mar 30, 2022
0331aac
Add test for autostop
Michaelvll Mar 30, 2022
15b434b
Fix cancel
Michaelvll Mar 30, 2022
dfaea0f
Merge branch 'master' of github.com:concretevitamin/sky-experiments i…
Michaelvll Mar 31, 2022
c653006
address comment
Michaelvll Mar 31, 2022
e5227b6
address comment
Michaelvll Mar 31, 2022
0fb5a4d
Fix sky launch will change autostop to -1
Michaelvll Mar 31, 2022
1d1f9bd
format
Michaelvll Mar 31, 2022
d70bfbc
Add docs
Michaelvll Mar 31, 2022
b62c7dc
update
Michaelvll Mar 31, 2022
61 changes: 47 additions & 14 deletions sky/backends/backend_utils.py
@@ -13,7 +13,8 @@
import textwrap
import threading
import time
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple, Union
import typing
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
import uuid
import yaml

@@ -31,7 +32,7 @@
from sky.adaptors import azure
from sky.skylet import log_lib

if TYPE_CHECKING:
if typing.TYPE_CHECKING:
from sky import resources

logger = sky_logging.init_logger(__name__)
@@ -1046,16 +1047,28 @@ def get_node_ips(
handle is not None and handle.head_ip is not None):
return [handle.head_ip]

out = run(f'ray get-head-ip {yaml_handle}',
stdout=subprocess.PIPE).stdout.decode().strip()
head_ip = re.findall(IP_ADDR_REGEX, out)
try:
proc = run(f'ray get-head-ip {yaml_handle}',
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
out = proc.stdout.decode().strip()
head_ip = re.findall(IP_ADDR_REGEX, out)
except subprocess.CalledProcessError as e:
raise exceptions.FetchIPError(
exceptions.FetchIPError.Reason.HEAD) from e
if len(head_ip) != 1:
raise exceptions.FetchIPError(exceptions.FetchIPError.Reason.HEAD)

if expected_num_nodes > 1:
out = run(f'ray get-worker-ips {yaml_handle}',
stdout=subprocess.PIPE).stdout.decode()
worker_ips = re.findall(IP_ADDR_REGEX, out)
try:
proc = run(f'ray get-worker-ips {yaml_handle}',
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
out = proc.stdout.decode()
worker_ips = re.findall(IP_ADDR_REGEX, out)
except subprocess.CalledProcessError as e:
raise exceptions.FetchIPError(
exceptions.FetchIPError.Reason.WORKER) from e
if len(worker_ips) != expected_num_nodes - 1:
raise exceptions.FetchIPError(exceptions.FetchIPError.Reason.WORKER)
else:
@@ -1086,8 +1099,11 @@ def get_head_ip(
return head_ip


def _update_cluster(record: Dict[str, Any]) -> global_user_state.ClusterStatus:
def _ping_cluster_or_set_to_stopped(
record: Dict[str, Any]) -> global_user_state.ClusterStatus:
handle = record['handle']
if not isinstance(handle, backends.CloudVmRayBackend.ResourceHandle):
return record
# Autostop is disabled for the cluster
if record['autostop'] < 0:
return record
@@ -1100,26 +1116,26 @@ def _update_cluster(record: Dict[str, Any]) -> global_user_state.ClusterStatus:
# since it will be stopped as soon as the workers are stopped.
logger.debug(f'Failed to get IPs from cluster {cluster_name}: {e}, '
'set to STOPPED')
except subprocess.CalledProcessError as e:
logger.debug(e)
global_user_state.remove_cluster(cluster_name, terminate=False)
auth_config = read_yaml(handle.cluster_yaml)['auth']
SSHConfigHelper.remove_cluster(cluster_name, handle.head_ip, auth_config)
return global_user_state.get_cluster_from_name(cluster_name)


def get_status_from_cluster_name(
cluster_name: str) -> global_user_state.ClusterStatus:
cluster_name: str) -> Optional[global_user_state.ClusterStatus]:
record = global_user_state.get_cluster_from_name(cluster_name)
if record is None:
return None
record = _update_cluster(record)
record = _ping_cluster_or_set_to_stopped(record)
return record['status']


def get_clusters(refresh: bool) -> List[Dict[str, Any]]:
records = global_user_state.get_clusters()
if not refresh:
return records
return [_update_cluster(record) for record in records]
return [_ping_cluster_or_set_to_stopped(record) for record in records]


def query_head_ip_with_retries(cluster_yaml: str, retry_count: int = 1) -> str:
@@ -1155,3 +1171,20 @@ def get_backend_from_handle(
raise NotImplementedError(
f'Handle type {type(handle)} is not supported yet.')
return backend


class NoOpConsole:
"""An empty class for multi-threaded console.status."""

def __enter__(self):
pass

def __exit__(self, exc_type, exc_val, exc_tb):
pass


def safe_console_status(msg: str):
"""A wrapper for multi-threaded console.status."""
if threading.current_thread() is threading.main_thread():
return console.status(msg)
return NoOpConsole()
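
The two helpers added at the end of this file guard rich's live spinner: `console.status` is only started from the main thread, while worker threads get a do-nothing context manager. Below is a standalone usage sketch, not code from this PR; it assumes `console` is a `rich.console.Console` (the import is not shown in this hunk) and re-declares the helpers so the snippet runs on its own:

```python
# Standalone sketch mirroring the NoOpConsole / safe_console_status helpers
# added above; assumes the `rich` package is installed.
import threading

from rich.console import Console

console = Console()


class NoOpConsole:
    """Do-nothing context manager returned off the main thread."""

    def __enter__(self):
        pass

    def __exit__(self, exc_type, exc_val, exc_tb):
        pass


def safe_console_status(msg: str):
    """Start a rich spinner only when called from the main thread."""
    if threading.current_thread() is threading.main_thread():
        return console.status(msg)
    return NoOpConsole()


def provision_one_cluster():
    # Worker threads get NoOpConsole, so parallel launches do not contend
    # for the single live display.
    with safe_console_status('[bold cyan]Launching'):
        pass


t = threading.Thread(target=provision_one_cluster)
t.start()  # no spinner is shown for the worker thread
t.join()

with safe_console_status('[bold cyan]Launching'):  # spinner on the main thread
    pass
```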
58 changes: 32 additions & 26 deletions sky/backends/cloud_vm_ray_backend.py
@@ -15,7 +15,8 @@
import tempfile
import textwrap
import time
from typing import TYPE_CHECKING, Dict, List, Optional, Tuple, Union
import typing
from typing import Dict, List, Optional, Tuple, Union

import colorama
import filelock
@@ -30,11 +31,12 @@
from sky import optimizer
from sky import task as task_lib
from sky.backends import backend_utils
from sky.backends import wheel_utils
from sky.skylet import autostop_lib, job_lib, log_lib

if TYPE_CHECKING:
if typing.TYPE_CHECKING:
from sky import dag
from sky import resources
from sky import resources as resources_lib

OptimizeTarget = optimizer.OptimizeTarget
Path = str
@@ -408,7 +410,7 @@ class ToProvisionConfig:

def __init__(self,
cluster_name: str,
resources: Optional['resources.Resources'],
resources: Optional['resources_lib.Resources'],
num_nodes: int,
cluster_exists: bool = False) -> None:
assert cluster_name is not None, 'cluster_name must be specified.'
@@ -426,7 +428,8 @@ class GangSchedulingStatus(enum.Enum):
HEAD_FAILED = 2

def __init__(self, log_dir: str, dag: 'dag.Dag',
optimize_target: OptimizeTarget):
optimize_target: OptimizeTarget,
local_wheel_path: pathlib.Path):
self._blocked_regions = set()
self._blocked_zones = set()
self._blocked_launchable_resources = set()
@@ -599,7 +602,7 @@ def _update_blocklist_on_error(self, cloud, region, zones, stdout,
region, zones, stdout, stderr)
assert False, f'Unknown cloud: {cloud}.'

def _yield_region_zones(self, to_provision: 'resources.Resources',
def _yield_region_zones(self, to_provision: 'resources_lib.Resources',
cluster_name: str, cluster_exists: bool):
cloud = to_provision.cloud
region = None
@@ -714,7 +717,7 @@ def _yield_region_zones(self, to_provision: 'resources.Resources',
):
yield (region, zones)

def _try_provision_tpu(self, to_provision: 'resources.Resources',
def _try_provision_tpu(self, to_provision: 'resources_lib.Resources',
config_dict: Dict[str, str]) -> bool:
"""Returns whether the provision is successful."""
tpu_name = config_dict['tpu_name']
@@ -767,7 +770,7 @@ def _try_provision_tpu(self, to_provision: 'resources.Resources',
raise e

def _retry_region_zones(self,
to_provision: 'resources.Resources',
to_provision: 'resources_lib.Resources',
num_nodes: int,
dryrun: bool,
stream_logs: bool,
@@ -1114,15 +1117,16 @@ class ResourceHandle(object):
- (optional) If TPU(s) are managed, a path to a deletion script.
"""

def __init__(self,
*,
cluster_name: str,
cluster_yaml: str,
head_ip: Optional[str] = None,
launched_nodes: Optional[int] = None,
launched_resources: Optional['resources.Resources'] = None,
tpu_create_script: Optional[str] = None,
tpu_delete_script: Optional[str] = None) -> None:
def __init__(
self,
*,
cluster_name: str,
cluster_yaml: str,
head_ip: Optional[str] = None,
launched_nodes: Optional[int] = None,
launched_resources: Optional['resources_lib.Resources'] = None,
tpu_create_script: Optional[str] = None,
tpu_delete_script: Optional[str] = None) -> None:
self.cluster_name = cluster_name
self.cluster_yaml = cluster_yaml
self.head_ip = head_ip
@@ -1180,7 +1184,7 @@ def _check_task_resources_smaller_than_cluster(self, handle: ResourceHandle,
f'existing cluster first: sky down {cluster_name}')

def _check_existing_cluster(
self, task: task_lib.Task, to_provision: 'resources.Resources',
self, task: task_lib.Task, to_provision: 'resources_lib.Resources',
cluster_name: str) -> RetryingVmProvisioner.ToProvisionConfig:
handle = global_user_state.get_handle_from_cluster_name(cluster_name)
if handle is not None:
@@ -1222,7 +1226,7 @@ def _set_tpu_name(self, cluster_config_file: str, num_nodes: int,

def provision(self,
task: task_lib.Task,
to_provision: Optional['resources.Resources'],
to_provision: Optional['resources_lib.Resources'],
dryrun: bool,
stream_logs: bool,
cluster_name: Optional[str] = None):
@@ -1247,6 +1251,11 @@ def provision(self,
task, to_provision, cluster_name)
prev_cluster_status = (
backend_utils.get_status_from_cluster_name(cluster_name))
assert to_provision_config.resources is not None, (
'to_provision should not be None', to_provision_config)
# TODO(suquark): once we have sky on PYPI, we should directly
# install sky from PYPI.
local_wheel_path = wheel_utils.build_sky_wheel()
try:
provisioner = RetryingVmProvisioner(self.log_dir, self._dag,
self._optimize_target,
@@ -1299,7 +1308,8 @@ def provision(self,
# PENDING / RUNNING jobs for the real status, since we do not
# know the actual previous status of the cluster.
cmd = job_lib.JobLibCodeGen.update_status()
with console.status('[bold cyan]Preparing Job Queue'):
with backend_utils.safe_console_status(
'[bold cyan]Preparing Job Queue'):
returncode, _, stderr = self.run_on_head(
handle, cmd, require_outputs=True)
backend_utils.handle_returncode(returncode, cmd,
@@ -1343,8 +1353,8 @@ def set_autostop(self, handle: ResourceHandle,
code,
'Failed to set autostop',
stderr=stderr)
global_user_state.set_cluster_autostop(handle.cluster_name,
idle_minutes_to_autostop)
global_user_state.set_cluster_autostop_value(
handle.cluster_name, idle_minutes_to_autostop)

def sync_workdir(self, handle: ResourceHandle, workdir: Path) -> None:
# Even though provision() takes care of it, there may be cases where
@@ -1834,10 +1844,6 @@ def execute(
# Case: task_lib.Task(run, num_nodes=N)
assert task.num_nodes > 1, task.num_nodes
self._execute_task_n_nodes(handle, task, job_id, detach_run)
# # This should be called after the job is submitted, otherwise the
# # cluster will be stopped immediately after provisioned if the
# # idle_minutes_to_autostop==0.
# self.set_autostop(handle, task.idle_minutes_to_autostop)

def _execute_task_one_node(self, handle: ResourceHandle,
task: task_lib.Task, job_id: int,
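
The commented-out block shown in the `execute()` hunk above captures an ordering constraint: autostop should be armed only after the job has been submitted, otherwise a cluster launched with `idle_minutes_to_autostop == 0` could be stopped while it still looks idle. A minimal, self-contained sketch of that ordering follows; the helper names are placeholders for illustration, not the backend's real call sites:

```python
# Hypothetical sketch of the submit-then-arm-autostop ordering; none of these
# helpers are the PR's actual functions.
def provision_cluster(task: str) -> str:
    return f'handle-for-{task}'


def submit_job(handle: str, task: str) -> int:
    print(f'submitted {task!r} on {handle}')
    return 1


def set_autostop(handle: str, idle_minutes: int) -> None:
    print(f'{handle}: stop after {idle_minutes} idle minutes')


def launch(task: str, idle_minutes_to_autostop: int) -> None:
    handle = provision_cluster(task)
    submit_job(handle, task)
    # Arm autostop only after submission: with idle_minutes_to_autostop == 0,
    # arming it earlier could stop the cluster before the job ever starts.
    set_autostop(handle, idle_minutes_to_autostop)


launch('train.py', idle_minutes_to_autostop=0)
```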