[AWS] Bring-your-own-VPC that disables public IPs for all SkyPilot nodes. #1512

Merged (60 commits) on Jan 4, 2023
Changes from 43 commits

Commits (60)
994e622
Minor: sky logs hint
concretevitamin Dec 12, 2022
dee5590
Minor: add a FIXME in authentication.py.
concretevitamin Dec 12, 2022
e1f0c1b
New module: sky_config
concretevitamin Dec 12, 2022
3f48372
Backend changes for SSH proxy command support.
concretevitamin Dec 12, 2022
3e3c66e
spot_launch(): sync up config; pop any proxy command.
concretevitamin Dec 12, 2022
7c8f943
AutostopEvent: monkey patch SSHOptions.
concretevitamin Dec 12, 2022
ff224ba
aws/config.py: support vpc_name and new use_internal_ips semantics.
concretevitamin Dec 12, 2022
6a529c3
Make failover catch our 'ERROR' messages from AWS node provider.
concretevitamin Dec 12, 2022
fc8dd03
.j2 changes.
concretevitamin Dec 12, 2022
f03f27b
Fix local launch hash for workers: must pop ssh_proxy_command.
concretevitamin Dec 12, 2022
cecc25d
Fix pylint.
concretevitamin Dec 12, 2022
0328d4d
typo
concretevitamin Dec 12, 2022
22a6481
smoke: make printf usage safe.
concretevitamin Dec 12, 2022
b467b03
Use SKYPILOT_ERROR as logging prefix.
concretevitamin Dec 13, 2022
2456775
Fix Resources.__repr__().
concretevitamin Dec 15, 2022
e496137
Avoid printing unnecessary termination errors for VPC-not-found.
concretevitamin Dec 15, 2022
7d80337
Fix a syntax error in codegen.
concretevitamin Dec 16, 2022
0b982cd
Read from SKYPILOT_CONFIG env var to permit dynamic generation.
concretevitamin Dec 16, 2022
2d9e205
Fix smoke test name.
concretevitamin Dec 16, 2022
85387ec
Fix another test name
concretevitamin Dec 16, 2022
ea26a89
Merge branch 'master' into jump
concretevitamin Dec 16, 2022
410ed6a
Revert "Read from SKYPILOT_CONFIG env var to permit dynamic generation."
concretevitamin Dec 17, 2022
07e0145
Fix head_node_launch_requested log line.
concretevitamin Dec 19, 2022
f4f285f
Merge branch 'master' into jump
concretevitamin Dec 19, 2022
0b6ee4f
Merge branch 'master' into jump
concretevitamin Dec 20, 2022
d620435
Optional: env var to read configs for spot, for better isolation.
concretevitamin Dec 20, 2022
f5850ea
Make query_head_ip_with_retries() robust to extra output.
concretevitamin Dec 20, 2022
2c27757
aws/config.py: reword comments
concretevitamin Dec 20, 2022
5cfd700
events.py: restart_only=True
concretevitamin Dec 20, 2022
7b136d5
Fix Resources.__repr__ to handle None fields.
concretevitamin Dec 20, 2022
af5eb26
Use SKYPILOT_ERROR_NO_NODES_LAUNCHED
concretevitamin Dec 20, 2022
af9e2b5
rstrip() for ssh config entries.
concretevitamin Dec 20, 2022
c73555b
authentication.py: reword comment
concretevitamin Dec 20, 2022
4c56607
pylint
concretevitamin Dec 20, 2022
f5d628b
Fix logging
concretevitamin Dec 20, 2022
de9280f
Try using retries for handle.{internal,external}_ips().
concretevitamin Dec 20, 2022
08bef71
Merge branch 'master' into jump
concretevitamin Jan 2, 2023
cf527bd
Address some easy comments
concretevitamin Jan 2, 2023
3bbeb4c
Typo
concretevitamin Jan 2, 2023
0fe08cb
backend_utils: fix worker IPs fetch; fix >80-col lines.
concretevitamin Jan 2, 2023
797c7d2
Fix test_minimal.
concretevitamin Jan 2, 2023
4ce221e
test_smoke: printf -> echo
concretevitamin Jan 3, 2023
18cd6da
Query IPs once.
concretevitamin Jan 3, 2023
ac0883d
Merge branch 'master' into jump
concretevitamin Jan 3, 2023
7252fd3
Drop ssh_proxy_command in launch hash when provisioning.
concretevitamin Jan 3, 2023
229f0a5
Typo
concretevitamin Jan 3, 2023
0155cc3
Typo
concretevitamin Jan 3, 2023
3e5e4b8
Add comment
concretevitamin Jan 3, 2023
4d89c38
sky/sky_config.py -> sky/skypilot_config.py
concretevitamin Jan 3, 2023
ff955e9
Add: sky/backends/monkey_patches/
concretevitamin Jan 3, 2023
594ce70
Remove logging
concretevitamin Jan 3, 2023
510fb6c
pylint
concretevitamin Jan 3, 2023
71a0bd7
MANIFEST should add monkey patch file
concretevitamin Jan 3, 2023
e4398ab
tests/test_smoke.py: fix extra \n
concretevitamin Jan 3, 2023
5f14af5
Fix monkey patching bug.
concretevitamin Jan 3, 2023
7b73a44
Remove AutostopEvent monkey patching.
concretevitamin Jan 3, 2023
db32513
_ray_launch_hash: pop ssh proxy command for head & workers
concretevitamin Jan 3, 2023
e82c4b4
Make another 'ray up' use patched launch hash fn.
concretevitamin Jan 4, 2023
649d805
Fix smoke tests.
concretevitamin Jan 4, 2023
0113e58
Fix smoke: K80 VMs could be non-ssh-able (and are more costly).
concretevitamin Jan 4, 2023
7 changes: 6 additions & 1 deletion sky/authentication.py
@@ -77,7 +77,12 @@ def get_or_generate_keys() -> Tuple[str, str]:
_save_key_pair(private_key_path, public_key_path, private_key,
public_key)
else:
assert os.path.exists(public_key_path)
# FIXME(skypilot): ran into failing this assert once, but forgot the
# reproduction (has private key; but has not generated public key).
# AssertionError: /home/ubuntu/.ssh/sky-key.pub
assert os.path.exists(public_key_path), (
'Private key found, but associated public key '
f'{public_key_path} does not exist.')
return private_key_path, public_key_path


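The strengthened assert above can be exercised in isolation. A minimal sketch — the helper name `check_key_pair` is made up for illustration and is not SkyPilot's API:

```python
import os


def check_key_pair(private_key_path: str, public_key_path: str) -> None:
    # Mirrors the PR's assert: finding a private key without its public
    # half should fail loudly, with the offending path in the message.
    if os.path.exists(private_key_path):
        assert os.path.exists(public_key_path), (
            'Private key found, but associated public key '
            f'{public_key_path} does not exist.')
```

The point of the change is purely diagnostic: the bare `assert os.path.exists(public_key_path)` previously surfaced only the path, with no hint of what went wrong.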
278 changes: 182 additions & 96 deletions sky/backends/backend_utils.py

Large diffs are not rendered by default.

163 changes: 114 additions & 49 deletions sky/backends/cloud_vm_ray_backend.py
@@ -28,6 +28,7 @@
from sky import resources as resources_lib
from sky import sky_logging
from sky import optimizer
from sky import sky_config
from sky import spot as spot_lib
from sky import task as task_lib
from sky.data import data_utils
@@ -447,7 +448,7 @@ def add_ray_task(self,
rank = job_ip_rank_map[ip]

if len(cluster_ips_to_node_id) == 1: # Single-node task on single-node cluster
name_str = '{task_name},' if {task_name!r} is not None else 'task,'
name_str = '{task_name},' if {task_name!r} != None else 'task,'
log_path = os.path.expanduser(os.path.join({log_dir!r}, 'run.log'))
else: # Single-node or multi-node task on multi-node cluster
idx_in_cluster = cluster_ips_to_node_id[ip]
@@ -678,7 +679,12 @@ def _update_blocklist_on_aws_error(self, region, zones, stdout, stderr):
errors = [
s.strip()
for s in stdout_splits + stderr_splits
if 'An error occurred' in s.strip()
# 'An error occurred': boto3 errors
# 'SKYPILOT_ERROR_NO_NODES_LAUNCHED': skypilot's changes to the AWS
# node provider; for errors prior to provisioning like VPC
# setup.
if 'An error occurred' in s or
'SKYPILOT_ERROR_NO_NODES_LAUNCHED: ' in s
]
# Need to handle boto3 printing error but its retry succeeded:
# error occurred (Unsupported) .. not supported in your requested
@@ -703,8 +709,8 @@ def _update_blocklist_on_aws_error(self, region, zones, stdout, stderr):
raise RuntimeError('Errors occurred during provision; '
'check logs above.')
if set(zones) == set(region.zones):
# The underlying ray autoscaler / boto3 will try all zones of a
# region at once.
# The underlying AWS NodeProvider will try all specified zones of a
# region. (Each boto3 request takes one zone.)
logger.warning(f'Got error(s) in all zones of {region.name}:')
else:
zones_str = ', '.join(z.name for z in zones)
@@ -772,36 +778,59 @@ def _update_blocklist_on_local_error(self, region, zones, stdout, stderr):
self._blocked_regions.add(region.name)

def _update_blocklist_on_error(self, cloud, region, zones, stdout,
stderr) -> None:
stderr) -> bool:
"""Handles cloud-specific errors and updates the block list.

This parses textual stdout/stderr because we don't directly use the
underlying clouds' SDKs. If we did that, we could catch proper
exceptions instead.

Returns:
definitely_no_nodes_launched: bool, True if definitely no nodes
launched (e.g., due to VPC errors we have never sent the provision
request), False otherwise.
"""
if stdout is None:
# Gang scheduling failure. Simply block the region.
# Gang scheduling failure (head node is definitely up, but some
# workers' provisioning failed). Simply block the region.
assert stderr is None, stderr
self._blocked_regions.add(region.name)
return

if isinstance(cloud, clouds.GCP):
return self._update_blocklist_on_gcp_error(region, zones, stdout,
stderr)

if isinstance(cloud, clouds.AWS):
return self._update_blocklist_on_aws_error(region, zones, stdout,
stderr)

if isinstance(cloud, clouds.Azure):
return self._update_blocklist_on_azure_error(
region, zones, stdout, stderr)
return False # definitely_no_nodes_launched

# TODO(zongheng): refactor into Cloud interface?
handlers = {
clouds.AWS: self._update_blocklist_on_aws_error,
clouds.Azure: self._update_blocklist_on_azure_error,
clouds.GCP: self._update_blocklist_on_gcp_error,
clouds.Local: self._update_blocklist_on_local_error,
}
cloud_type = type(cloud)
if cloud_type not in handlers:
raise NotImplementedError(
f'Cloud {cloud} unknown, or has not added '
'support for parsing and handling provision failures.')
handler = handlers[cloud_type]
handler(region, zones, stdout, stderr)

if isinstance(cloud, clouds.Local):
return self._update_blocklist_on_local_error(
region, zones, stdout, stderr)
stdout_splits = stdout.split('\n')
stderr_splits = stderr.split('\n')
# Determining whether head node launch *may* have been requested based
# on outputs is tricky. We are conservative here by choosing an "early
# enough" output line in the following:
# https://github.com/ray-project/ray/blob/03b6bc7b5a305877501110ec04710a9c57011479/python/ray/autoscaler/_private/commands.py#L704-L737 # pylint: disable=line-too-long
# This is okay, because we mainly want to use the return value of this
# func to skip cleaning up never-launched clusters that encountered VPC
# errors; their launch should not have printed any such outputs.
head_node_launch_may_have_been_requested = any(
'Acquiring an up-to-date head node' in line
for line in stdout_splits + stderr_splits)
# If head node request has definitely not been sent (this happens when
# there are errors during node provider "bootstrapping", e.g.,
# VPC-not-found errors), then definitely no nodes are launched.
definitely_no_nodes_launched = (
not head_node_launch_may_have_been_requested)

assert False, f'Unknown cloud: {cloud}.'
return definitely_no_nodes_launched
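The two ideas in this hunk — dispatch-by-cloud-type instead of chained `isinstance` checks, and the conservative "definitely no nodes launched" detection keyed on ray's head-node acquisition line — can be sketched self-contained. Names are simplified and the classes are stand-ins, not the actual SkyPilot API:

```python
from typing import Callable, Dict, Type


class AWS:
    ...


class GCP:
    ...


def _on_aws_error(region: str, stdout: str, stderr: str) -> None:
    # A real handler would parse boto3 errors and
    # SKYPILOT_ERROR_NO_NODES_LAUNCHED lines here.
    pass


# Dispatch table: cloud type -> provision-failure handler.
_HANDLERS: Dict[Type, Callable[[str, str, str], None]] = {
    AWS: _on_aws_error,
}


def update_blocklist_on_error(cloud, region: str, stdout: str,
                              stderr: str) -> bool:
    cloud_type = type(cloud)
    if cloud_type not in _HANDLERS:
        raise NotImplementedError(
            f'Cloud {cloud} unknown, or has no provision-failure handler.')
    _HANDLERS[cloud_type](region, stdout, stderr)
    # Conservative detection: if ray never printed the head-node
    # acquisition line, the provision request was definitely never sent
    # (e.g., VPC-not-found during node-provider bootstrapping).
    lines = stdout.split('\n') + stderr.split('\n')
    head_launch_may_have_been_requested = any(
        'Acquiring an up-to-date head node' in line for line in lines)
    return not head_launch_may_have_been_requested
```

The return value lets the caller skip tearing down clusters that were never created, which is exactly how `_retry_region_zones` uses it below.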

def _yield_region_zones(self, to_provision: resources_lib.Resources,
cluster_name: str, cluster_exists: bool):
@@ -1126,47 +1155,65 @@ def _retry_region_zones(self,
f' existing VM{plural}.{style.RESET_ALL}')
return config_dict

# The cluster is not ready.
# The cluster is not ready. We must perform error recording and/or
# cleanup.

# If cluster was previously UP or STOPPED, stop it; otherwise
# terminate.
# FIXME(zongheng): terminating a potentially live cluster is
# scary. Say: users have an existing cluster that got into INIT, do
# sky launch, somehow failed, then we may be terminating it here.
need_terminate = not is_prev_cluster_healthy
terminate_or_stop = not is_prev_cluster_healthy
definitely_no_nodes_launched = False
if status == self.GangSchedulingStatus.HEAD_FAILED:
# ray up failed for the head node.
self._update_blocklist_on_error(to_provision.cloud, region,
zones, stdout, stderr)
definitely_no_nodes_launched = self._update_blocklist_on_error(
to_provision.cloud, region, zones, stdout, stderr)
else:
# gang scheduling failed.
assert status == self.GangSchedulingStatus.GANG_FAILED, status
# The stdout/stderr of ray up is not useful here, since
# head node is successfully provisioned.
self._update_blocklist_on_error(
definitely_no_nodes_launched = self._update_blocklist_on_error(
to_provision.cloud,
region,
# Ignored and block region:
zones=None,
stdout=None,
stderr=None)
# GANG_FAILED means head is up, workers failed.
assert definitely_no_nodes_launched is False, (
definitely_no_nodes_launched)

# Only log the errors for GANG_FAILED, since HEAD_FAILED may
# not have created any resources (it can happen however) and
# HEAD_FAILED can happen in "normal" failover cases.
logger.error('*** Failed provisioning the cluster. ***')
terminate_str = 'Terminating' if need_terminate else 'Stopping'
terminate_str = ('Terminating'
if terminate_or_stop else 'Stopping')
logger.error(f'*** {terminate_str} the failed cluster. ***')

# There may exists partial nodes (e.g., head node) so we must
# If these conditions hold, it *should* be safe to skip the cleanup
# action.
#
# We want to skip mainly for custom VPC: if users encountered "No
# VPC with name 'xxx' is found in <region>.", then going ahead to
# down the non-existent cluster will itself error out with the same
# error message. This was found to be confusing. In that case we
# skip termination.
skip_cleanup = not cluster_exists and definitely_no_nodes_launched
if skip_cleanup:
continue

# There may exist partial nodes (e.g., head node) so we must
# terminate or stop before moving on to other regions.
#
# NOTE: even HEAD_FAILED could've left a live head node there, so
# we must terminate/stop here too. E.g., node is up, and ray
# NOTE: even HEAD_FAILED could've left a live head node there,
# so we must terminate/stop here too. E.g., node is up, and ray
# autoscaler proceeds to setup commands, which may fail:
# ERR updater.py:138 -- New status: update-failed
CloudVmRayBackend().teardown_no_lock(handle,
terminate=need_terminate)
terminate=terminate_or_stop)

if to_provision.zone is not None:
message = (
@@ -1372,7 +1419,7 @@ def need_ray_up(
assert ray_up_return_value is not None
returncode, stdout, stderr = ray_up_return_value

logger.debug(f'Ray up takes {time.time() - start} seconds with '
logger.debug(f'`ray up` takes {time.time() - start:.1f} seconds with '
f'{retry_cnt} retries.')
if returncode != 0:
return self.GangSchedulingStatus.HEAD_FAILED, stdout, stderr, None
@@ -1734,13 +1781,25 @@ def _update_stable_cluster_ips(self,
# IPs since the cached IPs are up-to-date.
return

cluster_internal_ips = backend_utils.get_node_ips(
self.cluster_yaml,
self.launched_nodes,
handle=self,
head_ip_max_attempts=max_attempts,
worker_ip_max_attempts=max_attempts,
get_internal_ips=True)
is_cluster_aws = (self.launched_resources is not None and
isinstance(self.launched_resources.cloud,
clouds.AWS))
if is_cluster_aws and sky_config.get_nested(
keys=('aws', 'use_internal_ips'), default_value=False):
# Optimization: if we know use_internal_ips is True (currently
# only exposed for AWS), then our AWS NodeProvider is
# guaranteed to pick subnets that will not assign public IPs,
# thus the first list of IPs returned above are already private
# IPs. So skip the second query.
cluster_internal_ips = list(cluster_external_ips)
else:
cluster_internal_ips = backend_utils.get_node_ips(
self.cluster_yaml,
self.launched_nodes,
handle=self,
head_ip_max_attempts=max_attempts,
worker_ip_max_attempts=max_attempts,
get_internal_ips=True)
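The optimization above hinges on a nested config lookup. A hedged sketch of what `get_nested` plausibly does — the real `sky_config.get_nested` operates on the globally loaded config rather than taking a dict argument, so this signature is an assumption:

```python
from typing import Any, Dict, Tuple


def get_nested(config: Dict[str, Any], keys: Tuple[str, ...],
               default_value: Any) -> Any:
    # Walk the key path; fall back to the default as soon as any level
    # is missing or is not a mapping.
    node: Any = config
    for key in keys:
        if not isinstance(node, dict) or key not in node:
            return default_value
        node = node[key]
    return node
```

With a config like `{'aws': {'use_internal_ips': True, 'vpc_name': 'my-vpc'}}`, the lookup `get_nested(config, ('aws', 'use_internal_ips'), False)` returns `True`, so the second (internal-IP) query can be skipped entirely.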

assert len(cluster_external_ips) == len(cluster_internal_ips), (
f'Cluster {self.cluster_name!r}:'
@@ -1757,7 +1816,7 @@ def _update_stable_cluster_ips(self,
self.stable_internal_external_ips = stable_internal_external_ips

def internal_ips(self,
max_attempts: int = 1,
max_attempts: int = _FETCH_IP_MAX_ATTEMPTS,
use_cached_ips: bool = True) -> Optional[List[str]]:
if not use_cached_ips:
self._update_stable_cluster_ips(max_attempts=max_attempts)
@@ -1766,7 +1825,7 @@ def internal_ips(self,
return None

def external_ips(self,
max_attempts: int = 1,
max_attempts: int = _FETCH_IP_MAX_ATTEMPTS,
use_cached_ips: bool = True) -> Optional[List[str]]:
if not use_cached_ips:
self._update_stable_cluster_ips(max_attempts=max_attempts)
@@ -1979,7 +2038,7 @@ def _provision(self,
'Failed to provision all possible launchable '
'resources.'
f' Relax the task\'s resource requirements: '
f'{task.num_nodes}x {task.resources}')
f'{task.num_nodes}x {list(task.resources)[0]}')
if retry_until_up:
logger.error(error_message)
# Sleep and retry.
@@ -2803,11 +2862,17 @@ def teardown_no_lock(self,
logger.warning(
_TEARDOWN_PURGE_WARNING.format(
reason='stopping/terminating cluster nodes'))
# This error returns when we call "gcloud delete" with an empty VM
# list where no instance exists. Safe to ignore it and do cleanup
# locally.
# TODO(wei-lin): refactor error handling mechanism.
elif 'TPU must be specified.' not in stderr:
# 'TPU must be specified.': This error returns when we call "gcloud
# delete" with an empty VM list where no instance exists. Safe to
# ignore it and do cleanup locally. TODO(wei-lin): refactor error
# handling mechanism.
#
# 'SKYPILOT_ERROR_NO_NODES_LAUNCHED': this indicates nodes are
# never launched and the errors are related to pre-launch
# configurations (such as VPC not found). So it's safe & good UX
# to not print a failure message.
elif ('TPU must be specified.' not in stderr and
'SKYPILOT_ERROR_NO_NODES_LAUNCHED: ' not in stderr):
logger.error(
_TEARDOWN_FAILURE_MESSAGE.format(
extra_reason='',
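The widened suppression condition in `teardown_no_lock` reduces to a membership test over known-benign error markers. A minimal sketch, with the helper name invented for illustration:

```python
_BENIGN_TEARDOWN_MARKERS = (
    # Returned by "gcloud delete" when the VM list is empty (no instance
    # exists); safe to ignore and clean up locally.
    'TPU must be specified.',
    # Emitted by SkyPilot's AWS node provider for pre-launch failures
    # (e.g., VPC not found): no nodes were ever launched.
    'SKYPILOT_ERROR_NO_NODES_LAUNCHED: ',
)


def should_print_teardown_failure(stderr: str) -> bool:
    # Mirrors the diff: suppress the failure banner when the error
    # indicates nothing was actually provisioned.
    return not any(marker in stderr for marker in _BENIGN_TEARDOWN_MARKERS)
```

This keeps the UX clean for the custom-VPC case: tearing down a cluster that never existed would otherwise print the same confusing VPC-not-found error twice.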
4 changes: 2 additions & 2 deletions sky/cli.py
@@ -1558,8 +1558,8 @@ def logs(

if len(job_ids) > 1 and not sync_down:
raise click.UsageError(
f'Cannot stream logs of multiple jobs {job_ids}. '
'Set --sync-down to download them.')
f'Cannot stream logs of multiple jobs (IDs: {", ".join(job_ids)}).'
'\nPass -s/--sync-down to download the logs instead.')

job_ids = None if not job_ids else job_ids

54 changes: 42 additions & 12 deletions sky/execution.py
@@ -26,6 +26,7 @@
from sky import exceptions
from sky import global_user_state
from sky import optimizer
from sky import sky_config
from sky import sky_logging
from sky import spot
from sky import task as task_lib
@@ -512,19 +513,48 @@ def spot_launch(
common_utils.dump_yaml(f.name, task_config)

controller_name = spot.SPOT_CONTROLLER_NAME
vars_to_fill = {
'remote_user_yaml_prefix': spot.SPOT_TASK_YAML_PREFIX,
'user_yaml_path': f.name,
'user_config_path': None,
'spot_controller': controller_name,
'cluster_name': name,
'gcloud_installation_commands': gcp.GCLOUD_INSTALLATION_COMMAND,
'is_dev': env_options.Options.IS_DEVELOPER.get(),
'disable_logging': env_options.Options.DISABLE_LOGGING.get(),
'logging_user_hash': common_utils.get_user_hash(),
'retry_until_up': retry_until_up,
'user': os.environ.get('USER', None),
}
if sky_config.loaded():
# Look up the contents of the already loaded configs via the
# 'sky_config' module. Don't simply read the on-disk file as it may
# have changed since this process started.
#
# Pop any proxy command, because the controller would've been
# launched behind the proxy, and in general any nodes we launch may
# not have or need the proxy setup. (If the controller needs to
# launch spot clusters in another region/VPC, the user should
# properly set up VPC peering, which will allow the
# cross-region/VPC communication. The proxy command is orthogonal
# to this scenario.)
#
# This file will be uploaded to the controller node and will be
# used throughout the spot job's recovery attempts (i.e., if it
# relaunches due to preemption, we make sure the same config is
# used).
config_dict = sky_config.pop_nested(('auth', 'ssh_proxy_command'))
with tempfile.NamedTemporaryFile(mode='w', delete=False) as tmpfile:
common_utils.dump_yaml(tmpfile.name, config_dict)
vars_to_fill.update({
'user_config_path': tmpfile.name,
'env_var_skypilot_config':
sky_config.ENV_VAR_SKYPILOT_CONFIG,
})

yaml_path = backend_utils.fill_template(
spot.SPOT_CONTROLLER_TEMPLATE, {
'remote_user_yaml_prefix': spot.SPOT_TASK_YAML_PREFIX,
'user_yaml_path': f.name,
'spot_controller': controller_name,
'cluster_name': name,
'gcloud_installation_commands': gcp.GCLOUD_INSTALLATION_COMMAND,
'is_dev': env_options.Options.IS_DEVELOPER.get(),
'disable_logging': env_options.Options.DISABLE_LOGGING.get(),
'logging_user_hash': common_utils.get_user_hash(),
'retry_until_up': retry_until_up,
'user': os.environ.get('USER', None),
},
spot.SPOT_CONTROLLER_TEMPLATE,
vars_to_fill,
output_prefix=spot.SPOT_CONTROLLER_YAML_PREFIX)
controller_task = task_lib.Task.from_yaml(yaml_path)
controller_task.spot_task = task
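The `spot_launch` change relies on `sky_config.pop_nested(('auth', 'ssh_proxy_command'))` returning a config dict with the proxy command stripped, which is then dumped to a temp file for the controller. A hedged sketch of those assumed semantics — the real function reads the globally loaded config rather than an explicit argument:

```python
import copy
from typing import Any, Dict, Tuple


def pop_nested(config: Dict[str, Any],
               keys: Tuple[str, ...]) -> Dict[str, Any]:
    # Return a copy of the config with the nested key removed; the
    # original config is left untouched. Missing intermediate keys are
    # a no-op.
    result = copy.deepcopy(config)
    node = result
    for key in keys[:-1]:
        if not isinstance(node, dict) or key not in node:
            return result
        node = node[key]
    if isinstance(node, dict):
        node.pop(keys[-1], None)
    return result
```

Dropping the proxy command matters because the controller runs behind the proxy already; clusters it launches should not inherit a jump-host setup they may not need or be able to reach.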