
[AWS] Bring-your-own-VPC that disables public IPs for all SkyPilot nodes. #1512

Merged
60 commits merged on Jan 4, 2023
Commits (60; changes shown from 17 commits)
994e622
Minor: sky logs hint
concretevitamin Dec 12, 2022
dee5590
Minor: add a FIXME in authentication.py.
concretevitamin Dec 12, 2022
e1f0c1b
New module: sky_config
concretevitamin Dec 12, 2022
3f48372
Backend changes for SSH proxy command support.
concretevitamin Dec 12, 2022
3e3c66e
spot_launch(): sync up config; pop any proxy command.
concretevitamin Dec 12, 2022
7c8f943
AutostopEvent: monkey patch SSHOptions.
concretevitamin Dec 12, 2022
ff224ba
aws/config.py: support vpc_name and new use_internal_ips semantics.
concretevitamin Dec 12, 2022
6a529c3
Make failover catch our 'ERROR' messages from AWS node provider.
concretevitamin Dec 12, 2022
fc8dd03
.j2 changes.
concretevitamin Dec 12, 2022
f03f27b
Fix local launch hash for workers: must pop ssh_proxy_command.
concretevitamin Dec 12, 2022
cecc25d
Fix pylint.
concretevitamin Dec 12, 2022
0328d4d
typo
concretevitamin Dec 12, 2022
22a6481
smoke: make printf usage safe.
concretevitamin Dec 12, 2022
b467b03
Use SKYPILOT_ERROR as logging prefix.
concretevitamin Dec 13, 2022
2456775
Fix Resources.__repr__().
concretevitamin Dec 15, 2022
e496137
Avoid printing unnecessary termination errors for VPC-not-found.
concretevitamin Dec 15, 2022
7d80337
Fix a syntax error in codegen.
concretevitamin Dec 16, 2022
0b982cd
Read from SKYPILOT_CONFIG env var to permit dynamic generation.
concretevitamin Dec 16, 2022
2d9e205
Fix smoke test name.
concretevitamin Dec 16, 2022
85387ec
Fix another test name
concretevitamin Dec 16, 2022
ea26a89
Merge branch 'master' into jump
concretevitamin Dec 16, 2022
410ed6a
Revert "Read from SKYPILOT_CONFIG env var to permit dynamic generation."
concretevitamin Dec 17, 2022
07e0145
Fix head_node_launch_requested log line.
concretevitamin Dec 19, 2022
f4f285f
Merge branch 'master' into jump
concretevitamin Dec 19, 2022
0b6ee4f
Merge branch 'master' into jump
concretevitamin Dec 20, 2022
d620435
Optional: env var to read configs for spot, for better isolation.
concretevitamin Dec 20, 2022
f5850ea
Make query_head_ip_with_retries() robust to extra output.
concretevitamin Dec 20, 2022
2c27757
aws/config.py: reword comments
concretevitamin Dec 20, 2022
5cfd700
events.py: restart_only=True
concretevitamin Dec 20, 2022
7b136d5
Fix Resources.__repr__ to handle None fields.
concretevitamin Dec 20, 2022
af5eb26
Use SKYPILOT_ERROR_NO_NODES_LAUNCHED
concretevitamin Dec 20, 2022
af9e2b5
rstrip() for ssh config entries.
concretevitamin Dec 20, 2022
c73555b
authentication.py: reword comment
concretevitamin Dec 20, 2022
4c56607
pylint
concretevitamin Dec 20, 2022
f5d628b
Fix logging
concretevitamin Dec 20, 2022
de9280f
Try using reties for handle.{internal,external}_ips().
concretevitamin Dec 20, 2022
08bef71
Merge branch 'master' into jump
concretevitamin Jan 2, 2023
cf527bd
Address some easy comments
concretevitamin Jan 2, 2023
3bbeb4c
Typo
concretevitamin Jan 2, 2023
0fe08cb
backend_utils: fix worker IPs fetch; fix >80-col lines.
concretevitamin Jan 2, 2023
797c7d2
Fix test_minimal.
concretevitamin Jan 2, 2023
4ce221e
test_smoke: printf -> echo
concretevitamin Jan 3, 2023
18cd6da
Query IPs once.
concretevitamin Jan 3, 2023
ac0883d
Merge branch 'master' into jump
concretevitamin Jan 3, 2023
7252fd3
Drop ssh_proxy_command in launch hash when provisioning.
concretevitamin Jan 3, 2023
229f0a5
Typo
concretevitamin Jan 3, 2023
0155cc3
Typo
concretevitamin Jan 3, 2023
3e5e4b8
Add comment
concretevitamin Jan 3, 2023
4d89c38
sky/sky_config.py -> sky/skypilot_config.py
concretevitamin Jan 3, 2023
ff955e9
Add: sky/backends/monkey_patches/
concretevitamin Jan 3, 2023
594ce70
Remove logging
concretevitamin Jan 3, 2023
510fb6c
pylint
concretevitamin Jan 3, 2023
71a0bd7
MANIFEST should add monkey patch file
concretevitamin Jan 3, 2023
e4398ab
tests/test_smoke.py: fix extra \n
concretevitamin Jan 3, 2023
5f14af5
Fix monkey patching bug.
concretevitamin Jan 3, 2023
7b73a44
Remove AutostopEvent monkey patching.
concretevitamin Jan 3, 2023
db32513
_ray_launch_hash: pop ssh proxy command for head & workers
concretevitamin Jan 3, 2023
e82c4b4
Make another 'ray up' use patched launch hash fn.
concretevitamin Jan 4, 2023
649d805
Fix smoke tests.
concretevitamin Jan 4, 2023
0113e58
Fix smoke: K80 VMs could be non-ssh-able (and are more costly).
concretevitamin Jan 4, 2023
165 changes: 98 additions & 67 deletions sky/backends/backend_utils.py

Large diffs are not rendered by default.

106 changes: 75 additions & 31 deletions sky/backends/cloud_vm_ray_backend.py
@@ -10,6 +10,7 @@
import re
import signal
import subprocess
import sys
import tempfile
import textwrap
import time
@@ -28,6 +29,7 @@
from sky import resources as resources_lib
from sky import sky_logging
from sky import optimizer
from sky import skypilot_config
from sky import spot as spot_lib
from sky import task as task_lib
from sky.data import data_utils
@@ -98,6 +100,13 @@

_JOB_ID_PATTERN = re.compile(r'Job ID: ([0-9]+)')

# Path to the monkey-patched ray up script.
# We don't import the script and use __file__, because the script's template
# fields need to be filled in first (so importing it as-is would fail).
_RAY_UP_WITH_MONKEY_PATCHED_HASH_LAUNCH_CONF_PATH = (
pathlib.Path(sky.__file__).resolve().parent / 'backends' /
'monkey_patches' / 'ray_up_with_monkey_patched_hash_launch_conf.py')


def _get_cluster_config_template(cloud):
cloud_to_template = {
@@ -707,8 +716,13 @@ def _update_blocklist_on_aws_error(self, region, zones, stdout, stderr):
with ux_utils.print_exception_no_traceback():
raise RuntimeError('Errors occurred during provision; '
'check logs above.')
# Underlying ray autoscaler will try all specified zones of a region.
logger.warning(f'Got error(s) in all zones of {region.name}:')
if set(zones) == set(region.zones):
# The underlying AWS NodeProvider will try all specified zones of a
# region. (Each boto3 request takes one zone.)
logger.warning(f'Got error(s) in all zones of {region.name}:')
else:
zones_str = ', '.join(z.name for z in zones)
logger.warning(f'Got error(s) in {zones_str}:')
messages = '\n\t'.join(errors)
logger.warning(f'{style.DIM}\t{messages}{style.RESET_ALL}')
self._blocked_regions.add(region.name)
@@ -816,7 +830,7 @@ def _update_blocklist_on_error(self, cloud, region, zones, stdout,
# func to skip cleaning up never-launched clusters that encountered VPC
# errors; their launch should not have printed any such outputs.
head_node_launch_may_have_been_requested = any(
line.startswith('Acquiring an up-to-date head node')
'Acquiring an up-to-date head node' in line
for line in stdout_splits + stderr_splits)
# If head node request has definitely not been sent (this happens when
# there are errors during node provider "bootstrapping", e.g.,
@@ -1284,26 +1298,44 @@ def _gang_schedule_ray_up(
# FIXME: refactor code path to remove use of stream_logs
del stream_logs

style = colorama.Style

def ray_up():
# Runs `ray up --no-restart` with our monkey-patched launch hash
# calculation. See the monkey patch file for why.
#
# The only other `ray up` in this file inside
# _ensure_cluster_ray_started() doesn't need this monkey patch (to
# test: launch a cluster; log in and `ray stop`; then launch
# again).
#
# NOTE: --no-restart solves the following bug. Without it, if 'ray
# up' (sky launch) twice on a cluster with >1 node, the worker node
# gets disconnected/killed by ray autoscaler; the whole task will
# just freeze. (Doesn't affect 1-node clusters.) With this flag,
# ray processes no longer restart and this bug doesn't show.
# Downside is existing tasks on the cluster will keep running
# (which may be ok with the semantics of 'sky launch' twice).
# Tracked in https://github.com/ray-project/ray/issues/20402.
# Ref: https://github.com/ray-project/ray/blob/releases/2.2.0/python/ray/autoscaler/sdk/sdk.py#L16-L49 # pylint: disable=line-too-long
with open(_RAY_UP_WITH_MONKEY_PATCHED_HASH_LAUNCH_CONF_PATH,
'r') as f:
ray_up_no_restart_script = f.read().format(
ray_yaml_path=repr(cluster_config_file),
kwargs={'no_restart': True})

with tempfile.NamedTemporaryFile('w',
prefix='skypilot_ray_up_',
suffix='.py',
delete=False) as f:
f.write(ray_up_no_restart_script)
logger.debug(f'`ray up` script: {f.name}')

# Redirect stdout/err to the file and streaming (if stream_logs).
# With stdout/err redirected, 'ray up' will have no color and
# different order from directly running in the console. The
# `--log-style` and `--log-color` flags do not work. To reproduce,
# `ray up --log-style pretty --log-color true | tee tmp.out`.

returncode, stdout, stderr = log_lib.run_with_log(
# NOTE: --no-restart solves the following bug. Without it, if
# 'ray up' (sky launch) twice on a cluster with >1 node, the
# worker node gets disconnected/killed by ray autoscaler; the
# whole task will just freeze. (Doesn't affect 1-node
# clusters.) With this flag, ray processes no longer restart
# and this bug doesn't show. Downside is existing tasks on the
# cluster will keep running (which may be ok with the semantics
# of 'sky launch' twice).
# Tracked in https://github.com/ray-project/ray/issues/20402.
['ray', 'up', '-y', '--no-restart', cluster_config_file],
[sys.executable, f.name],
log_abs_path,
stream_logs=False,
start_streaming_at='Shared connection to',
@@ -1322,21 +1354,20 @@ def ray_up():
require_outputs=True,
# Disable stdin to avoid ray outputs mess up the terminal with
# misaligned output when multithreading/multiprocessing are used
# Refer to: https://github.com/ray-project/ray/blob/d462172be7c5779abf37609aed08af112a533e1e/python/ray/autoscaler/_private/subprocess_output_util.py#L264 # pylint: disable=line-too-long
# Refer to: https://github.com/ray-project/ray/blob/d462172be7c5779abf37609aed08af112a533e1e/python/ray/autoscaler/_private/subprocess_output_util.py#L264 # pylint: disable=line-too-long
stdin=subprocess.DEVNULL)
return returncode, stdout, stderr

region_name = logging_info['region_name']
zone_str = logging_info['zone_str']

style = colorama.Style
if isinstance(to_provision_cloud, clouds.Local):
cluster_name = logging_info['cluster_name']
logger.info(f'{colorama.Style.BRIGHT}Launching on local cluster '
logger.info(f'{style.BRIGHT}Launching on local cluster '
f'{cluster_name!r}.')
else:
logger.info(
f'{colorama.Style.BRIGHT}Launching on {to_provision_cloud} '
f'{region_name}{colorama.Style.RESET_ALL} ({zone_str})')
logger.info(f'{style.BRIGHT}Launching on {to_provision_cloud} '
f'{region_name}{style.RESET_ALL} ({zone_str})')
start = time.time()

# Edge case: /tmp/ray does not exist, so autoscaler can't create/store
@@ -1413,7 +1444,7 @@ def need_ray_up(
assert ray_up_return_value is not None
returncode, stdout, stderr = ray_up_return_value

logger.debug(f'Ray up takes {time.time() - start} seconds with '
logger.debug(f'`ray up` takes {time.time() - start:.1f} seconds with '
f'{retry_cnt} retries.')
if returncode != 0:
return self.GangSchedulingStatus.HEAD_FAILED, stdout, stderr, None
@@ -1775,13 +1806,25 @@ def _update_stable_cluster_ips(self,
# IPs since the cached IPs are up-to-date.
return

cluster_internal_ips = backend_utils.get_node_ips(
self.cluster_yaml,
self.launched_nodes,
handle=self,
head_ip_max_attempts=max_attempts,
worker_ip_max_attempts=max_attempts,
get_internal_ips=True)
is_cluster_aws = (self.launched_resources is not None and
isinstance(self.launched_resources.cloud,
clouds.AWS))
if is_cluster_aws and skypilot_config.get_nested(
keys=('aws', 'use_internal_ips'), default_value=False):
# Optimization: if we know use_internal_ips is True (currently
# only exposed for AWS), then our AWS NodeProvider is
# guaranteed to pick subnets that will not assign public IPs,
# thus the first list of IPs returned above are already private
# IPs. So skip the second query.
cluster_internal_ips = list(cluster_external_ips)
else:
cluster_internal_ips = backend_utils.get_node_ips(
self.cluster_yaml,
self.launched_nodes,
handle=self,
head_ip_max_attempts=max_attempts,
worker_ip_max_attempts=max_attempts,
get_internal_ips=True)

assert len(cluster_external_ips) == len(cluster_internal_ips), (
f'Cluster {self.cluster_name!r}:'
@@ -2754,7 +2797,8 @@ def teardown_no_lock(self,
# is safe to skip and return True.
ux_utils.console_newline()
logger.warning(
f'Cluster {handle.cluster_name!r} is already terminated. Skip.')
f'Cluster {handle.cluster_name!r} is already terminated. '
'Skipped.')
return True
log_path = os.path.join(os.path.expanduser(self.log_dir),
'teardown.log')
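As a quick reference for the `skypilot_config.get_nested()` lookups used in the hunks above, here is a minimal sketch. It is not SkyPilot's implementation: only the key names (`aws.vpc_name`, `aws.use_internal_ips`, `auth.ssh_proxy_command`) come from this PR; the example values and the lookup helper itself are illustrative assumptions.

```python
import yaml

# Hypothetical user config; only the key names are taken from this PR.
example_config = yaml.safe_load("""
aws:
  vpc_name: my-private-vpc        # placeholder VPC name
  use_internal_ips: true
auth:
  ssh_proxy_command: ssh -W %h:%p user@jump-host   # placeholder jump host
""")

def get_nested(config, keys, default_value=None):
    """Sketch of a nested lookup: walk `keys`, falling back to `default_value`."""
    node = config
    for key in keys:
        if not isinstance(node, dict) or key not in node:
            return default_value
        node = node[key]
    return node

assert get_nested(example_config, ('aws', 'use_internal_ips'), False) is True
assert get_nested(example_config, ('gcp', 'vpc_name')) is None
```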
44 changes: 44 additions & 0 deletions sky/backends/monkey_patches/ray_up_with_monkey_patched_hash_launch_conf.py
@@ -0,0 +1,44 @@
"""Runs `ray up` while not using ssh_proxy_command in launch hash.

This monkey patches the hash_launch_conf() function inside Ray autoscaler to
exclude any ssh_proxy_command in hash calculation.

Reasons:
- In the future, we want to support changing the ssh_proxy_command field for
an existing cluster. If the launch hash included this field, then this would
mean upon such a change a new cluster would've been launched, causing
leakage.
- With our patch, ssh_proxy_command will be excluded from the launch hash when
a cluster is first created. This then makes it possible for us to support
changing the proxy command in the future.
"""
import hashlib
import json
import os

from ray.autoscaler._private import util as ray_autoscaler_private_util
from ray.autoscaler import sdk


# Ref: https://github.com/ray-project/ray/blob/releases/2.2.0/python/ray/autoscaler/_private/util.py#L392-L404
def monkey_patch_hash_launch_conf(node_conf, auth):
hasher = hashlib.sha1()
# For hashing, we replace the path to the key with the key
# itself. This is to make sure the hashes are the same even if keys
# live at different locations on different machines.
full_auth = auth.copy()
full_auth.pop('ssh_proxy_command', None) # NOTE: skypilot changes.
for key_type in ['ssh_private_key', 'ssh_public_key']:
if key_type in auth:
with open(os.path.expanduser(auth[key_type])) as key:
full_auth[key_type] = key.read()
hasher.update(
json.dumps([node_conf, full_auth], sort_keys=True).encode('utf-8'))
print('Hashed without ssh_proxy_command:', hasher.hexdigest())
print(node_conf)
print(full_auth)
return hasher.hexdigest()


ray_autoscaler_private_util.hash_launch_conf = monkey_patch_hash_launch_conf
sdk.create_or_update_cluster({ray_yaml_path}, **{kwargs})
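A minimal sanity check (not part of the PR) of the patched hash above: two auth dicts that differ only in `ssh_proxy_command` should hash identically, which is what allows the proxy command to be changed later without the cluster appearing "new". It assumes it runs in the same module, right after `monkey_patch_hash_launch_conf()` is defined, and uses placeholder values.

```python
# Illustration only: relies on monkey_patch_hash_launch_conf() defined above.
auth_with_proxy = {'ssh_user': 'ubuntu',
                   'ssh_proxy_command': 'ssh -W %h:%p jump-host'}  # placeholder
auth_without_proxy = {'ssh_user': 'ubuntu'}
node_conf = {'InstanceType': 'm5.large'}  # placeholder node config

assert (monkey_patch_hash_launch_conf(node_conf, auth_with_proxy) ==
        monkey_patch_hash_launch_conf(node_conf, auth_without_proxy))
```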
30 changes: 23 additions & 7 deletions sky/execution.py
@@ -26,7 +26,7 @@
from sky import exceptions
from sky import global_user_state
from sky import optimizer
from sky import sky_config
from sky import skypilot_config
from sky import sky_logging
from sky import spot
from sky import task as task_lib
@@ -526,26 +526,42 @@ def spot_launch(
'retry_until_up': retry_until_up,
'user': os.environ.get('USER', None),
}
if sky_config.loaded():
if skypilot_config.loaded():
# Look up the contents of the already loaded configs via the
# 'sky_config' module. Don't simply read the on-disk file as it may
# have changed since this process started.
# 'skypilot_config' module. Don't simply read the on-disk file as
# it may have changed since this process started.
#
# Pop any proxy command, because the controller would've been
# launched behind the proxy, and in general any nodes we launch may
# not have or need the proxy setup.
# not have or need the proxy setup. (If the controller needs to
# launch spot clusters in another region/VPC, the user should
# properly set up VPC peering, which will allow the
# cross-region/VPC communication. The proxy command is orthogonal
# to this scenario.)
#
# This file will be uploaded to the controller node and will be
# used throughout the spot job's recovery attempts (i.e., if it
# relaunches due to preemption, we make sure the same config is
# used).
config_dict = sky_config.pop_nested(('auth', 'ssh_proxy_command'))
#
# NOTE: suppose we have a controller in an old VPC, and the user then
# changes 'vpc_name' in the config and runs 'spot launch'. In
# general, the old controller may not successfully launch the job
# in the new VPC. This happens if the two VPCs don't have peering
# set up. As elsewhere in the code, we assume that properly setting
# up networking is the user's responsibility.
# TODO(zongheng): consider adding a basic check that checks
# controller VPC (or name) == the spot job's VPC (or name). It may
# not be a sufficient check (as it's always possible that peering
# is not set up), but it may catch some obvious errors.
config_dict = skypilot_config.pop_nested(
('auth', 'ssh_proxy_command'))
with tempfile.NamedTemporaryFile(mode='w', delete=False) as tmpfile:
common_utils.dump_yaml(tmpfile.name, config_dict)
vars_to_fill.update({
'user_config_path': tmpfile.name,
'env_var_skypilot_config':
sky_config.ENV_VAR_SKYPILOT_CONFIG,
skypilot_config.ENV_VAR_SKYPILOT_CONFIG,
})

yaml_path = backend_utils.fill_template(
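For reference, a rough sketch of the `skypilot_config.pop_nested()` behavior the spot_launch() hunk above relies on. This is an illustrative assumption, not SkyPilot's actual implementation; in this sketch the loaded config is left untouched and a stripped copy is returned.

```python
import copy

def pop_nested(config, keys):
    """Sketch: return a deep copy of `config` with the nested key path removed."""
    result = copy.deepcopy(config)
    node = result
    for key in keys[:-1]:
        if not isinstance(node, dict) or key not in node:
            return result  # path absent; nothing to pop
        node = node[key]
    if isinstance(node, dict):
        node.pop(keys[-1], None)
    return result

# Hypothetical loaded config (key names mirror those used in this PR).
loaded = {'auth': {'ssh_user': 'ubuntu',
                   'ssh_proxy_command': 'ssh -W %h:%p jump'},
          'aws': {'vpc_name': 'my-private-vpc'}}
stripped = pop_nested(loaded, ('auth', 'ssh_proxy_command'))
assert 'ssh_proxy_command' not in stripped['auth']
assert 'ssh_proxy_command' in loaded['auth']  # original unmodified in this sketch
```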
1 change: 1 addition & 0 deletions sky/setup_files/MANIFEST.in
@@ -1,3 +1,4 @@
include sky/backends/monkey_patches/*.py
include sky/skylet/*.sh
include sky/skylet/providers/aws/*
include sky/skylet/providers/aws/cloudwatch/*
9 changes: 9 additions & 0 deletions sky/skylet/events.py
@@ -145,6 +145,15 @@ def _ray_up_to_reset_upscaling_params():
# 2.2.0: https://github.com/ray-project/ray/blame/releases/2.2.0/python/ray/autoscaler/_private/command_runner.py#L114-L143 # pylint: disable=line-too-long
# which has not changed for 3 years, so it covers all local Ray
# versions we support inside setup.py.
#
# TODO(zongheng): it seems we could skip monkey patching this, if
# we monkey patch hash_launch_conf() to drop ssh_proxy_command as
# is done in the provision code path. I tried it and could not get
# it to work: (1) no outputs are shown (tried both subprocess.run()
# and log_lib.run_with_log()) (2) this first `ray up
# --restart-only` somehow calculates a different launch hash than
# the one used when the head node was created. To clean up in the
# future.
def monkey_patch_init(self, ssh_key, control_path=None, **kwargs):
"""SSHOptions.__init__(), but pops 'ProxyCommand'."""
self.ssh_key = ssh_key
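The `monkey_patch_init()` body is truncated in this diff; below is a rough sketch (an assumption about the general shape, not the PR's code) of patching `SSHOptions.__init__` so that any `ProxyCommand` option is dropped when the skylet runs `ray up --restart-only` on the cluster itself.

```python
# Sketch only -- not the PR's implementation. Assumes SSHOptions lives in
# ray.autoscaler._private.command_runner and receives extra SSH options via kwargs.
from ray.autoscaler._private import command_runner

_original_ssh_options_init = command_runner.SSHOptions.__init__

def monkey_patch_init(self, ssh_key, control_path=None, **kwargs):
    """SSHOptions.__init__(), but pops 'ProxyCommand' (hypothetical wrapper)."""
    kwargs.pop('ProxyCommand', None)
    _original_ssh_options_init(self, ssh_key, control_path=control_path, **kwargs)

command_runner.SSHOptions.__init__ = monkey_patch_init
```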
8 changes: 4 additions & 4 deletions sky/skylet/providers/aws/config.py
@@ -246,10 +246,10 @@ def bootstrap_aws(config, skypilot_iam_role: bool = False):

# The head node needs to have an IAM role that allows it to create further
# EC2 instances.
# This adds {'IamInstanceProfile': {'Arn':
# 'arn:aws:iam::xxxxxx:instance-profile/ray-autoscaler-v1'}} to
# config['head_node'].
config = _configure_iam_role(config)
#
# If skypilot_iam_role is True, we use our own IAM role for both head and
# workers.
config = _configure_iam_role(config, skypilot_iam_role=skypilot_iam_role)

# Configure SSH access, using an existing key pair if possible.
config = _configure_key_pair(config)
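For context on the `_configure_iam_role()` change, a small sketch based only on the comment removed in this hunk (the ARN is a placeholder): the function attaches an instance-profile entry to the node configs, and with `skypilot_iam_role=True` the PR applies a SkyPilot-managed profile to both the head node and the workers.

```python
# Illustration only: shape of the instance-profile entry, per the removed comment.
config = {'head_node': {}, 'worker_nodes': {}}
instance_profile = {
    # Placeholder ARN; the default Ray profile is 'ray-autoscaler-v1'.
    'Arn': 'arn:aws:iam::123456789012:instance-profile/ray-autoscaler-v1'
}
config['head_node']['IamInstanceProfile'] = instance_profile
# With skypilot_iam_role=True, the same SkyPilot-owned profile is also set on workers.
config['worker_nodes']['IamInstanceProfile'] = instance_profile
```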