Sustainability POC Add Optimization #3515

Merged
75 changes: 75 additions & 0 deletions examples/sustainability/README.md
@@ -0,0 +1,75 @@
# Sustainability
The algorithm used to generate carbon footprint estimates is pulled from https://github.com/GreenAlgorithms/green-algorithms-tool. Additional details can be found on the hosted [Green Algorithms Calculator](http://calculator.green-algorithms.org/).
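
For orientation, the Green Algorithms estimate is essentially the energy drawn (core power times usage plus memory power, scaled by the data center's PUE) multiplied by the grid's carbon intensity. A minimal sketch of that formula, assuming the calculator's published defaults (0.3725 W/GB for memory, PUE 1.67, a ~475 gCO2e/kWh world-average intensity) rather than anything in this PR:

```python
# Illustrative sketch of the Green Algorithms estimate (not SkyPilot code).
# The constants below are the calculator's published defaults; treat them as
# assumptions that may differ from the deployed tool.

def carbon_footprint_gco2e(runtime_h: float,
                           n_cores: int,
                           core_power_w: float,
                           usage: float,
                           memory_gb: float,
                           pue: float = 1.67,
                           carbon_intensity: float = 475.0) -> float:
    """Estimate gCO2e as energy (kWh) times grid intensity (gCO2e/kWh)."""
    power_w = n_cores * core_power_w * usage + memory_gb * 0.3725
    energy_kwh = runtime_h * power_w * pue / 1000.0
    return energy_kwh * carbon_intensity

# Example: 1 h on 8 cores (12 W each, fully utilized) with 32 GB RAM.
print(round(carbon_footprint_gco2e(1.0, 8, 12.0, 1.0, 32.0), 1))  # → 85.6
```

Real estimates also depend on hardware-specific core power draw and the region's actual grid mix, which the hosted calculator looks up for you.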

## Getting Started
The sustainability enhancement is tracked in the [sustainability branch](https://github.com/skypilot-org/skypilot/tree/sustainability_poc). Here are the steps for getting started.

### Installation
Set up the environment and check out the sustainability branch of SkyPilot:
```bash
# Recommended: use a new conda env to avoid package conflicts.
# SkyPilot requires 3.7 <= python <= 3.10.
conda create -y -n sky python=3.10
conda activate sky

# Clone the SkyPilot repository from GitHub and check out the sustainability branch
git clone https://github.com/skypilot-org/skypilot.git
cd skypilot
git checkout -t origin/sustainability_poc

# Install SkyPilot with dependencies for all supported clouds
pip install -e ".[all]"

# Install the Google Cloud SDK (required)
conda install -c conda-forge google-cloud-sdk
```
### Verify Cloud Access
After setting up the `sustainability_poc` branch environment for SkyPilot, follow these instructions to [verify cloud access](https://skypilot.readthedocs.io/en/latest/getting-started/installation.html#verifying-cloud-access).

## SkyPilot in 1 Minute with Carbon Footprint Estimates

Paste the following into a file `my_task.yaml`:

```yaml
resources:
  accelerators: V100:1  # 1x NVIDIA V100 GPU

num_nodes: 1  # Number of VMs to launch

# Working directory (optional) containing the project codebase.
# Its contents are synced to ~/sky_workdir/ on the cluster.
workdir: ~/torch_examples

# Commands to be run before executing the job.
# Typical use: pip install -r requirements.txt, git clone, etc.
setup: |
  pip install torch torchvision

# Commands to run as a job.
# Typical use: launch the main program.
run: |
  cd mnist
  python main.py --epochs 1
```

Prepare the workdir by cloning:
```bash
git clone https://github.com/pytorch/examples.git ~/torch_examples
```

Launch with `sky launch` (note: [access to GPU instances](https://skypilot.readthedocs.io/en/latest/cloud-setup/quota.html) is needed for this example):
```bash
sky launch my_task.yaml --optimize-cost-target=carbon_footprint
```
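
Conceptually, `--optimize-cost-target` just changes which column the optimizer minimizes over when ranking candidate instance types. A toy illustration with made-up prices and carbon rates (not SkyPilot's actual optimizer or catalog data):

```python
# Hypothetical candidates: (name, $/hr, gCO2e/hr). Numbers are invented.
CANDIDATES = [
    ('aws/p3.2xlarge', 2.70, 410.0),
    ('gcp/n1-highmem-8-v100', 2.86, 150.0),
    ('azure/NC6s_v3', 3.20, 380.0),
]

def pick(objective: str) -> str:
    """Return the candidate that minimizes the chosen objective column."""
    column = {'price': 1, 'carbon_footprint': 2}[objective]
    return min(CANDIDATES, key=lambda row: row[column])[0]

print(pick('price'))             # → aws/p3.2xlarge (cheapest)
print(pick('carbon_footprint'))  # → gcp/n1-highmem-8-v100 (lowest gCO2e/hr)
```

With `carbon_footprint` selected, a slightly pricier instance in a cleaner region can win, which is exactly the trade-off the `CARBON FOOTPRINT` column surfaces.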

SkyPilot then performs the heavy lifting for you, including:
1. Finds the lowest-priced VM instance type across different clouds
2. Displays the carbon footprint estimate for each VM instance type (note the `CARBON FOOTPRINT` column)
3. Provisions the VM, with auto-failover if the cloud returns capacity errors
4. Syncs the local `workdir` to the VM
5. Runs the task's `setup` commands to prepare the VM for running the task
6. Runs the task's `run` commands

<p align="center">
<img src="./images/carbon-footprint.png" alt="Carbon Footprint Demo"/>
</p>
4 changes: 4 additions & 0 deletions examples/sustainability/requirements.txt
@@ -0,0 +1,4 @@
torch
torchvision
wandb
absl-py
8 changes: 5 additions & 3 deletions sky/__init__.py
@@ -102,15 +102,16 @@ def set_proxy_env_var(proxy_var: str, urllib_var: Optional[str]):
from sky.data import StoreType
from sky.execution import exec # pylint: disable=redefined-builtin
from sky.execution import launch
from sky.optimizer import OptimizeObjectiveTarget
from sky.optimizer import Optimizer
from sky.optimizer import OptimizeTarget
from sky.resources import Resources
# TODO (zhwu): These imports are for backward compatibility, and spot APIs
# should be called with `sky.spot.xxx` instead. Remove in release 0.8.0
from sky.jobs.core import spot_cancel
from sky.jobs.core import spot_launch
from sky.jobs.core import spot_queue
from sky.jobs.core import spot_tail_logs
from sky.optimizer import Optimizer
from sky.optimizer import OptimizeTarget
from sky.resources import Resources
from sky.skylet.job_lib import JobStatus
from sky.status_lib import ClusterStatus
from sky.task import Task
@@ -148,6 +149,7 @@ def set_proxy_env_var(proxy_var: str, urllib_var: Optional[str]):
'Fluidstack',
'Optimizer',
'OptimizeTarget',
'OptimizeObjectiveTarget',
'backends',
'benchmark',
'list_accelerators',
101 changes: 62 additions & 39 deletions sky/cli.py
@@ -155,6 +155,16 @@ def _merge_env_vars(env_dict: Optional[Dict[str, str]],
return list(env_dict.items())


_TASK_OPTIMIZE_OPTIONS = [
click.option('--optimize-cost-target',
default=sky.OptimizeObjectiveTarget.PRICE.value,
type=click.Choice(
sky.OptimizeObjectiveTarget.supported_targets(),
case_sensitive=False),
required=False,
help=sky.OptimizeObjectiveTarget.cli_help_message()),
]
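
From its call sites in this diff (`PRICE.value` as the default, `supported_targets()` feeding `click.Choice`, and `cli_help_message()` for help text), `OptimizeObjectiveTarget` is plausibly a small enum along these lines; this is a sketch inferred from the diff, not the branch's actual definition:

```python
import enum

class OptimizeObjectiveTarget(enum.Enum):
    # The values are what the CLI accepts for --optimize-cost-target.
    PRICE = 'price'
    CARBON_FOOTPRINT = 'carbon_footprint'

    @classmethod
    def supported_targets(cls):
        # Definition order is preserved, so 'price' stays the first choice.
        return [target.value for target in cls]

    @classmethod
    def cli_help_message(cls) -> str:
        return ('Objective to minimize when choosing a cloud and instance '
                'type: ' + ' or '.join(cls.supported_targets()) + '.')

print(OptimizeObjectiveTarget.supported_targets())  # → ['price', 'carbon_footprint']
```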

_TASK_OPTIONS = [
click.option(
'--workdir',
@@ -256,7 +266,7 @@ def _merge_env_vars(env_dict: Optional[Dict[str, str]],

3. ``--env MY_ENV3``: set ``$MY_ENV3`` on the cluster to be the
same value of ``$MY_ENV3`` in the local environment.""",
)
),
]
_TASK_OPTIONS_WITH_NAME = [
click.option('--name',
@@ -265,7 +275,7 @@ def _merge_env_vars(env_dict: Optional[Dict[str, str]],
type=str,
help=('Task name. Overrides the "name" '
'config in the YAML if both are supplied.')),
] + _TASK_OPTIONS
] + _TASK_OPTIONS + _TASK_OPTIMIZE_OPTIONS
_EXTRA_RESOURCES_OPTIONS = [
click.option(
'--gpus',
@@ -473,6 +483,7 @@ def _parse_override_params(
image_id: Optional[str] = None,
disk_size: Optional[int] = None,
disk_tier: Optional[str] = None,
optimize_cost_target: Optional[str] = None,
ports: Optional[Tuple[str]] = None) -> Dict[str, Any]:
"""Parses the override parameters into a dictionary."""
override_params: Dict[str, Any] = {}
@@ -525,6 +536,8 @@ def _parse_override_params(
override_params['disk_tier'] = None
else:
override_params['disk_tier'] = disk_tier
if optimize_cost_target is not None:
override_params['optimize_cost_target'] = optimize_cost_target
if ports:
override_params['ports'] = ports
return override_params
@@ -544,6 +557,7 @@ def _launch_with_confirm(
retry_until_up: bool = False,
no_setup: bool = False,
clone_disk_from: Optional[str] = None,
optimize_cost_target: Optional[str] = None,
):
"""Launch a cluster with a Task."""
if cluster is None:
@@ -569,7 +583,13 @@ def _launch_with_confirm(
# only print the error message without the error type.
click.secho(e, fg='yellow')
sys.exit(1)
dag = sky.optimize(dag)

# Set the optimize objective
if optimize_cost_target == sky.OptimizeObjectiveTarget.CARBON_FOOTPRINT.value:
optimize_cost_target_enum = sky.OptimizeObjectiveTarget.CARBON_FOOTPRINT
else:
optimize_cost_target_enum = sky.OptimizeObjectiveTarget.PRICE
dag = sky.optimize(dag, objective_target=optimize_cost_target_enum)
task = dag.tasks[0]

if handle is not None:
@@ -607,6 +627,7 @@ def _launch_with_confirm(
retry_until_up=retry_until_up,
no_setup=no_setup,
clone_disk_from=clone_disk_from,
optimize_cost_target=optimize_cost_target,
)


@@ -706,6 +727,7 @@ def _make_task_or_dag_from_entrypoint_with_overrides(
disk_size: Optional[int] = None,
disk_tier: Optional[str] = None,
ports: Optional[Tuple[str]] = None,
optimize_cost_target: Optional[str] = None,
env: Optional[List[Tuple[str, str]]] = None,
field_to_ignore: Optional[List[str]] = None,
# job launch specific
@@ -748,6 +770,7 @@ def _make_task_or_dag_from_entrypoint_with_overrides(
disk_size=disk_size,
disk_tier=disk_tier,
ports=ports)

if field_to_ignore is not None:
_pop_and_ignore_fields_in_override_params(override_params,
field_to_ignore)
@@ -1052,6 +1075,7 @@ def launch(
disk_size: Optional[int],
disk_tier: Optional[str],
ports: Tuple[str],
optimize_cost_target: Optional[str],
idle_minutes_to_autostop: Optional[int],
down: bool, # pylint: disable=redefined-outer-name
retry_until_up: bool,
@@ -1092,6 +1116,7 @@ def launch(
disk_size=disk_size,
disk_tier=disk_tier,
ports=ports,
optimize_cost_target=optimize_cost_target,
)
if isinstance(task_or_dag, sky.Dag):
raise click.UsageError(
@@ -1126,7 +1151,8 @@ def launch(
down=down,
retry_until_up=retry_until_up,
no_setup=no_setup,
clone_disk_from=clone_disk_from)
clone_disk_from=clone_disk_from,
optimize_cost_target=optimize_cost_target)


@cli.command(cls=_DocumentedCodeCommand)
@@ -1149,28 +1175,15 @@ def launch(
@_add_click_options(_TASK_OPTIONS_WITH_NAME + _EXTRA_RESOURCES_OPTIONS)
@usage_lib.entrypoint
# pylint: disable=redefined-builtin
def exec(
cluster: str,
entrypoint: List[str],
detach_run: bool,
name: Optional[str],
cloud: Optional[str],
region: Optional[str],
zone: Optional[str],
workdir: Optional[str],
gpus: Optional[str],
ports: Tuple[str],
instance_type: Optional[str],
num_nodes: Optional[int],
use_spot: Optional[bool],
image_id: Optional[str],
env_file: Optional[Dict[str, str]],
env: List[Tuple[str, str]],
cpus: Optional[str],
memory: Optional[str],
disk_size: Optional[int],
disk_tier: Optional[str],
):
def exec(cluster: str, entrypoint: List[str], detach_run: bool,
name: Optional[str], cloud: Optional[str], region: Optional[str],
zone: Optional[str], workdir: Optional[str], gpus: Optional[str],
ports: Tuple[str], instance_type: Optional[str],
num_nodes: Optional[int], use_spot: Optional[bool],
image_id: Optional[str], env_file: Optional[Dict[str, str]],
env: List[Tuple[str, str]], cpus: Optional[str], memory: Optional[str],
disk_size: Optional[int], disk_tier: Optional[str],
optimize_cost_target: Optional[str]):
# NOTE(dev): Keep the docstring consistent between the Python API and CLI.
"""Execute a task or command on an existing cluster.

@@ -1256,6 +1269,7 @@ def exec(
disk_size=disk_size,
disk_tier=disk_tier,
ports=ports,
optimize_cost_target=optimize_cost_target,
field_to_ignore=['cpus', 'memory', 'disk_size', 'disk_tier', 'ports'],
)

@@ -3770,7 +3784,8 @@ def _generate_task_with_service(
type=str,
help='A service name. Unique for each service. If not provided, '
'a unique name is autogenerated.')
@_add_click_options(_TASK_OPTIONS + _EXTRA_RESOURCES_OPTIONS)
@_add_click_options(_TASK_OPTIONS + _EXTRA_RESOURCES_OPTIONS +
_TASK_OPTIMIZE_OPTIONS)
@click.option('--yes',
'-y',
is_flag=True,
@@ -3879,7 +3894,8 @@ def serve_up(
type=str,
nargs=-1,
**_get_shell_complete_args(_complete_file_name))
@_add_click_options(_TASK_OPTIONS + _EXTRA_RESOURCES_OPTIONS)
@_add_click_options(_TASK_OPTIONS + _EXTRA_RESOURCES_OPTIONS +
_TASK_OPTIMIZE_OPTIONS)
@click.option('--mode',
default=serve_lib.DEFAULT_UPDATE_MODE.value,
type=click.Choice([m.value for m in serve_lib.UpdateMode],
@@ -4356,6 +4372,7 @@ def benchmark_launch(
disk_size: Optional[int],
disk_tier: Optional[str],
ports: Tuple[str],
optimize_cost_target: Optional[str],
idle_minutes_to_autostop: Optional[int],
yes: bool,
) -> None:
@@ -4419,6 +4436,10 @@ def benchmark_launch(
if ports:
if any('ports' in candidate for candidate in candidates):
raise click.BadParameter(f'ports {message}')
if optimize_cost_target:
if any('optimize_cost_target' in candidate
for candidate in candidates):
raise click.BadParameter(f'optimize_cost_target {message}')

# The user can specify the benchmark candidates in either of the two ways:
# 1. By specifying resources.candidates in the YAML.
@@ -4455,17 +4476,19 @@ def benchmark_launch(
config['workdir'] = workdir
if num_nodes is not None:
config['num_nodes'] = num_nodes
override_params = _parse_override_params(cloud=cloud,
region=region,
zone=zone,
gpus=override_gpu,
cpus=cpus,
memory=memory,
use_spot=use_spot,
image_id=image_id,
disk_size=disk_size,
disk_tier=disk_tier,
ports=ports)
override_params = _parse_override_params(
cloud=cloud,
region=region,
zone=zone,
gpus=override_gpu,
cpus=cpus,
memory=memory,
use_spot=use_spot,
optimize_cost_target=optimize_cost_target,
image_id=image_id,
disk_size=disk_size,
disk_tier=disk_tier,
ports=ports)
_pop_and_ignore_fields_in_override_params(
override_params, field_to_ignore=['cpus', 'memory'])
resources_config.update(override_params)
12 changes: 12 additions & 0 deletions sky/clouds/aws.py
@@ -304,6 +304,18 @@ def instance_type_to_hourly_cost(self,
zone=zone,
clouds='aws')

def instance_type_to_hourly_carbon_cost(
self,
instance_type: str,
accelerators: Dict[str, int],
region: Optional[str] = None,
zone: Optional[str] = None) -> float:
return service_catalog.get_hourly_carbon_cost(instance_type,
accelerators,
region=region,
zone=zone,
clouds='aws')

def accelerators_to_hourly_cost(self,
accelerators: Dict[str, int],
use_spot: bool,
12 changes: 12 additions & 0 deletions sky/clouds/azure.py
@@ -100,6 +100,18 @@ def instance_type_to_hourly_cost(self,
zone=zone,
clouds='azure')

def instance_type_to_hourly_carbon_cost(
self,
instance_type: str,
accelerators: Dict[str, int],
region: Optional[str] = None,
zone: Optional[str] = None) -> float:
return service_catalog.get_hourly_carbon_cost(instance_type,
accelerators,
region=region,
zone=zone,
clouds='azure')

def accelerators_to_hourly_cost(self,
accelerators: Dict[str, int],
use_spot: bool,
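
Both cloud methods above delegate to `service_catalog.get_hourly_carbon_cost`, whose implementation is not shown in this diff. A hypothetical sketch of such a lookup, combining an instance's average power draw with a regional grid carbon intensity (all table entries invented for illustration):

```python
# Hypothetical sketch only: not the actual service_catalog implementation.
from typing import Dict, Optional

INSTANCE_POWER_KW = {'p3.2xlarge': 0.5, 'm5.large': 0.05}      # avg draw, kW
REGION_GCO2E_PER_KWH = {'us-west-2': 120.0, 'us-east-1': 415.0}

def get_hourly_carbon_cost(instance_type: str,
                           accelerators: Optional[Dict[str, int]] = None,
                           region: Optional[str] = None) -> float:
    """Return estimated gCO2e emitted per hour of running the instance."""
    # accelerators are ignored in this sketch; a real catalog would add
    # per-GPU power draw on top of the base instance figure.
    power_kw = INSTANCE_POWER_KW[instance_type]
    # Fall back to a world-average grid intensity when the region is unknown.
    intensity = REGION_GCO2E_PER_KWH.get(region, 475.0)
    return power_kw * intensity

print(get_hourly_carbon_cost('p3.2xlarge', region='us-west-2'))  # → 60.0
```

The branch presumably derives these figures from per-instance hardware specs and the Green Algorithms data rather than a hand-written table.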