Merge pull request #651 from efiop/master
[WIP] boto3 and local config
efiop committed Apr 13, 2018
2 parents f178727 + 48cc9af commit 1b69557
Showing 12 changed files with 140 additions and 325 deletions.
1 change: 1 addition & 0 deletions .appveyor.yml
@@ -57,6 +57,7 @@ build: false
before_test:
- aws configure set aws_access_key_id "%aws_access_key_id%"
- aws configure set aws_secret_access_key "%aws_secret_access_key%"
- aws configure set region us-east-2
- openssl enc -d -aes-256-cbc -md md5 -k "%GCP_CREDS%" -in scripts\ci\gcp-creds.json.enc -out scripts\ci\gcp-creds.json & exit 0
- pip install -r test-requirements.txt

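Note: the added `aws configure set region` step matters for this PR because boto3 resolves the region from the environment and the AWS config files rather than from an explicit host argument, as the removed `S3Connection(host=...)` code did. A minimal sketch of how boto3 picks it up (the printed value assumes the CI step above has run):

import boto3

# boto3 resolves the region from, in order: explicit Session/client
# arguments, the AWS_DEFAULT_REGION environment variable, and the
# profile written by `aws configure set region ...`
session = boto3.session.Session()
print(session.region_name)  # 'us-east-2' after the CI step above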
20 changes: 20 additions & 0 deletions dvc/cli.py
@@ -239,6 +239,11 @@ def parse_args(argv=None):
nargs='?',
default=None,
help='Option value')
config_parser.add_argument(
'--local',
action='store_true',
default=False,
help='Use local config')
config_parser.set_defaults(func=CmdConfig)


@@ -262,6 +267,11 @@ def parse_args(argv=None):
remote_add_parser.add_argument(
'url',
help='Url')
remote_add_parser.add_argument(
'--local',
action='store_true',
default=False,
help='Use local config')
remote_add_parser.set_defaults(func=CmdRemoteAdd)


@@ -272,6 +282,11 @@ def parse_args(argv=None):
remote_remove_parser.add_argument(
'name',
help='Name')
remote_remove_parser.add_argument(
'--local',
action='store_true',
default=False,
help='Use local config')
remote_remove_parser.set_defaults(func=CmdRemoteRemove)


@@ -294,6 +309,11 @@ def parse_args(argv=None):
default=False,
action='store_true',
help='Unset option')
remote_modify_parser.add_argument(
'--local',
action='store_true',
default=False,
help='Use local config')
remote_modify_parser.set_defaults(func=CmdRemoteModify)
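Note: the same `--local` flag is wired into `config`, `remote add`, `remote remove`, and `remote modify` above. A minimal sketch of how a command might choose the target file based on it; the file name is an assumption for illustration, not taken from this diff:

import os

def config_file(args, dvc_dir='.dvc'):
    # 'config.local' is an assumed name for a per-user config file
    # that stays out of git; the versioned config lives next to it
    name = 'config.local' if args.local else 'config'
    return os.path.join(dvc_dir, name)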


227 changes: 49 additions & 178 deletions dvc/cloud/aws.py
@@ -1,7 +1,9 @@
import os
import math
import threading

from boto.s3.connection import S3Connection
import boto3
import botocore
try:
import httplib
except ImportError:
@@ -15,7 +17,6 @@
from dvc.config import Config
from dvc.logger import Logger
from dvc.progress import progress
from dvc.cloud.credentials_aws import AWSCredentials
from dvc.cloud.base import DataCloudError, DataCloudBase


@@ -28,92 +29,59 @@ def sizeof_fmt(num, suffix='B'):
return "%.1f%s%s" % (num, 'Y', suffix)


def percent_cb(name, part_complete, part_total, offset=0, multipart_total=None):
def percent_cb(name, complete, total):
""" Callback for updating target progress """
complete = offset + part_complete
total = multipart_total if multipart_total != None else part_total

Logger.debug('{}: {} transferred out of {}'.format(name,
sizeof_fmt(complete),
sizeof_fmt(total)))
progress.update_target(name, complete, total)


def create_cb(name, offset=0, multipart_total=None):
def create_cb(name):
""" Create callback function for multipart object """
return (lambda cur, tot: percent_cb(name, cur, tot, offset, multipart_total))
return (lambda cur, tot: percent_cb(name, cur, tot))


class Callback(object):
def __init__(self, name, total):
self.name = name
self.total = total
self.current = 0
self.lock = threading.Lock()

def __call__(self, byts):
with self.lock:
self.current += byts
progress.update_target(self.name, self.current, self.total)


class AWSKey(object):
def __init__(self, bucket, name):
self.name = name
self.bucket = bucket
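Note: these two helpers adapt the code to boto3's transfer API. boto passed its callback absolute (complete, total) pairs, while boto3's managed transfers call `Callback(bytes_amount)` with only the bytes moved since the last call, possibly from several worker threads at once, hence the running total guarded by a lock. `AWSKey` stands in for the boto key objects the old code passed around. A usage sketch with made-up names:

import os
import boto3

s3 = boto3.resource('s3')
fname = '/tmp/example.dat'  # made-up path
cb = Callback(os.path.basename(fname), os.path.getsize(fname))
# boto3 invokes cb(bytes_amount) repeatedly as chunks complete
s3.Object('some-bucket', 'some/key').upload_file(fname, Callback=cb)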


class DataCloudAWS(DataCloudBase):
""" DataCloud class for Amazon Web Services """
REGEX = r'^s3://(?P<path>.*)$'

def __init__(self, cloud_settings):
super(DataCloudAWS, self).__init__(cloud_settings)
self._aws_creds = AWSCredentials(cloud_settings.cloud_config)

@property
def aws_region_host(self):
""" get the region host needed for s3 access
See notes http://docs.aws.amazon.com/general/latest/gr/rande.html#s3_region
"""

region = self._cloud_settings.cloud_config.get(Config.SECTION_AWS_REGION, None)
if region is None or region == '':
return 's3.amazonaws.com'
if region == 'us-east-1':
return 's3.amazonaws.com'
return 's3.%s.amazonaws.com' % region

def credential_paths(self, default):
"""
Try obtaining path to aws credentials from config file.
"""
paths = []
credpath = self._cloud_settings.cloud_config.get(Config.SECTION_AWS_CREDENTIALPATH, None)
if credpath is not None and len(credpath) > 0:
credpath = os.path.expanduser(credpath)
if os.path.isfile(credpath):
paths.append(credpath)
else:
Logger.warn('AWS CredentialPath "%s" not found;'
'falling back to default "%s"' % (credpath, default))
paths.append(default)
else:
paths.append(default)
return paths

def connect(self):
if all([self._aws_creds.access_key_id,
self._aws_creds.secret_access_key,
self.aws_region_host]):
conn = S3Connection(self._aws_creds.access_key_id,
self._aws_creds.secret_access_key,
host=self.aws_region_host)
else:
conn = S3Connection()
self.bucket = conn.lookup(self.storage_bucket)
if self.bucket is None:
self.s3 = boto3.resource('s3')
bucket = self.s3.Bucket(self.storage_bucket)
if bucket is None:
raise DataCloudError('Storage path {} is not setup correctly'.format(self.storage_bucket))
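Note: `boto3.resource('s3').Bucket(name)` only builds a lazy resource object and makes no API call, so it never returns None and this check cannot fire. Verifying that the bucket actually exists would take an explicit probe, e.g. (a sketch, not part of this commit):

import boto3
import botocore.exceptions

def connect(self):
    self.s3 = boto3.resource('s3')
    try:
        # performs a real request; raises ClientError (404/403) if
        # the bucket is missing or inaccessible
        self.s3.meta.client.head_bucket(Bucket=self.storage_bucket)
    except botocore.exceptions.ClientError as exc:
        raise DataCloudError('Storage path {} is not set up correctly: {}'.format(self.storage_bucket, exc))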

@staticmethod
def _upload_tracker(fname):
"""
File name for upload tracker.
"""
return fname + '.upload'
def create_cb_pull(self, name, key):
total = self.s3.Object(bucket_name=key.bucket, key=key.name).content_length
return Callback(name, total)

@staticmethod
def _download_tracker(fname):
"""
File name for download tracker.
"""
return fname + '.download'
def create_cb_push(self, name, fname):
total = os.path.getsize(fname)
return Callback(name, total)

def _pull_key(self, key, fname, no_progress_bar=False):
Logger.debug("Pulling key '{}' from bucket '{}' to file '{}'".format(key.name,
key.bucket.name,
key.bucket,
fname))
self._makedirs(fname)

@@ -124,19 +92,18 @@ def _pull_key(self, key, fname, no_progress_bar=False):
Logger.debug('File "{}" matches with "{}".'.format(fname, key.name))
return fname

Logger.debug('Downloading cache file from S3 "{}/{}" to "{}"'.format(key.bucket.name,
Logger.debug('Downloading cache file from S3 "{}/{}" to "{}"'.format(key.bucket,
key.name,
fname))

if no_progress_bar:
cb = None
else:
cb = create_cb(name)
cb = self.create_cb_pull(name, key)


res_h = ResumableDownloadHandler(tracker_file_name=self._download_tracker(tmp_file),
num_retries=10)
try:
key.get_contents_to_filename(tmp_file, cb=cb, res_download_handler=res_h)
self.s3.Object(key.bucket, key.name).download_file(tmp_file, Callback=cb)
except Exception as exc:
Logger.error('Failed to download "{}": {}'.format(key.name, exc))
return None
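Note: `download_file` is a boto3 managed transfer: it splits large objects into ranged GETs, retries failed reads, and reassembles the file, which is what allows this commit to drop boto's `ResumableDownloadHandler` and the `.download` tracker files. If the old retry count mattered, it could be approximated via `TransferConfig` (a sketch):

from boto3.s3.transfer import TransferConfig

config = TransferConfig(num_download_attempts=10)
self.s3.Object(key.bucket, key.name).download_file(tmp_file,
                                                   Callback=cb,
                                                   Config=config)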
@@ -152,122 +119,26 @@ def _pull_key(self, key, fname, no_progress_bar=False):

def _get_key(self, path):
key_name = self.cache_file_key(path)
return self.bucket.get_key(key_name)

def _new_key(self, path):
key_name = self.cache_file_key(path)
return self.bucket.new_key(key_name)

def _write_upload_tracker(self, fname, mp_id):
"""
Write multipart id to upload tracker.
"""
try:
open(self._upload_tracker(fname), 'w+').write(mp_id)
except Exception as exc:
Logger.debug("Failed to write upload tracker file for {}: {}".format(fname, exc))

def _unlink_upload_tracker(self, fname):
"""
Remove upload tracker file.
"""
try:
os.unlink(self._upload_tracker(fname))
except Exception as exc:
Logger.debug("Failed to unlink upload tracker file for {}: {}".format(fname, exc))

def _resume_multipart(self, key, fname):
"""
Try resuming multipart upload.
"""
try:
mp_id = open(self._upload_tracker(fname), 'r').read()
except Exception as exc:
Logger.debug("Failed to read upload tracker file for {}: {}".format(fname, exc))
self.s3.Object(self.storage_bucket, key_name).get()
return AWSKey(self.storage_bucket, key_name)
except botocore.errorfactory.ClientError:
return None
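Note: `botocore.errorfactory.ClientError` happens to resolve because that module imports the base exception, but the documented path is `botocore.exceptions.ClientError`. As written, any client error (including access denied) is treated as "key does not exist"; distinguishing a genuinely missing key would look something like this sketch:

import botocore.exceptions

def _get_key(self, path):
    key_name = self.cache_file_key(path)
    try:
        self.s3.Object(self.storage_bucket, key_name).get()
        return AWSKey(self.storage_bucket, key_name)
    except botocore.exceptions.ClientError as exc:
        # GetObject reports a missing key as 'NoSuchKey'; anything
        # else (e.g. AccessDenied) is re-raised
        if exc.response['Error']['Code'] == 'NoSuchKey':
            return None
        raise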

for part in key.bucket.get_all_multipart_uploads():
if part.id != mp_id:
continue

Logger.debug("Found existing multipart {}".format(mp_id))
return part

return None

def _create_multipart(self, key, fname):
"""
Create multipart upload and save info to tracker file.
"""
multipart = key.bucket.initiate_multipart_upload(key.name)
self._write_upload_tracker(fname, multipart.id)
return multipart

def _get_multipart(self, key, fname):
"""
Try resuming multipart upload if supported.
"""
multipart = self._resume_multipart(key, fname)
if multipart != None:
return multipart

return self._create_multipart(key, fname)

@staticmethod
def _skip_part(multipart, part_num, size):
"""
Skip part of multipart upload if it has been already uploaded to the server.
"""
for part in multipart.get_all_parts():
if part.part_number == part_num and part.size == size:# and p.etag and p.last_modified
Logger.debug("Skipping part #{}".format(str(part_num)))
return True
return False

def _push_multipart(self, key, fname):
"""
Upload local file to cloud as a multipart upload.
"""
multipart = self._get_multipart(key, fname)

source_size = os.stat(fname).st_size
chunk_size = 50*1024*1024
chunk_count = int(math.ceil(source_size / float(chunk_size)))

with open(fname, 'rb') as fobj:
for i in range(chunk_count):
offset = i * chunk_size
left = source_size - offset
size = min([chunk_size, left])
part_num = i + 1

if self._skip_part(multipart, part_num, size):
continue

fobj.seek(offset)
name = os.path.relpath(fname, self._cloud_settings.cache.cache_dir)
cb = create_cb(name, offset, source_size)
multipart.upload_part_from_file(fp=fobj,
replace=False,
size=size,
num_cb=100,
part_num=part_num,
cb=cb)

if len(multipart.get_all_parts()) != chunk_count:
raise Exception("Couldn't upload all file parts")

multipart.complete_upload()
self._unlink_upload_tracker(fname)
def _new_key(self, path):
key_name = self.cache_file_key(path)
return AWSKey(self.storage_bucket, key_name)

def _push_key(self, key, path):
""" push, aws version """
name = os.path.relpath(path, self._cloud_settings.cache.cache_dir)
cb = self.create_cb_push(name, path)
try:
self._push_multipart(key, path)
self.s3.Object(key.bucket, key.name).upload_file(path, Callback=cb)
except Exception as exc:
Logger.error('Failed to upload "{}": {}'.format(path, exc))
return None

progress.finish_target(os.path.relpath(path, self._cloud_settings.cache.cache_dir))
progress.finish_target(name)

return path
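Note: `upload_file` is likewise a managed transfer, which is what lets this commit delete the hand-rolled multipart machinery above (tracker files, part skipping, `complete_upload`). The old 50 MiB part size could be reproduced through `TransferConfig` if ever needed (a sketch):

from boto3.s3.transfer import TransferConfig

config = TransferConfig(multipart_threshold=50 * 1024 * 1024,
                        multipart_chunksize=50 * 1024 * 1024)
self.s3.Object(key.bucket, key.name).upload_file(path, Callback=cb,
                                                 Config=config)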
