
Sky Data #63

Merged · 21 commits · Dec 10, 2021
Changes from 18 commits
32 changes: 32 additions & 0 deletions prototype/examples/playground/storage_playground.py
@@ -0,0 +1,32 @@
from sky.data import storage


def test_bucket_creation():
    storage_1 = storage.Storage(name='mluo-data', source='~/Downloads/temp/')

    storage_1.get_or_copy_to_s3()  # Transfers data from local to S3
    storage_1.get_or_copy_to_gcs()  # Transfers data from local to GCS


def test_bucket_deletion():
    storage_1 = storage.Storage(name='mluo-data', source='~/Downloads/temp/')
    storage_1.get_or_copy_to_s3()
    storage_1.get_or_copy_to_gcs()
    storage_1.delete()  # Deletes data from both stores


def test_bucket_transfer():
    # First-time upload to S3.
    storage_1 = storage.Storage(name='mluo-data', source='~/Downloads/temp/')
    bucket_path = storage_1.get_or_copy_to_s3()  # Transfers data from local to S3

    storage_2 = storage.Storage(name='mluo-data', source=bucket_path)
    storage_2.get_or_copy_to_s3()  # Connects to the existing S3 bucket
    storage_2.get_or_copy_to_gcs()  # Transfers data from S3 to the GCS bucket


if __name__ == "__main__":
    test_bucket_creation()
    test_bucket_deletion()
    test_bucket_transfer()
52 changes: 52 additions & 0 deletions prototype/examples/resnet_app_storage.py
@@ -0,0 +1,52 @@
import subprocess

import sky

with sky.Dag() as dag:
    # The working directory contains all code and will be synced to remote.
    workdir = '~/Downloads/tpu'
    data_mount_path = '/tmp/imagenet'
    subprocess.run(f'cd {workdir} && git checkout 222cc86',
                   shell=True,
                   check=True)

    # The setup command. Will be run under the working directory.
    setup = 'echo \"alias python=python3\" >> ~/.bashrc && \
        echo \"alias pip3=pip\" >> ~/.bashrc && \
        source ~/.bashrc && \
        pip install --upgrade pip && \
        pip install awscli botocore boto3 && \
        conda init bash && \
        conda activate resnet || \
        (conda create -n resnet python=3.7 -y && \
        conda activate resnet && \
        pip install tensorflow==2.4.0 pyyaml && \
        cd models && pip install -e .)'

    # The command to run. Will be run under the working directory.
    run = f'conda activate resnet && \
        python -u models/official/resnet/resnet_main.py --use_tpu=False \
        --mode=train --train_batch_size=256 --train_steps=250 \
        --iterations_per_loop=125 \
        --data_dir={data_mount_path} \
        --model_dir=resnet-model-dir \
        --amp --xla --loss_scale=128'

    # If no storage backend is specified, the Sky optimizer will choose
    # which cloud the bucket is stored on.
    storage = sky.Storage(name="imagenet-bucket", source="s3://imagenet-bucket")

    train = sky.Task(
        'train',
        workdir=workdir,
        storage_mounts={storage: data_mount_path},
        setup=setup,
        run=run,
    )
    train.set_inputs('s3://imagenet-bucket', estimated_size_gigabytes=150)
    train.set_outputs('resnet-model-dir', estimated_size_gigabytes=0.1)
    train.set_resources({
        sky.Resources(sky.AWS(), 'p3.2xlarge'),
    })

sky.execute(dag)
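
For comparison, the same mounting pattern works with a local source, using the Storage API exercised in storage_playground.py above. A minimal sketch (directory, bucket, and task names are illustrative, not from this PR):

import sky

with sky.Dag() as dag:
    # Sky uploads the local directory to a backing bucket and mounts it
    # at /data on the remote cluster.
    mydata = sky.Storage(name='my-dataset', source='~/datasets/my-dataset')
    task = sky.Task('train',
                    storage_mounts={mydata: '/data'},
                    run='ls /data')
    task.set_resources({sky.Resources(sky.AWS(), 'p3.2xlarge')})

sky.execute(dag)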
2 changes: 2 additions & 0 deletions prototype/requirements.txt
@@ -3,6 +3,7 @@ boto3
colorama
jinja2
networkx
oauth2client
pandas
pendulum
PrettyTable
@@ -14,3 +15,4 @@ docker
awscli==1.22.17
azure-cli
google-api-python-client
google-cloud-storage
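
The two new entries appear to back the GCS side of sky.data: google-cloud-storage is the GCS client library, and oauth2client is a Google auth dependency. A minimal sketch of the kind of call the new client library enables (bucket and paths are illustrative, not from this PR):

from google.cloud import storage as gcs_storage

client = gcs_storage.Client()
bucket = client.get_bucket('my-bucket')  # hypothetical bucket name
blob = bucket.blob('path/to/object')
blob.upload_from_filename('/tmp/local_file')  # upload one local file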
4 changes: 4 additions & 0 deletions prototype/sky/__init__.py
@@ -1,6 +1,7 @@
"""The Sky package."""
import os

# Keep this order to avoid cyclic imports
from sky import backends
from sky import clouds
from sky.clouds.service_catalog import list_accelerators
@@ -10,6 +11,7 @@
from sky.task import ParTask, Task
from sky.registry import fill_in_launchable_resources
from sky.optimizer import Optimizer, OptimizeTarget
from sky.data import Storage, StorageType

__root_dir__ = os.path.dirname(os.path.abspath(__file__))

@@ -35,4 +37,6 @@
    'fill_in_launchable_resources',
    'list_accelerators',
    '__root_dir__',
    'Storage',
    'StorageType',
]
6 changes: 6 additions & 0 deletions prototype/sky/backends/backend.py
@@ -37,6 +37,9 @@ def sync_file_mounts(
    ) -> None:
        raise NotImplementedError

    def add_storage_objects(self, task: App) -> None:
        raise NotImplementedError

    def run_post_setup(self, handle: ResourceHandle, post_setup_fn: PostSetupFn,
                       task: App) -> None:
        raise NotImplementedError

@@ -49,6 +52,9 @@ def post_execute(self, handle: ResourceHandle, teardown: bool) -> None:
        """Post execute(): e.g., print helpful inspection messages."""
        raise NotImplementedError

    def teardown_storage(self, task: App) -> None:
        raise NotImplementedError

    def teardown(self, handle: ResourceHandle) -> None:
        raise NotImplementedError
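How these hooks slot into the backend lifecycle is only implied by this diff; a rough sketch of the assumed call order (the driver-side flow is hypothetical, only the method names come from this PR):

# Hypothetical driver flow, not part of this diff:
backend.add_storage_objects(task)      # translate storage_mounts into file_mounts
backend.sync_file_mounts(handle, ...)  # sync the augmented file_mounts
# ... run_post_setup, execute, post_execute ...
backend.teardown_storage(task)         # delete stores not marked persistent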
60 changes: 53 additions & 7 deletions prototype/sky/backends/cloud_vm_ray_backend.py
@@ -27,6 +27,7 @@
from sky import resources as resources_lib
from sky import task as task_mod
from sky.backends import backend_utils
from sky.data import storage

App = backend_utils.App

@@ -554,6 +555,45 @@ def sync_workdir(self, handle: ResourceHandle, workdir: Path) -> None:
                             target=SKY_REMOTE_WORKDIR,
                             with_outputs=True)

    def add_storage_objects(self, task: App) -> None:
        # Hack: hardcode storage_plans to AWS (S3). The optimizer is supposed
        # to choose the storage plan; we do it here temporarily.
        for k in task.storage_mounts.keys():
            task.storage_plans[k] = storage.StorageType.S3

        cache = []
        storage_mounts = task.storage_mounts
        storage_plans = task.storage_plans
        for store, mnt_path in storage_mounts.items():
            storage_type = storage_plans[store]
            if storage_type not in cache:
                if storage_type is storage.StorageType.S3:
                    # TODO: allow for Storage mounting of different clouds
                    task.update_file_mounts({'~/.aws': '~/.aws'})
                elif storage_type is storage.StorageType.GCS:
                    pass
                elif storage_type is storage.StorageType.AZURE:
                    pass
                else:
                    raise ValueError(
                        f'Storage type {storage_type} does not exist!')
                cache.append(storage_type)

            if storage_type is storage.StorageType.S3:
                task.update_file_mounts({
                    mnt_path: 's3://' + store.name + '/',
                })
            elif storage_type is storage.StorageType.GCS:
                task.update_file_mounts({
                    mnt_path: 'gs://' + store.name + '/',
                })
            elif storage_type is storage.StorageType.AZURE:
                pass
            else:
                raise ValueError(
                    f'Storage type {storage_type} does not exist!')
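
An aside on the branching above: the two per-type if/elif chains could be collapsed into a single lookup table. A sketch of that alternative (same observable behavior for S3/GCS; assumes the StorageType enum imported above, and is not part of this PR):

# Maps storage type -> (one-time credential mounts, bucket URL scheme).
_STORE_DISPATCH = {
    storage.StorageType.S3: ({'~/.aws': '~/.aws'}, 's3://'),
    storage.StorageType.GCS: ({}, 'gs://'),
}

def _mounts_for(store, mnt_path, storage_type, seen):
    """Returns the file_mounts updates for one storage mount."""
    if storage_type not in _STORE_DISPATCH:
        raise ValueError(f'Storage type {storage_type} does not exist!')
    creds, scheme = _STORE_DISPATCH[storage_type]
    # Mount credentials only the first time a storage type is seen.
    mounts = dict(creds) if storage_type not in seen else {}
    seen.add(storage_type)
    mounts[mnt_path] = scheme + store.name + '/'
    return mounts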

    def sync_file_mounts(
            self,
            handle: ResourceHandle,
@@ -573,21 +613,20 @@ def sync_file_mounts(
            # (download gsutil on remote, run gsutil on remote). Consider
            # alternatives (smart_open, each provider's own sdk), a
            # data-transfer container etc.
            store = cloud_stores.get_storage_from_path(src)
            # Sync 'src' to 'wrapped_dst', a safe-to-write "wrapped" path.
            wrapped_dst = backend_utils.wrap_file_mount(dst)
            if store.is_directory(src):
                sync = store.make_sync_dir_command(source=src,
                                                   destination=wrapped_dst)
                # It is a directory so make sure it exists.
                mkdir_for_wrapped_dst = f'mkdir -p {wrapped_dst}'
            else:
                sync = store.make_sync_file_command(source=src,
                                                    destination=wrapped_dst)
                # It is a file so make sure *its parent dir* exists.
                mkdir_for_wrapped_dst = \
                    f'mkdir -p {os.path.dirname(wrapped_dst)}'
            # Goal: point dst --> wrapped_dst.
            symlink_to_make = dst.rstrip('/')
            dir_of_symlink = os.path.dirname(symlink_to_make)
            # Below, use sudo in case the symlink needs sudo access to create.
@@ -870,6 +909,13 @@ def post_execute(self, handle: ResourceHandle, teardown: bool) -> None:
            logger.info(
                'Tip: `sky down` will delete launched TPU(s) as well.')

    def teardown_storage(self, task: App) -> None:
        storage_mounts = task.storage_mounts
        if storage_mounts is not None:
            for store, _ in storage_mounts.items():
                if not store.persistent:
                    store.delete()

    def teardown(self, handle: ResourceHandle) -> None:
        backend_utils.run(f'ray down -y {handle.cluster_yaml}')
        if handle.tpu_delete_script is not None:
59 changes: 59 additions & 0 deletions prototype/sky/cloud_stores.py
@@ -12,7 +12,10 @@
import subprocess
Review comment (Collaborator):
Note to team: as sky.Storage becomes the standard method of uploading/interfacing with data in Sky, we should deprecate CloudStorage and move its functionality to AbstractStore.
cc @concretevitamin @michaelzhiluo

Reply (Collaborator Author):
Agreed, we can refactor in a later PR @romilbhardwaj @concretevitamin
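
For context, a rough sketch of what such an AbstractStore interface could look like (hypothetical; the name comes from the comment above, not from code in this PR):

import abc

class AbstractStore(abc.ABC):
    """Hypothetical unified interface for bucket-backed stores."""

    @abc.abstractmethod
    def is_directory(self, url: str) -> bool:
        """Returns whether 'url' names a prefix ('directory') in the store."""

    @abc.abstractmethod
    def make_sync_dir_command(self, source: str, destination: str) -> str:
        """Returns a shell command that syncs a directory to 'destination'."""

    @abc.abstractmethod
    def make_sync_file_command(self, source: str, destination: str) -> str:
        """Returns a shell command that copies one file to 'destination'."""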

import urllib.parse

import boto3

from sky.backends import backend_utils
from sky.data import data_utils


class CloudStorage(object):
@@ -35,6 +38,60 @@ def make_sync_file_command(self, source: str, destination: str) -> str:
        raise NotImplementedError


class S3CloudStorage(CloudStorage):
    """AWS Cloud Storage."""

    # List of commands to install AWS CLI
    _GET_AWSCLI = [
        'pip install awscli',
    ]

    def is_directory(self, url: str) -> bool:
        """Returns whether S3 'url' is a directory.

        In cloud object stores, a "directory" refers to a regular object
        whose name is a prefix of other objects.
        """
        s3 = boto3.resource('s3')
        bucket_name, path = data_utils.split_s3_path(url)
        bucket = s3.Bucket(bucket_name)

        num_objects = 0
        for obj in bucket.objects.filter(Prefix=path):
            num_objects += 1
            if obj.key == path:
                # An object named exactly 'path' exists: it is a regular
                # file, not a directory.
                return False
            # Multiple objects share the prefix, so it is a directory;
            # stop early after seeing a few.
            if num_objects == 3:
                return True

        # A directory with few or no items.
        return True

    def make_sync_dir_command(self, source: str, destination: str) -> str:
        """Downloads a directory using AWS CLI."""
        # 'aws s3 sync' by default uses 10 threads to upload files to the
        # bucket. To increase parallelism, modify max_concurrent_requests
        # in your AWS config file (default path: ~/.aws/config).
        download_via_awscli = (f'mkdir -p {destination} && '
                               f'aws s3 sync {source} {destination}')

        all_commands = list(self._GET_AWSCLI)
        all_commands.append(download_via_awscli)
        return ' && '.join(all_commands)

    def make_sync_file_command(self, source: str, destination: str) -> str:
        """Downloads a file using AWS CLI."""
        download_via_awscli = (f'mkdir -p {destination} && '
                               f'aws s3 cp {source} {destination}')

        all_commands = list(self._GET_AWSCLI)
        all_commands.append(download_via_awscli)
        return ' && '.join(all_commands)


class GcsCloudStorage(CloudStorage):
"""Google Cloud Storage."""

@@ -100,6 +157,7 @@ def make_sync_file_command(self, source: str, destination: str) -> str:
def get_storage_from_path(url: str) -> CloudStorage:
    """Returns a CloudStorage by identifying the scheme:// in a URL."""
    result = urllib.parse.urlsplit(url)

    if result.scheme not in _REGISTRY:
        assert False, ('Scheme {} not found in'
                       ' supported storage ({}); path {}'.format(
@@ -109,4 +167,5 @@ def get_storage_from_path(url: str) -> CloudStorage:

_REGISTRY = {
    'gs': GcsCloudStorage(),
    's3': S3CloudStorage(),
}
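
Usage of the scheme registry then looks like the following (a sketch; the bucket path is illustrative):

# Dispatch on the URL scheme, then build the remote download command.
store = get_storage_from_path('s3://some-bucket/data/')
sync_cmd = store.make_sync_dir_command(source='s3://some-bucket/data/',
                                       destination='/tmp/data')
# sync_cmd is one shell string:
# 'pip install awscli && mkdir -p /tmp/data && aws s3 sync s3://some-bucket/data/ /tmp/data'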
4 changes: 4 additions & 0 deletions prototype/sky/data/__init__.py
@@ -0,0 +1,4 @@
"""Sky Backends."""
michaelzhiluo marked this conversation as resolved.
Show resolved Hide resolved
from sky.data.storage import Storage, StorageType

__all__ = ['Storage', 'StorageType']