Sky Data #63

Merged 21 commits on Dec 10, 2021
31 changes: 31 additions & 0 deletions prototype/examples/playground/storage_playground.py
@@ -0,0 +1,31 @@
from sky.data import storage


def test_bucket_creation():
    storage_1 = storage.Storage(name='mluo-data', source='~/Downloads/temp/')
    storage_1.get_or_copy_to_s3()  # Transfers data from local to S3
    storage_1.get_or_copy_to_gcs()  # Transfers data from local to GCS


def test_bucket_deletion():
    storage_1 = storage.Storage(name='mluo-data', source='~/Downloads/temp/')
    storage_1.get_or_copy_to_s3()
    storage_1.get_or_copy_to_gcs()
    storage_1.delete()  # Deletes the data


def test_bucket_transfer():
    # First-time upload to S3
    storage_1 = storage.Storage(name='mluo-data', source='~/Downloads/temp/')
    # Transfers data from local to S3 and returns the bucket path
    bucket_path = storage_1.get_or_copy_to_s3()

    storage_2 = storage.Storage(name='mluo-data', source=bucket_path)
    storage_2.get_or_copy_to_s3()
    storage_2.get_or_copy_to_gcs()  # Transfers data from S3 to the GCS bucket


if __name__ == '__main__':
    test_bucket_creation()
    test_bucket_deletion()
    test_bucket_transfer()
57 changes: 57 additions & 0 deletions prototype/examples/resnet_app_storage.py
@@ -0,0 +1,57 @@
import subprocess

import sky

with sky.Dag() as dag:
    # The working directory contains all code and will be synced to remote.
    workdir = '~/Downloads/tpu'
    data_mount_path = '/tmp/imagenet'
    subprocess.run(f'cd {workdir} && git checkout 222cc86',
                   shell=True,
                   check=True)

    # The setup command. Will be run under the working directory.
    setup = 'echo \"alias python=python3\" >> ~/.bashrc && \
        echo \"alias pip3=pip\" >> ~/.bashrc && \
        source ~/.bashrc && \
        pip install --upgrade pip && \
        pip install awscli botocore boto3 && \
        conda init bash && \
        conda activate resnet || \
        (conda create -n resnet python=3.7 -y && \
        conda activate resnet && \
        pip install tensorflow==2.4.0 pyyaml && \
        cd models && pip install -e .)'

    # The command to run. Will be run under the working directory.
    run = f'conda activate resnet && \
        python -u models/official/resnet/resnet_main.py --use_tpu=False \
        --mode=train --train_batch_size=256 --train_steps=250 \
        --iterations_per_loop=125 \
        --data_dir={data_mount_path} \
        --model_dir=resnet-model-dir \
        --amp --xla --loss_scale=128'

    # If the storage backend is not specified, the Sky optimizer will choose
    # which cloud the bucket is stored on.
    storage = sky.Storage(name="imagenet-bucket", source="s3://imagenet-bucket")
    # The source can also be a local directory:
    # storage = sky.Storage(name="imagenet-bucket", source="~/imagenet-data/")

    train = sky.Task(
        'train',
        workdir=workdir,
        setup=setup,
        run=run,
    )
    train.set_storage_mounts({
        storage: data_mount_path,
    })

    train.set_inputs('s3://imagenet-bucket', estimated_size_gigabytes=150)
    train.set_outputs('resnet-model-dir', estimated_size_gigabytes=0.1)
    train.set_resources({
        sky.Resources(sky.AWS(), 'p3.2xlarge'),
    })

sky.execute(dag)
2 changes: 2 additions & 0 deletions prototype/requirements.txt
@@ -3,6 +3,7 @@ boto3
colorama
jinja2
networkx
oauth2client
pandas
pendulum
PrettyTable
@@ -14,3 +15,4 @@ docker
awscli==1.22.17
azure-cli
google-api-python-client
google-cloud-storage
4 changes: 4 additions & 0 deletions prototype/sky/__init__.py
@@ -1,6 +1,7 @@
"""The Sky package."""
import os

# Keep this order to avoid cyclic imports
from sky import backends
from sky import clouds
from sky.clouds.service_catalog import list_accelerators
@@ -10,6 +11,7 @@
from sky.task import ParTask, Task
from sky.registry import fill_in_launchable_resources
from sky.optimizer import Optimizer, OptimizeTarget
from sky.data import Storage, StorageType

__root_dir__ = os.path.dirname(os.path.abspath(__file__))

@@ -35,4 +37,6 @@
    'fill_in_launchable_resources',
    'list_accelerators',
    '__root_dir__',
    'Storage',
    'StorageType',
]
6 changes: 6 additions & 0 deletions prototype/sky/backends/backend.py
@@ -37,6 +37,9 @@ def sync_file_mounts(
    ) -> None:
        raise NotImplementedError

    def add_storage_objects(self, task: App) -> None:
        raise NotImplementedError

    def run_post_setup(self, handle: ResourceHandle, post_setup_fn: PostSetupFn,
                       task: App) -> None:
        raise NotImplementedError
@@ -49,6 +52,9 @@ def post_execute(self, handle: ResourceHandle, teardown: bool) -> None:
"""Post execute(): e.g., print helpful inspection messages."""
raise NotImplementedError

def teardown_ephemeral_storage(self, task: App) -> None:
raise NotImplementedError

def teardown(self, handle: ResourceHandle) -> None:
raise NotImplementedError

7 changes: 7 additions & 0 deletions prototype/sky/backends/cloud_vm_ray_backend.py
@@ -870,6 +870,13 @@ def post_execute(self, handle: ResourceHandle, teardown: bool) -> None:
            logger.info(
                'Tip: `sky down` will delete launched TPU(s) as well.')

    def teardown_ephemeral_storage(self, task: App) -> None:
        storage_mounts = task.storage_mounts
        if storage_mounts is not None:
            for storage, _ in storage_mounts.items():
                if not storage.persistent:
                    storage.delete()

    def teardown(self, handle: ResourceHandle) -> None:
        backend_utils.run(f'ray down -y {handle.cluster_yaml}')
        if handle.tpu_delete_script is not None:
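For context, teardown_ephemeral_storage above only deletes storage objects whose persistent attribute is False. A minimal sketch of how a task might end up with such an ephemeral mount is below; how the persistent flag gets set is an assumption, since the Storage constructor is not shown in this diff.

import sky

with sky.Dag() as dag:
    scratch = sky.Storage(name='mluo-scratch', source='~/Downloads/temp/')
    # Hypothetical: mark the storage as ephemeral. The diff only shows that
    # backends read `storage.persistent`, not how it is configured.
    scratch.persistent = False

    task = sky.Task('train', run='python train.py')
    task.set_storage_mounts({scratch: '/tmp/scratch'})
    # After the task finishes, teardown_ephemeral_storage() would call
    # scratch.delete(), since scratch is not persistent.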
57 changes: 57 additions & 0 deletions prototype/sky/cloud_stores.py
@@ -12,7 +12,10 @@
import subprocess
Collaborator: Note to team: As sky.Storage becomes the standard method of uploading/interfacing with data in Sky, we should deprecate CloudStorage and move its functionality to AbstractStore. cc @concretevitamin @michaelzhiluo

Collaborator (author): Agreed, we can refactor in a later PR. @romilbhardwaj @concretevitamin

import urllib.parse

import boto3

from sky.backends import backend_utils
from sky.data import data_utils


class CloudStorage(object):
@@ -35,6 +38,58 @@ def make_sync_file_command(self, source: str, destination: str) -> str:
        raise NotImplementedError


class S3CloudStorage(CloudStorage):
"""AWS Cloud Storage."""

# List of commands to install AWS CLI
_GET_AWSCLI = [
'pip install awscli',
]

def is_directory(self, url: str) -> bool:
"""Returns whether S3 'url' is a directory.

In cloud object stores, a "directory" refers to a regular object whose
name is a prefix of other objects.
"""
s3 = boto3.resource('s3')
bucket_name, path = data_utils.split_s3_path(url)
bucket = s3.Bucket(bucket_name)

num_objects = 0
for obj in bucket.objects.filter(Prefix=path):
num_objects += 1
if obj.key == path:
return False
# If there are more than 1 object in filter, then it is a directory
if num_objects == 3:
return True

# A directory with few or no items
return True

def make_sync_dir_command(self, source: str, destination: str) -> str:
"""Downloads using AWS CLI."""
# AWS Sync by default uses 10 threads to upload files to the bucket.
# To increase parallelism, modify max_concurrent_requests in your
# aws config file (Default path: ~/.aws/config).
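        # For example, an ~/.aws/config entry like the following raises that
        # limit (the value 20 is illustrative only):
        #   [default]
        #   s3 =
        #     max_concurrent_requests = 20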
        download_via_awscli = (f'mkdir -p {destination} && '
                               f'aws s3 sync {source} {destination}')

        all_commands = list(self._GET_AWSCLI)
        all_commands.append(download_via_awscli)
        return ' && '.join(all_commands)

    def make_sync_file_command(self, source: str, destination: str) -> str:
        """Downloads a file using AWS CLI."""
        download_via_awscli = (f'mkdir -p {destination} && '
                               f'aws s3 cp {source} {destination}')

        all_commands = list(self._GET_AWSCLI)
        all_commands.append(download_via_awscli)
        return ' && '.join(all_commands)


class GcsCloudStorage(CloudStorage):
"""Google Cloud Storage."""

@@ -100,6 +155,7 @@ def make_sync_file_command(self, source: str, destination: str) -> str:
def get_storage_from_path(url: str) -> CloudStorage:
"""Returns a CloudStorage by identifying the scheme:// in a URL."""
result = urllib.parse.urlsplit(url)

if result.scheme not in _REGISTRY:
assert False, ('Scheme {} not found in'
' supported storage ({}); path {}'.format(
@@ -109,4 +165,5 @@ def get_storage_from_path(url: str) -> CloudStorage:

_REGISTRY = {
    'gs': GcsCloudStorage(),
    's3': S3CloudStorage(),
}
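Picking up the review thread above about deprecating CloudStorage: a minimal sketch of what folding the per-cloud download helpers behind a single AbstractStore-style interface could look like. This is only an illustration of the proposed direction; AbstractStore's real name, location, and methods are not part of this PR, and the class bodies here are placeholders.

# Sketch only: the actual AbstractStore API is not defined in this PR.
import abc


class AbstractStore(abc.ABC):
    """One bucket on one cloud, owning both upload and download logic."""

    def __init__(self, name: str):
        self.name = name

    @abc.abstractmethod
    def upload_from_local(self, local_path: str) -> None:
        """Uploads a local file or directory to this bucket."""

    @abc.abstractmethod
    def make_sync_dir_command(self, source: str, destination: str) -> str:
        """Returns a shell command that downloads `source` to `destination`."""


class S3Store(AbstractStore):
    """Hypothetical S3 implementation of the consolidated interface."""

    def upload_from_local(self, local_path: str) -> None:
        # e.g. shell out to `aws s3 sync {local_path} s3://{self.name}`
        raise NotImplementedError

    def make_sync_dir_command(self, source: str, destination: str) -> str:
        return (f'pip install awscli && mkdir -p {destination} && '
                f'aws s3 sync {source} {destination}')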
4 changes: 4 additions & 0 deletions prototype/sky/data/__init__.py
@@ -0,0 +1,4 @@
"""Sky Data."""
from sky.data.storage import Storage, StorageType

__all__ = ['Storage', 'StorageType']
109 changes: 109 additions & 0 deletions prototype/sky/data/data_transfer.py
@@ -0,0 +1,109 @@
"""Data Transfer between 4 Sources:
- Local (User's laptop or lab machine)
- AWS - S3 Bucket
- GCP - GCS Bucket
- Azure - Azure blob bucket

Currently implemented:
- Local -> S3
- S3 -> Local
- Local -> GCS
- GCS -> Local
- S3 -> GCS

TODO:
- All combinations of Azure Transfer
- GCS -> S3
"""
from datetime import datetime
import json
import os
from typing import Any

import boto3
from googleapiclient import discovery
from google.cloud import storage
from oauth2client.client import GoogleCredentials

from sky import logging

logger = logging.init_logger(__name__)

S3Store = Any
GcsStore = Any


def s3_to_gcs(s3_store: S3Store, gs_store: GcsStore) -> None:
    """Creates a one-time transfer from Amazon S3 to Google Cloud Storage.

    The transfer can be viewed at:
    https://console.cloud.google.com/transfer/cloud

    Args:
      s3_store: S3Store; AWS S3 store that contains the source S3 bucket.
      gs_store: GcsStore; GCP GCS store that contains the destination GCS
        bucket.
    """
    credentials = GoogleCredentials.get_application_default()
    storagetransfer = discovery.build(serviceName='storagetransfer',
                                      version='v1',
                                      credentials=credentials)

    session = boto3.Session()
    aws_credentials = session.get_credentials().get_frozen_credentials()

    with open(os.environ['GOOGLE_APPLICATION_CREDENTIALS'], 'r') as fp:
        gcp_credentials = json.load(fp)
    project_id = gcp_credentials['project_id']

    # Update the GCS bucket IAM policy to allow the transfer service account
    # to write to it.
    storage_account = storagetransfer.googleServiceAccounts().get(
        projectId=project_id).execute()
    _add_bucket_iam_member(gs_store.name, 'roles/storage.admin',
                           'serviceAccount:' + storage_account['accountEmail'])

    starttime = datetime.utcnow()
    transfer_job = {
        'description': f'Transferring data from S3 Bucket {s3_store.name} '
                       f'to GCS Bucket {gs_store.name}',
        'status': 'ENABLED',
        'projectId': project_id,
        'schedule': {
            'scheduleStartDate': {
                'day': starttime.day,
                'month': starttime.month,
                'year': starttime.year,
            },
            'scheduleEndDate': {
                'day': starttime.day,
                'month': starttime.month,
                'year': starttime.year,
            },
        },
        'transferSpec': {
            'awsS3DataSource': {
                'bucketName': s3_store.name,
                'awsAccessKey': {
                    'accessKeyId': aws_credentials.access_key,
                    'secretAccessKey': aws_credentials.secret_key,
                }
            },
            'gcsDataSink': {
                'bucketName': gs_store.name,
            }
        }
    }

    result = storagetransfer.transferJobs().create(body=transfer_job).execute()
    logger.info(f'AWS -> GCS Transfer Job: {json.dumps(result, indent=4)}')


def _add_bucket_iam_member(bucket_name: str, role: str, member: str) -> None:
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)

    policy = bucket.get_iam_policy(requested_policy_version=3)
    policy.bindings.append({'role': role, 'members': {member}})

    bucket.set_iam_policy(policy)

    logger.info(f'Added {member} with role {role} to {bucket_name}.')

logger.info(f'Added {member} with role {role} to {bucket_name}.')