Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sky Data #63

Merged
merged 21 commits into from
Dec 10, 2021
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
32 changes: 32 additions & 0 deletions prototype/examples/playground/storage_playground.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from sky.data import storage


def test_bucket_creation():
    """Creates a Storage from a local directory and uploads it to both clouds."""
    store = storage.Storage(name='mluo-data', source='~/Downloads/temp/')

    store.get_or_copy_to_s3()  # Transfers data from local to S3.
    store.get_or_copy_to_gcs()  # Transfers data from local to GCS.


def test_bucket_deletion():
    """Uploads a local directory to S3 and GCS, then deletes the data."""
    store = storage.Storage(name='mluo-data', source='~/Downloads/temp/')
    store.get_or_copy_to_s3()
    store.get_or_copy_to_gcs()
    store.delete()  # Deletes the uploaded data.


def test_bucket_transfer():
    """Seeds an S3 bucket, then mirrors it from S3 into GCS."""
    # First-time upload: local directory -> S3.
    uploader = storage.Storage(name='mluo-data', source='~/Downloads/temp/')
    bucket_path = uploader.get_or_copy_to_s3()

    # Second Storage points at the bucket itself rather than the local dir.
    mirror = storage.Storage(name='mluo-data', source=bucket_path)
    mirror.get_or_copy_to_s3()  # Connects to the existing S3 bucket.
    mirror.get_or_copy_to_gcs()  # Transfers data from S3 to the GCS bucket.


if __name__ == "__main__":
    # Run every playground scenario in order.
    for playground_fn in (
            test_bucket_creation,
            test_bucket_deletion,
            test_bucket_transfer,
    ):
        playground_fn()
20 changes: 6 additions & 14 deletions prototype/examples/resnet_app_storage.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
import subprocess

import sky
from sky import clouds, Storage

import time_estimators

with sky.Dag() as dag:
# The working directory contains all code and will be synced to remote.
Expand Down Expand Up @@ -35,26 +32,21 @@
--model_dir=resnet-model-dir \
--amp --xla --loss_scale=128'

storage = Storage(name="imagenet-bucket",
source_path="s3://imagenet-bucket",
default_mount_path=data_mount_path)
# If the backend to be added is not specified, then Sky optimizer will
# choose the backend bucket to be stored.
storage = sky.Storage(name="imagenet-bucket", source="s3://imagenet-bucket")

train = sky.Task(
'train',
workdir=workdir,
storage=storage,
storage_mounts={storage: data_mount_path},
michaelzhiluo marked this conversation as resolved.
Show resolved Hide resolved
setup=setup,
run=run,
)
train.set_inputs('s3://imagenet-bucket', estimated_size_gigabytes=150)
michaelzhiluo marked this conversation as resolved.
Show resolved Hide resolved
train.set_outputs('resnet-model-dir', estimated_size_gigabytes=0.1)
train.set_resources({
##### Fully specified
sky.Resources(clouds.AWS(), 'p3.2xlarge'),
sky.Resources(sky.AWS(), 'p3.2xlarge'),
})

# Optionally, specify a time estimator: Resources -> time in seconds.
# train.set_time_estimator(time_estimators.resnet50_estimate_runtime)

dag = sky.Optimizer.optimize(dag, minimize=sky.Optimizer.COST)
# sky.execute(dag, dryrun=True)
sky.execute(dag)
34 changes: 0 additions & 34 deletions prototype/examples/storage_playground.py

This file was deleted.

4 changes: 3 additions & 1 deletion prototype/sky/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""The Sky package."""
import os

# Keep this order to avoid cyclic imports
from sky import backends
from sky import clouds
from sky.clouds.service_catalog import list_accelerators
Expand All @@ -9,8 +10,8 @@
from sky.resources import Resources
from sky.task import ParTask, Task
from sky.registry import fill_in_launchable_resources
from sky.storage import Storage
from sky.optimizer import Optimizer, OptimizeTarget
from sky.data import Storage, StorageType

__root_dir__ = os.path.dirname(os.path.abspath(__file__))

Expand All @@ -37,4 +38,5 @@
'list_accelerators',
'__root_dir__',
'Storage',
'StorageType',
]
6 changes: 6 additions & 0 deletions prototype/sky/backends/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ def sync_file_mounts(
) -> None:
raise NotImplementedError

def add_storage_objects(self, task: App) -> None:
    """Adds the task's storage objects to this backend.

    Abstract hook: concrete backends override this to translate the
    task's storage mounts into backend-specific mounts (see
    CloudVmRayBackend.add_storage_objects).
    """
    raise NotImplementedError

def run_post_setup(self, handle: ResourceHandle, post_setup_fn: PostSetupFn,
                   task: App) -> None:
    """Runs `post_setup_fn` for `task` on the cluster behind `handle`.

    Abstract; behavior is defined entirely by subclasses.
    """
    raise NotImplementedError
Expand All @@ -49,6 +52,9 @@ def post_execute(self, handle: ResourceHandle, teardown: bool) -> None:
"""Post execute(): e.g., print helpful inspection messages."""
raise NotImplementedError

def teardown_storage(self, task: App) -> None:
    """Tears down the task's storage objects.

    Abstract hook; see concrete backends (e.g. CloudVmRayBackend) for
    the actual deletion policy.
    """
    raise NotImplementedError

def teardown(self, handle: ResourceHandle) -> None:
    """Tears down the resources tracked by `handle`. Abstract."""
    raise NotImplementedError

Expand Down
66 changes: 52 additions & 14 deletions prototype/sky/backends/cloud_vm_ray_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from sky import resources as resources_lib
from sky import task as task_mod
from sky.backends import backend_utils
from sky.data import storage

App = backend_utils.App

Expand Down Expand Up @@ -554,13 +555,44 @@ def sync_workdir(self, handle: ResourceHandle, workdir: Path) -> None:
target=SKY_REMOTE_WORKDIR,
with_outputs=True)

def add_storage_backend(self, task: App, cloud_type: str) -> None:
storage = task.storage
storage.add_backend(cloud_type)
if cloud_type == 'AWS':
task.append_file_mount('~/.aws', '~/.aws')
task.append_file_mount(storage.default_mount_path,
's3://' + storage.name + '/')
def add_storage_objects(self, task: App) -> None:
    """Translates the task's storage mounts into concrete file mounts.

    For each (Storage, mount_path) pair in `task.storage_mounts`, looks up
    the store type chosen in `task.storage_plans`, syncs that cloud's
    credentials once per storage type, and adds a file mount from the
    bucket URI to the requested mount path.

    Raises:
        ValueError: if a storage plan names an unknown StorageType.
    """
    # HACK: hardcode every storage plan to S3. The optimizer is supposed
    # to choose the storage plan; this is a temporary stand-in.
    for store in task.storage_mounts:
        task.storage_plans[store] = storage.StorageType.S3

    # Bucket-URI scheme per storage type; None means mounting for this
    # cloud is not implemented yet.
    url_prefixes = {
        storage.StorageType.S3: 's3://',
        storage.StorageType.GCS: 'gs://',
        storage.StorageType.AZURE: None,
    }
    # Credential files that must be synced before a store can be mounted.
    # TODO: allow for Storage mounting of different clouds.
    credential_mounts = {
        storage.StorageType.S3: {'~/.aws': '~/.aws'},
        storage.StorageType.GCS: {},
        storage.StorageType.AZURE: {},
    }

    # Set (not list) for O(1) membership: types whose creds are synced.
    mounted_credentials = set()
    for store, mnt_path in task.storage_mounts.items():
        storage_type = task.storage_plans[store]
        if storage_type not in url_prefixes:
            raise ValueError(f'Storage Type {storage_type} does not exist!')
        if storage_type not in mounted_credentials:
            if credential_mounts[storage_type]:
                task.update_file_mounts(credential_mounts[storage_type])
            mounted_credentials.add(storage_type)
        prefix = url_prefixes[storage_type]
        if prefix is not None:
            task.update_file_mounts({
                mnt_path: prefix + store.name + '/',
            })

def sync_file_mounts(
self,
Expand All @@ -581,21 +613,20 @@ def sync_file_mounts(
# (download gsutil on remote, run gsutil on remote). Consider
# alternatives (smart_open, each provider's own sdk), a
# data-transfer container etc.
storage = cloud_stores.get_storage_from_path(src)
store = cloud_stores.get_storage_from_path(src)
# Sync 'src' to 'wrapped_dst', a safe-to-write "wrapped" path.
wrapped_dst = backend_utils.wrap_file_mount(dst)
if storage.is_directory(src):
sync = storage.make_sync_dir_command(source=src,
destination=wrapped_dst)
if store.is_directory(src):
sync = store.make_sync_dir_command(source=src,
destination=wrapped_dst)
# It is a directory so make sure it exists.
mkdir_for_wrapped_dst = f'mkdir -p {wrapped_dst}'
else:
sync = storage.make_sync_file_command(source=src,
destination=wrapped_dst)
sync = store.make_sync_file_command(source=src,
destination=wrapped_dst)
# It is a file so make sure *its parent dir* exists.
mkdir_for_wrapped_dst = \
f'mkdir -p {os.path.dirname(wrapped_dst)}'
# Goal: point dst --> wrapped_dst.
michaelzhiluo marked this conversation as resolved.
Show resolved Hide resolved
symlink_to_make = dst.rstrip('/')
dir_of_symlink = os.path.dirname(symlink_to_make)
# Below, use sudo in case the symlink needs sudo access to create.
Expand Down Expand Up @@ -878,6 +909,13 @@ def post_execute(self, handle: ResourceHandle, teardown: bool) -> None:
logger.info(
'Tip: `sky down` will delete launched TPU(s) as well.')

def teardown_storage(self, task: 'App') -> None:
    """Deletes all non-persistent storage objects used by `task`.

    Calls `delete()` on every store in `task.storage_mounts` whose
    `persistent` flag is False; persistent stores are kept. No-op when
    `task.storage_mounts` is None.
    """
    storage_mounts = task.storage_mounts
    if storage_mounts is None:
        return
    # Only the Storage keys matter for deletion; the mount paths are
    # irrelevant, so iterate keys directly instead of `.items()`.
    for store in storage_mounts:
        if not store.persistent:
            store.delete()
michaelzhiluo marked this conversation as resolved.
Show resolved Hide resolved

def teardown(self, handle: ResourceHandle) -> None:
backend_utils.run(f'ray down -y {handle.cluster_yaml}')
if handle.tpu_delete_script is not None:
Expand Down