boto3 and local config #651

Merged · 2 commits · Apr 13, 2018
1 change: 1 addition & 0 deletions .appveyor.yml
@@ -57,6 +57,7 @@ build: false
 before_test:
 - aws configure set aws_access_key_id "%aws_access_key_id%"
 - aws configure set aws_secret_access_key "%aws_secret_access_key%"
+- aws configure set region us-east-2
 - openssl enc -d -aes-256-cbc -md md5 -k "%GCP_CREDS%" -in scripts\ci\gcp-creds.json.enc -out scripts\ci\gcp-creds.json & exit 0
 - pip install -r test-requirements.txt

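One note on the added region line: boto, which this PR removes, would fall back to a default S3 endpoint, but boto3 generally needs a resolvable region (botocore raises NoRegionError otherwise), so CI sets one explicitly in the shared AWS config. A quick sketch to confirm what boto3 resolves, assuming the `aws configure` calls above have already run:

import boto3

# region_name is resolved from AWS_DEFAULT_REGION, ~/.aws/config, etc.
session = boto3.session.Session()
print(session.region_name)  # expected: 'us-east-2' after the line above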
20 changes: 20 additions & 0 deletions dvc/cli.py
@@ -239,6 +239,11 @@ def parse_args(argv=None):
         nargs='?',
         default=None,
         help='Option value')
+    config_parser.add_argument(
+        '--local',
+        action='store_true',
+        default=False,
+        help='Use local config')
     config_parser.set_defaults(func=CmdConfig)


@@ -262,6 +267,11 @@ def parse_args(argv=None):
     remote_add_parser.add_argument(
         'url',
         help='Url')
+    remote_add_parser.add_argument(
+        '--local',
+        action='store_true',
+        default=False,
+        help='Use local config')
     remote_add_parser.set_defaults(func=CmdRemoteAdd)


@@ -272,6 +282,11 @@ def parse_args(argv=None):
     remote_remove_parser.add_argument(
         'name',
         help='Name')
+    remote_remove_parser.add_argument(
+        '--local',
+        action='store_true',
+        default=False,
+        help='Use local config')
     remote_remove_parser.set_defaults(func=CmdRemoteRemove)


@@ -294,6 +309,11 @@ def parse_args(argv=None):
         default=False,
         action='store_true',
         help='Unset option')
+    remote_modify_parser.add_argument(
+        '--local',
+        action='store_true',
+        default=False,
+        help='Use local config')
     remote_modify_parser.set_defaults(func=CmdRemoteModify)


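All four parsers gain the identical flag, so each command object receives it as `args.local`. A minimal sketch of how a command might route writes to a non-committed local config file based on the flag; the `.dvc/config.local` path and the dispatch below are illustrative assumptions, not code from this PR:

import argparse
import os

parser = argparse.ArgumentParser(prog='dvc config')
parser.add_argument('name', help='Option name')
parser.add_argument('value', nargs='?', default=None, help='Option value')
parser.add_argument('--local', action='store_true', default=False,
                    help='Use local config')

args = parser.parse_args(['core.cloud', 'aws', '--local'])
# Choose which config file to edit: the local one is typically kept out
# of version control, while the shared one is committed with the repo.
config_file = os.path.join('.dvc', 'config.local' if args.local else 'config')
print(config_file)  # .dvc/config.local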
227 changes: 49 additions & 178 deletions dvc/cloud/aws.py
@@ -1,7 +1,9 @@
 import os
 import math
+import threading

-from boto.s3.connection import S3Connection
+import boto3
+import botocore
 try:
     import httplib
 except ImportError:
@@ -15,7 +17,6 @@
 from dvc.config import Config
 from dvc.logger import Logger
 from dvc.progress import progress
-from dvc.cloud.credentials_aws import AWSCredentials
 from dvc.cloud.base import DataCloudError, DataCloudBase


@@ -28,92 +29,59 @@ def sizeof_fmt(num, suffix='B'):
     return "%.1f%s%s" % (num, 'Y', suffix)


-def percent_cb(name, part_complete, part_total, offset=0, multipart_total=None):
+def percent_cb(name, complete, total):
     """ Callback for updating target progress """
-    complete = offset + part_complete
-    total = multipart_total if multipart_total != None else part_total
-
     Logger.debug('{}: {} transferred out of {}'.format(name,
                                                        sizeof_fmt(complete),
                                                        sizeof_fmt(total)))
     progress.update_target(name, complete, total)


-def create_cb(name, offset=0, multipart_total=None):
+def create_cb(name):
     """ Create callback function for multipart object """
-    return (lambda cur, tot: percent_cb(name, cur, tot, offset, multipart_total))
+    return (lambda cur, tot: percent_cb(name, cur, tot))


+class Callback(object):
+    def __init__(self, name, total):
+        self.name = name
+        self.total = total
+        self.current = 0
+        self.lock = threading.Lock()
+
+    def __call__(self, byts):
+        with self.lock:
+            self.current += byts
+            progress.update_target(self.name, self.current, self.total)
+
+
+class AWSKey(object):
+    def __init__(self, bucket, name):
+        self.name = name
+        self.bucket = bucket
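The new Callback class is the boto3 replacement for boto's cb= callbacks: boto3's transfer methods call it from worker threads with only the number of bytes moved since the previous call, hence the running total and the lock. A usage sketch with placeholder bucket/key/file names, assuming the Callback class added above is in scope:

import os
import boto3

s3 = boto3.resource('s3')
fname = 'data.bin'
# boto3 passes byte increments; Callback accumulates them under a lock
# and forwards the running total to the progress bar.
cb = Callback(fname, os.path.getsize(fname))
s3.Object('my-bucket', 'cache/data.bin').upload_file(fname, Callback=cb)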


 class DataCloudAWS(DataCloudBase):
     """ DataCloud class for Amazon Web Services """
     REGEX = r'^s3://(?P<path>.*)$'

-    def __init__(self, cloud_settings):
-        super(DataCloudAWS, self).__init__(cloud_settings)
-        self._aws_creds = AWSCredentials(cloud_settings.cloud_config)
-
-    @property
-    def aws_region_host(self):
-        """ get the region host needed for s3 access
-
-        See notes http://docs.aws.amazon.com/general/latest/gr/rande.html#s3_region
-        """
-
-        region = self._cloud_settings.cloud_config.get(Config.SECTION_AWS_REGION, None)
-        if region is None or region == '':
-            return 's3.amazonaws.com'
-        if region == 'us-east-1':
-            return 's3.amazonaws.com'
-        return 's3.%s.amazonaws.com' % region
-
-    def credential_paths(self, default):
-        """
-        Try obtaining path to aws credentials from config file.
-        """
-        paths = []
-        credpath = self._cloud_settings.cloud_config.get(Config.SECTION_AWS_CREDENTIALPATH, None)
-        if credpath is not None and len(credpath) > 0:
-            credpath = os.path.expanduser(credpath)
-            if os.path.isfile(credpath):
-                paths.append(credpath)
-            else:
-                Logger.warn('AWS CredentialPath "%s" not found;'
-                            'falling back to default "%s"' % (credpath, default))
-                paths.append(default)
-        else:
-            paths.append(default)
-        return paths
-
     def connect(self):
-        if all([self._aws_creds.access_key_id,
-                self._aws_creds.secret_access_key,
-                self.aws_region_host]):
-            conn = S3Connection(self._aws_creds.access_key_id,
-                                self._aws_creds.secret_access_key,
-                                host=self.aws_region_host)
-        else:
-            conn = S3Connection()
-        self.bucket = conn.lookup(self.storage_bucket)
-        if self.bucket is None:
+        self.s3 = boto3.resource('s3')
+        bucket = self.s3.Bucket(self.storage_bucket)
+        if bucket is None:
             raise DataCloudError('Storage path {} is not setup correctly'.format(self.storage_bucket))

-    @staticmethod
-    def _upload_tracker(fname):
-        """
-        File name for upload tracker.
-        """
-        return fname + '.upload'
+    def create_cb_pull(self, name, key):
+        total = self.s3.Object(bucket_name=key.bucket, key=key.name).content_length
+        return Callback(name, total)

-    @staticmethod
-    def _download_tracker(fname):
-        """
-        File name for download tracker.
-        """
-        return fname + '.download'
+    def create_cb_push(self, name, fname):
+        total = os.path.getsize(fname)
+        return Callback(name, total)

     def _pull_key(self, key, fname, no_progress_bar=False):
         Logger.debug("Pulling key '{}' from bucket '{}' to file '{}'".format(key.name,
-                                                                             key.bucket.name,
+                                                                             key.bucket,
                                                                              fname))
         self._makedirs(fname)

@@ -124,19 +92,18 @@ def _pull_key(self, key, fname, no_progress_bar=False):
             Logger.debug('File "{}" matches with "{}".'.format(fname, key.name))
             return fname

-        Logger.debug('Downloading cache file from S3 "{}/{}" to "{}"'.format(key.bucket.name,
+        Logger.debug('Downloading cache file from S3 "{}/{}" to "{}"'.format(key.bucket,
                                                                              key.name,
                                                                              fname))

         if no_progress_bar:
             cb = None
         else:
-            cb = create_cb(name)
+            cb = self.create_cb_pull(name, key)

-        res_h = ResumableDownloadHandler(tracker_file_name=self._download_tracker(tmp_file),
-                                         num_retries=10)
         try:
-            key.get_contents_to_filename(tmp_file, cb=cb, res_download_handler=res_h)
+            self.s3.Object(key.bucket, key.name).download_file(tmp_file, Callback=cb)
         except Exception as exc:
             Logger.error('Failed to download "{}": {}'.format(key.name, exc))
             return None
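The pull side is symmetrical, except that the expected size has to come from S3 object metadata (as create_cb_pull does via content_length), since the local file does not exist yet. A sketch with placeholder names, again assuming the Callback class from this diff:

import boto3

s3 = boto3.resource('s3')
obj = s3.Object('my-bucket', 'cache/0a/1b2c3d')
# Total size comes from the object's metadata, not the local filesystem.
cb = Callback('cache/0a/1b2c3d', obj.content_length)
obj.download_file('/tmp/0a1b2c3d.tmp', Callback=cb)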
@@ -152,122 +119,26 @@ def _pull_key(self, key, fname, no_progress_bar=False):

     def _get_key(self, path):
         key_name = self.cache_file_key(path)
-        return self.bucket.get_key(key_name)
+        try:
+            self.s3.Object(self.storage_bucket, key_name).get()
+            return AWSKey(self.storage_bucket, key_name)
+        except botocore.errorfactory.ClientError:
+            return None

     def _new_key(self, path):
         key_name = self.cache_file_key(path)
-        return self.bucket.new_key(key_name)
-
-    def _write_upload_tracker(self, fname, mp_id):
-        """
-        Write multipart id to upload tracker.
-        """
-        try:
-            open(self._upload_tracker(fname), 'w+').write(mp_id)
-        except Exception as exc:
-            Logger.debug("Failed to write upload tracker file for {}: {}".format(fname, exc))
-
-    def _unlink_upload_tracker(self, fname):
-        """
-        Remove upload tracker file.
-        """
-        try:
-            os.unlink(self._upload_tracker(fname))
-        except Exception as exc:
-            Logger.debug("Failed to unlink upload tracker file for {}: {}".format(fname, exc))
-
-    def _resume_multipart(self, key, fname):
-        """
-        Try resuming multipart upload.
-        """
-        try:
-            mp_id = open(self._upload_tracker(fname), 'r').read()
-        except Exception as exc:
-            Logger.debug("Failed to read upload tracker file for {}: {}".format(fname, exc))
-            return None
-
-        for part in key.bucket.get_all_multipart_uploads():
-            if part.id != mp_id:
-                continue
-
-            Logger.debug("Found existing multipart {}".format(mp_id))
-            return part
-
-        return None
-
-    def _create_multipart(self, key, fname):
-        """
-        Create multipart upload and save info to tracker file.
-        """
-        multipart = key.bucket.initiate_multipart_upload(key.name)
-        self._write_upload_tracker(fname, multipart.id)
-        return multipart
-
-    def _get_multipart(self, key, fname):
-        """
-        Try resuming multipart upload if supported.
-        """
-        multipart = self._resume_multipart(key, fname)
-        if multipart != None:
-            return multipart
-
-        return self._create_multipart(key, fname)
-
-    @staticmethod
-    def _skip_part(multipart, part_num, size):
-        """
-        Skip part of multipart upload if it has been already uploaded to the server.
-        """
-        for part in multipart.get_all_parts():
-            if part.part_number == part_num and part.size == size:  # and p.etag and p.last_modified
-                Logger.debug("Skipping part #{}".format(str(part_num)))
-                return True
-        return False
-
-    def _push_multipart(self, key, fname):
-        """
-        Upload local file to cloud as a multipart upload.
-        """
-        multipart = self._get_multipart(key, fname)
-
-        source_size = os.stat(fname).st_size
-        chunk_size = 50*1024*1024
-        chunk_count = int(math.ceil(source_size / float(chunk_size)))
-
-        with open(fname, 'rb') as fobj:
-            for i in range(chunk_count):
-                offset = i * chunk_size
-                left = source_size - offset
-                size = min([chunk_size, left])
-                part_num = i + 1
-
-                if self._skip_part(multipart, part_num, size):
-                    continue
-
-                fobj.seek(offset)
-                name = os.path.relpath(fname, self._cloud_settings.cache.cache_dir)
-                cb = create_cb(name, offset, source_size)
-                multipart.upload_part_from_file(fp=fobj,
-                                                replace=False,
-                                                size=size,
-                                                num_cb=100,
-                                                part_num=part_num,
-                                                cb=cb)
-
-        if len(multipart.get_all_parts()) != chunk_count:
-            raise Exception("Couldn't upload all file parts")
-
-        multipart.complete_upload()
-        self._unlink_upload_tracker(fname)
+        return AWSKey(self.storage_bucket, key_name)

     def _push_key(self, key, path):
         """ push, aws version """
+        name = os.path.relpath(path, self._cloud_settings.cache.cache_dir)
+        cb = self.create_cb_push(name, path)
         try:
-            self._push_multipart(key, path)
+            self.s3.Object(key.bucket, key.name).upload_file(path, Callback=cb)
         except Exception as exc:
             Logger.error('Failed to upload "{}": {}'.format(path, exc))
             return None

-        progress.finish_target(os.path.relpath(path, self._cloud_settings.cache.cache_dir))
+        progress.finish_target(name)

         return path
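One behavioral difference worth calling out: boto's bucket.get_key() returned None for a missing key, while boto3's Object.get() raises a ClientError, which is what the new try/except in _get_key handles. A standalone sketch of that existence-check pattern with placeholder names; note it imports ClientError from botocore.exceptions, its documented location:

import boto3
from botocore.exceptions import ClientError

s3 = boto3.resource('s3')

def key_exists(bucket, key_name):
    try:
        # Object.get() raises ClientError (e.g. a 404/NoSuchKey) when the
        # key is absent, unlike boto's get_key() which returned None.
        s3.Object(bucket, key_name).get()
        return True
    except ClientError:
        return False

print(key_exists('my-bucket', 'cache/0a/1b2c3d'))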