Sky Data #63

Merged · 21 commits · Dec 10, 2021
Changes from 11 commits
60 changes: 60 additions & 0 deletions prototype/examples/resnet_app_storage.py
@@ -0,0 +1,60 @@
import subprocess

import sky
from sky import clouds, Storage

import time_estimators

with sky.Dag() as dag:
# The working directory contains all code and will be synced to the remote VM.
workdir = '~/Downloads/tpu'
data_mount_path = '/tmp/imagenet'
subprocess.run(f'cd {workdir} && git checkout 222cc86',
shell=True,
check=True)

# The setup command. Will be run under the working directory.
setup = 'echo \"alias python=python3\" >> ~/.bashrc && \
echo \"alias pip3=pip\" >> ~/.bashrc && \
source ~/.bashrc && \
pip install --upgrade pip && \
pip install awscli botocore boto3 && \
conda init bash && \
conda activate resnet || \
(conda create -n resnet python=3.7 -y && \
conda activate resnet && \
pip install tensorflow==2.4.0 pyyaml && \
cd models && pip install -e .)'

# The command to run. Will be run under the working directory.
run = f'conda activate resnet && \
python -u models/official/resnet/resnet_main.py --use_tpu=False \
--mode=train --train_batch_size=256 --train_steps=250 \
--iterations_per_loop=125 \
--data_dir={data_mount_path} \
--model_dir=resnet-model-dir \
--amp --xla --loss_scale=128'

storage = Storage(name="imagenet-bucket",
source_path="s3://imagenet-bucket",
default_mount_path=data_mount_path)
train = sky.Task(
'train',
workdir=workdir,
storage=storage,
setup=setup,
run=run,
)
train.set_inputs('s3://imagenet-bucket', estimated_size_gigabytes=150)
train.set_outputs('resnet-model-dir', estimated_size_gigabytes=0.1)
train.set_resources({
##### Fully specified
sky.Resources(clouds.AWS(), 'p3.2xlarge'),
})

# Optionally, specify a time estimator: Resources -> time in seconds.
# train.set_time_estimator(time_estimators.resnet50_estimate_runtime)

dag = sky.Optimizer.optimize(dag, minimize=sky.Optimizer.COST)
# sky.execute(dag, dryrun=True)
sky.execute(dag)
34 changes: 34 additions & 0 deletions prototype/examples/storage_playground.py
@@ -0,0 +1,34 @@
from sky import Storage


def test_bucket_creation():
storage = Storage(name="mluo-data",
source_path="~/Downloads/temp/",
default_mount_path="/tmp/mluo-data")

storage.add_backend("AWS")
storage.add_backend("GCP")


def test_bucket_deletion():
storage = Storage(name="mluo-data",
source_path="~/Downloads/temp/",
default_mount_path="/tmp/mluo-data")
storage.add_backend("AWS")
storage.add_backend("GCP")
storage.cleanup()


def test_bucket_transfer():

# First time upload to s3
storage = Storage(name="mluo-data",
source_path="~/Downloads/temp/",
default_mount_path="/tmp/mluo-data")
storage.add_backend("AWS")

storage = Storage(name="mluo-data",
source_path="s3://mluo-data",
default_mount_path="/tmp/data/")
storage.add_backend("AWS")
storage.add_backend("GCP")
2 changes: 2 additions & 0 deletions prototype/requirements.txt
@@ -3,6 +3,7 @@ boto3
colorama
jinja2
networkx
oauth2client
pandas
pendulum
PrettyTable
@@ -14,3 +15,4 @@ docker
awscli==1.22.17
azure-cli
google-api-python-client
google-cloud-storage
2 changes: 2 additions & 0 deletions prototype/sky/__init__.py
@@ -9,6 +9,7 @@
from sky.resources import Resources
from sky.task import ParTask, Task
from sky.registry import fill_in_launchable_resources
from sky.storage import Storage
from sky.optimizer import Optimizer, OptimizeTarget

__root_dir__ = os.path.dirname(os.path.abspath(__file__))
@@ -35,4 +36,5 @@
'fill_in_launchable_resources',
'list_accelerators',
'__root_dir__',
'Storage',
]
8 changes: 8 additions & 0 deletions prototype/sky/backends/cloud_vm_ray_backend.py
@@ -485,6 +485,14 @@ def sync_workdir(self, handle: ResourceHandle, workdir: Path) -> None:
# deprecated.
_run(f'ray rsync_up {handle} {workdir}/ {SKY_REMOTE_WORKDIR}')

def add_storage_backend(self, task: App, cloud_type: str) -> None:
storage = task.storage
storage.add_backend(cloud_type)
if cloud_type == 'AWS':
task.append_file_mount('~/.aws', '~/.aws')
task.append_file_mount(storage.default_mount_path,
's3://' + storage.name + '/')

def sync_file_mounts(
self,
handle: ResourceHandle,
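
As a usage sketch (the call site below is hypothetical and not part of this diff), the backend would call this method once per chosen cloud before syncing file mounts:

# Hypothetical call site; `backend` and `task` come from the surrounding
# provisioning flow in cloud_vm_ray_backend.py.
backend.add_storage_backend(task, 'AWS')  # registers ~/.aws and the s3:// data mount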
192 changes: 192 additions & 0 deletions prototype/sky/backends/data_transfer.py
@@ -0,0 +1,192 @@
"""Data Transfer between 4 Sources:
- Local (User's laptop or lab machine)
- AWS - S3 Bucket
- GCP - GCS Bucket
- Azure - Azure Blob Storage container

Currently implemented:
- Local -> S3
- S3 -> Local
- Local -> GCS
- GCS -> Local
- S3 -> GCS

TODO:
- All combinations of Azure Transfer
- GCS -> S3
"""
# pylint: disable=maybe-no-member
from datetime import datetime
import glob
import json
from multiprocessing.pool import ThreadPool
import os
from typing import Any

import boto3
from googleapiclient import discovery
from google.cloud import storage
from oauth2client.client import GoogleCredentials

StorageBackend = Any


def s3_to_gcs(aws_backend: StorageBackend, gcs_backend: StorageBackend) -> None:
"""Creates a one-time transfer from Amazon S3 to Google Cloud Storage.
Transfer jobs can be viewed at: https://console.cloud.google.com/transfer/cloud

Args:
aws_backend: StorageBackend; AWS Backend that contains a
corresponding S3 bucket
gcs_backend: StorageBackend; GCS Backend that contains a
corresponding GCS bucket
"""
credentials = GoogleCredentials.get_application_default()
storagetransfer = discovery.build('storagetransfer',
'v1',
credentials=credentials)

session = boto3.Session()
aws_credentials = session.get_credentials().get_frozen_credentials()

# Update cloud bucket IAM role to allow for data transfer
project_id = 'intercloud-320520'
storage_account = storagetransfer.googleServiceAccounts().get(
projectId=project_id).execute()
_add_bucket_iam_member(gcs_backend.name, 'roles/storage.admin',
'serviceAccount:' + storage_account['accountEmail'])

starttime = datetime.utcnow()
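    # Using the same start and end date below makes the transfer job run exactly once.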
transfer_job = {
'description': f'Transferring data from S3 Bucket \
{aws_backend.name} to GCS Bucket {gcs_backend.name}',
'status': 'ENABLED',
'projectId': project_id,
'schedule': {
'scheduleStartDate': {
'day': starttime.day,
'month': starttime.month,
'year': starttime.year,
},
'scheduleEndDate': {
'day': starttime.day,
'month': starttime.month,
'year': starttime.year,
},
},
'transferSpec': {
'awsS3DataSource': {
'bucketName': aws_backend.name,
'awsAccessKey': {
'accessKeyId': aws_credentials.access_key,
'secretAccessKey': aws_credentials.secret_key,
}
},
'gcsDataSink': {
'bucketName': gcs_backend.name,
}
}
}

result = storagetransfer.transferJobs().create(body=transfer_job).execute()
print(f'AWS -> GCS Transfer Job: {json.dumps(result, indent=4)}')


def local_to_gcs(local_path: str, gcs_backend: StorageBackend) -> None:
"""Creates a one-time transfer from Local to GCS.

Args:
local_path: str; Local path on user's device
gcs_backend: StorageBackend; GCS Backend that contains a
corresponding GCS bucket
"""
assert local_path is not None
local_path = os.path.expanduser(local_path)
all_paths = glob.glob(local_path + '/**', recursive=True)
del all_paths[0]

def _upload_thread(local_file):
remote_path = local_file.replace(local_path, '')
print(f'Uploading {local_file} to {remote_path}')
if os.path.isfile(local_file):
blob = gcs_backend.bucket.blob(remote_path)
blob.upload_from_filename(local_file)

pool = ThreadPool(processes=32)
pool.map(_upload_thread, all_paths)


def local_to_s3(local_path: str, aws_backend: StorageBackend) -> None:
"""Creates a one-time transfer from Local to S3.

Args:
local_path: str; Local path on user's device
aws_backend: StorageBackend; AWS Backend that contains a
corresponding S3 bucket
"""
assert local_path is not None
local_path = os.path.expanduser(local_path)
all_paths = glob.glob(local_path + '/**', recursive=True)
del all_paths[0]

def _upload_thread(local_file):
remote_path = local_file.replace(local_path, '')
print(f'Uploading {local_file} to {remote_path}')
if os.path.isfile(local_file):
aws_backend.client.upload_file(local_file, aws_backend.name,
remote_path)

pool = ThreadPool(processes=32)
pool.map(_upload_thread, all_paths)


def s3_to_local(aws_backend: StorageBackend, local_path: str) -> None:
"""Creates a one-time transfer from S3 to Local.

Args:
aws_backend: StorageBackend; AWS Backend that contains a
corresponding S3 bucket
local_path: str; Local path on user's device
"""
assert local_path is not None
local_path = os.path.expanduser(local_path)
for obj in aws_backend.bucket.objects.filter():
if obj.key[-1] == '/':
continue
path = os.path.join(local_path, obj.key)
if not os.path.exists(os.path.dirname(path)):
os.makedirs(os.path.dirname(path))
print(f'Downloading {obj.key} to {path}')
aws_backend.bucket.download_file(obj.key, path)


def gcs_to_local(gcs_backend: StorageBackend, local_path: str) -> None:
"""Creates a one-time transfer from GCS to Local.

Args:
gcs_backend: StorageBackend; GCS Backend that contains a
corresponding GCS bucket
local_path: str; Local path on user's device
"""
assert local_path is not None
local_path = os.path.expanduser(local_path)
for obj in gcs_backend.bucket.list_blobs():
if obj.name[-1] == '/':
continue
path = os.path.join(local_path, obj.name)
if not os.path.exists(os.path.dirname(path)):
os.makedirs(os.path.dirname(path))
print(f'Downloading {obj.name} to {path}')
obj.download_to_filename(path)


def _add_bucket_iam_member(bucket_name, role, member):
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)

policy = bucket.get_iam_policy(requested_policy_version=3)
policy.bindings.append({'role': role, 'members': {member}})

bucket.set_iam_policy(policy)

print(f'Added {member} with role {role} to {bucket_name}.')
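
Since StorageBackend is duck-typed here (aliased to Any), the sketch below shows how these helpers might be exercised; `aws` and `gcs` stand for backend objects exposing the .name, .bucket, and .client attributes used above (e.g. as produced by sky.Storage.add_backend()):

# Sketch only; the backend objects are assumed, not created here.
local_to_s3('~/Downloads/temp', aws)   # upload a local directory to S3
s3_to_gcs(aws, gcs)                    # one-time S3 -> GCS transfer job
gcs_to_local(gcs, '/tmp/data')         # download the GCS bucket locally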
47 changes: 47 additions & 0 deletions prototype/sky/backends/data_utils.py
@@ -0,0 +1,47 @@
"""Miscellaneous Utils for Sky Data
"""
import boto3
from google.cloud import storage


def split_s3_path(s3_path):
"""Splits S3 Path into Bucket name and Relative Path to Bucket

Args:
s3_path: str; S3 Path, e.g. s3://imagenet/train/
"""
path_parts = s3_path.replace('s3://', '').split('/')
bucket = path_parts.pop(0)
key = '/'.join(path_parts)
return bucket, key


def split_gcs_path(gcs_path):
"""Splits GCS Path into Bucket name and Relative Path to Bucket

Args:
gcs_path: str; GCS Path, e.g. gcs://imagenet/train/
"""
path_parts = gcs_path.replace('gcs://', '').split('/')
bucket = path_parts.pop(0)
key = '/'.join(path_parts)
return bucket, key


def create_s3_client(region: str = 'us-east-2'):
"""Helper method that connects to Boto3 client for S3 Bucket

Args:
region: str; Region name, e.g. us-west-1, us-east-2
"""
return boto3.client('s3', region_name=region)


def create_gcs_client():
"""Helper method that connects to GCS Storage Client for
GCS Bucket

Args:
region: str; Region name, e.g. us-central1, us-west1
"""
return storage.Client()
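
For reference, a quick illustration of these helpers (expected values worked out from the code above):

bucket, key = split_s3_path('s3://imagenet/train/')    # ('imagenet', 'train/')
bucket, key = split_gcs_path('gcs://imagenet/train/')  # ('imagenet', 'train/')
s3 = create_s3_client('us-west-2')   # boto3 S3 client for the us-west-2 region
gcs = create_gcs_client()            # google.cloud.storage.Client()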