-
-
Notifications
You must be signed in to change notification settings - Fork 620
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add S3 Cache to pants #4589
Add S3 Cache to pants #4589
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,6 +19,7 @@ | |
from pants.cache.pinger import BestUrlSelector, Pinger | ||
from pants.cache.resolver import NoopResolver, Resolver, RESTfulResolver | ||
from pants.cache.restful_artifact_cache import RESTfulArtifactCache | ||
from pants.cache.s3_artifact_cache import S3ArtifactCache | ||
from pants.subsystem.subsystem import Subsystem | ||
|
||
|
||
|
@@ -59,7 +60,7 @@ def register_options(cls, register): | |
'instead of skipping them.') | ||
register('--resolver', advanced=True, choices=['none', 'rest'], default='none', | ||
help='Select which resolver strategy to use for discovering URIs that access ' | ||
'artifact caches. none: use URIs from static config options, i.e. ' | ||
'http(s) artifact caches. none: use URIs from static config options, i.e. ' | ||
'--read-from, --write-to. rest: look up URIs by querying a RESTful ' | ||
'URL, which is a remote address from --read-from, --write-to.') | ||
register('--read-from', advanced=True, type=list, default=default_cache, | ||
|
@@ -84,6 +85,12 @@ def register_options(cls, register): | |
help='number of times pinger tries a cache') | ||
register('--write-permissions', advanced=True, type=str, default=None, | ||
help='Permissions to use when writing artifacts to a local cache, in octal.') | ||
register('--s3-credential-file', advanced=True, type=str, | ||
default=os.path.expanduser('~/.pants/.s3credentials'), | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. There is a `pants_configdir` option that this should use instead.
|
||
help='File containing s3 credentials to use for s3 cache, in java properties format.') | ||
register('--s3-profile', advanced=True, type=str, | ||
default=None, | ||
help='Boto profile to use for accessing the S3 cache.') | ||
|
||
@classmethod | ||
def create_cache_factory_for_task(cls, task, pinger=None, resolver=None): | ||
|
@@ -218,9 +225,14 @@ def is_local(string_spec): | |
return string_spec.startswith('/') or string_spec.startswith('~') | ||
|
||
@staticmethod | ||
def is_remote(string_spec): | ||
def _is_s3(string_spec): | ||
return string_spec.startswith('s3://') | ||
|
||
@classmethod | ||
def is_remote(cls, string_spec): | ||
# both artifact cache and resolver use REST, add new protocols here once they are supported | ||
return string_spec.startswith('http://') or string_spec.startswith('https://') | ||
return (string_spec.startswith('http://') or string_spec.startswith('https://') or | ||
cls._is_s3(string_spec)) | ||
|
||
def _baseurl(self, url): | ||
parsed_url = urlparse.urlparse(url) | ||
|
@@ -265,13 +277,25 @@ def create_local_cache(parent_path): | |
dereference=self._options.dereference_symlinks) | ||
|
||
def create_remote_cache(remote_spec, local_cache): | ||
urls = self.get_available_urls(remote_spec.split('|')) | ||
|
||
if len(urls) > 0: | ||
best_url_selector = BestUrlSelector(['{}/{}'.format(url.rstrip('/'), self._stable_name) | ||
for url in urls]) | ||
local_cache = local_cache or TempLocalArtifactCache(artifact_root, compression) | ||
return RESTfulArtifactCache(artifact_root, best_url_selector, local_cache) | ||
urls = remote_spec.split('|') | ||
if len(urls) == 0: | ||
return | ||
|
||
local_cache = local_cache or TempLocalArtifactCache(artifact_root, compression) | ||
if any(map(self._is_s3, urls)): | ||
if len(urls) != 1: | ||
raise InvalidCacheSpecError('S3 Cache only supports a single entry, got: {0}'.format( | ||
remote_spec)) | ||
return S3ArtifactCache( | ||
self._options.s3_credential_file, | ||
self._options.s3_profile, artifact_root, urls[0], local_cache) | ||
|
||
available_urls = self.get_available_urls(urls) | ||
if len(available_urls) == 0: | ||
return | ||
best_url_selector = BestUrlSelector(['{}/{}'.format(url.rstrip('/'), self._stable_name) | ||
for url in available_urls]) | ||
return RESTfulArtifactCache(artifact_root, best_url_selector, local_cache) | ||
|
||
local_cache = create_local_cache(spec.local) if spec.local else None | ||
remote_cache = create_remote_cache(spec.remote, local_cache) if spec.remote else None | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,178 @@ | ||
# coding=utf-8 | ||
# Copyright 2014 Pants project contributors (see CONTRIBUTORS.md). | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. nit: copyright year |
||
# Licensed under the Apache License, Version 2.0 (see LICENSE). | ||
|
||
from __future__ import (absolute_import, division, generators, nested_scopes, print_function, | ||
unicode_literals, with_statement) | ||
|
||
import logging | ||
import os | ||
|
||
import boto3 | ||
from botocore import exceptions | ||
from botocore.config import Config | ||
from botocore.vendored.requests import ConnectionError, Timeout | ||
from botocore.vendored.requests.packages.urllib3.exceptions import ClosedPoolError | ||
from pyjavaproperties import Properties | ||
from six.moves.urllib.parse import urlparse | ||
|
||
from pants.cache.artifact_cache import ArtifactCache, NonfatalArtifactCacheError, UnreadableArtifact | ||
|
||
|
||
# Module-level logger for this cache implementation.
logger = logging.getLogger(__name__)

# Exception types that signal a transient network problem (rather than a
# client-side bug); _log_and_classify_error treats these as non-fatal.
_NETWORK_ERRORS = [
  ConnectionError,
  Timeout,
  ClosedPoolError,
  exceptions.EndpointConnectionError,
  exceptions.ChecksumError,
]
|
||
def _connect_to_s3(config_file, profile_name):
  """Create a boto3 S3 resource from a credentials file and/or a boto profile.

  :param config_file: Path to a java-properties file with optional 'accessKey'
    and 'secretKey' entries. If the file cannot be read, credentials fall back
    to boto's normal environment/profile resolution chain.
  :param profile_name: Optional boto profile name to use for the session.
  :returns: A boto3 S3 service resource with short connect/read timeouts.
  """
  # Yeah, I know it's gross but it spams the logs without it: boto3's
  # resource/botocore loggers are very chatty below WARN.
  boto3.set_stream_logger(name='boto3.resources', level=logging.WARN)
  boto3.set_stream_logger(name='botocore', level=logging.WARN)

  boto_kwargs = {}
  if profile_name:
    boto_kwargs['profile_name'] = profile_name

  try:
    with open(config_file, 'r') as f:
      p = Properties()
      p.load(f)

      access_key = p.get('accessKey')
      if access_key:
        logger.debug('Reading access key from {0}'.format(config_file))
        boto_kwargs['aws_access_key_id'] = access_key

      secret_key = p.get('secretKey')
      if secret_key:
        # Fixed: this branch previously logged 'Reading access key' (copy-paste).
        logger.debug('Reading secret key from {0}'.format(config_file))
        boto_kwargs['aws_secret_access_key'] = secret_key
  except IOError as e:
    # Include the underlying error so the user can tell why the file was skipped.
    logger.debug('Could not load {0} ({1}), using ENV vars'.format(config_file, e))

  session = boto3.Session(**boto_kwargs)
  # Short timeouts keep cache lookups from stalling builds on a flaky network.
  # NOTE(review): 4s connect/read timeouts could be made options.
  config = Config(connect_timeout=4, read_timeout=4)
  return session.resource('s3', config=config)
|
||
|
||
_READ_SIZE_BYTES = 4 * 1024 * 1024 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this be an option? Or is it just super-standard to read 4MB chunks from S3? |
||
|
||
|
||
def iter_content(body): | ||
while True: | ||
chunk = body.read(_READ_SIZE_BYTES) | ||
if not chunk: | ||
break | ||
yield chunk | ||
|
||
|
||
def _not_found_error(e):
  """Return True if *e* is an S3 ClientError indicating a missing key."""
  if not isinstance(e, exceptions.ClientError):
    return False
  code = e.response['Error']['Code']
  # S3 reports missing objects as HTTP 404 (HEAD) or NoSuchKey (GET).
  return code == '404' or code == 'NoSuchKey'
|
||
|
||
def _network_error(e):
  """Return True if *e* is one of the known transient network exception types."""
  return isinstance(e, tuple(_NETWORK_ERRORS))


# Classification buckets returned by _log_and_classify_error.
_NOT_FOUND = 0
_NETWORK = 1
_UNKNOWN = 2
|
||
|
||
def _log_and_classify_error(e, verb, cache_key):
  """Log *e* at debug level and bucket it as _NOT_FOUND, _NETWORK, or _UNKNOWN.

  :param e: The exception raised by an S3 operation.
  :param verb: Short name of the operation (e.g. 'GET', 'HEAD') for the log line.
  :param cache_key: The cache key involved, included in the log line.
  """
  if _not_found_error(e):
    logger.debug('Not Found During {0} {1}'.format(verb, cache_key))
    return _NOT_FOUND
  category = _NETWORK if _network_error(e) else _UNKNOWN
  kind = 'network' if category == _NETWORK else 'client'
  logger.debug('Failed to {0} ({1}) {2}: {3}'.format(verb, kind, cache_key, str(e)))
  return category
|
||
|
||
class S3ArtifactCache(ArtifactCache):
  """An artifact cache that stores the artifacts on S3.

  Artifact tarball creation and extraction are delegated to a local cache
  instance; this class only uploads/downloads the resulting tarballs.
  """

  def __init__(self, config_file, profile_name, artifact_root, s3_url, local):
    """
    :param config_file: File containing s3 credentials, in java properties format.
    :param profile_name: Boto profile to use, or None for the default chain.
    :param artifact_root: The path under which cacheable products will be read/written
    :param s3_url: URL of the form s3://bucket/path/to/store/artifacts
    :param BaseLocalArtifactCache local: local cache instance for storing and creating artifacts
    """
    super(S3ArtifactCache, self).__init__(artifact_root)
    url = urlparse(s3_url)
    self._s3 = _connect_to_s3(config_file, profile_name)
    self._path = url.path
    if self._path.startswith('/'):
      # S3 keys must not begin with the '/' urlparse leaves on the path.
      self._path = self._path[1:]
    self._localcache = local
    self._bucket = url.netloc

  def try_insert(self, cache_key, paths):
    """Upload the artifact for cache_key, raising NonfatalArtifactCacheError on failure."""
    logger.debug('Insert {0}'.format(cache_key))
    # Delegate creation of the artifact tarball to the local cache.
    with self._localcache.insert_paths(cache_key, paths) as tarfile:
      with open(tarfile, 'rb') as infile:
        # Upload artifact to the remote cache.
        try:
          response = self._get_object(cache_key).put(Body=infile)
          response_status = response['ResponseMetadata']['HTTPStatusCode']
          if response_status < 200 or response_status >= 300:
            raise NonfatalArtifactCacheError('Failed to PUT (http error) {0}: {1}'.format(
              cache_key, response_status))
        except NonfatalArtifactCacheError:
          # Bug fix: the http-status error raised above was previously caught
          # by the blanket handler below and re-wrapped as '(core error)',
          # losing the status code. Re-raise it untouched.
          raise
        except Exception as e:
          raise NonfatalArtifactCacheError(
            'Failed to PUT (core error) {0}: {1}'.format(cache_key, str(e)))

  def has(self, cache_key):
    """Return True if the artifact exists locally or in S3 (checked via HEAD)."""
    logger.debug('Has {0}'.format(cache_key))
    if self._localcache.has(cache_key):
      return True
    try:
      self._get_object(cache_key).load()
      return True
    except Exception as e:
      _log_and_classify_error(e, 'HEAD', cache_key)
      return False

  def use_cached_files(self, cache_key, results_dir=None):
    """Fetch and extract the artifact, preferring the local cache over S3.

    :returns: True on success, False on a miss or network error, or an
      UnreadableArtifact when the downloaded artifact could not be used.
    """
    logger.debug('GET {0}'.format(cache_key))
    if self._localcache.has(cache_key):
      return self._localcache.use_cached_files(cache_key, results_dir)

    s3_object = self._get_object(cache_key)
    try:
      get_result = s3_object.get()
    except Exception as e:
      _log_and_classify_error(e, 'GET', cache_key)
      return False

    # Delegate storage and extraction to local cache.
    body = get_result['Body']
    try:
      return self._localcache.store_and_use_artifact(
        cache_key, iter_content(body), results_dir)
    except Exception as e:
      result = _log_and_classify_error(e, 'GET', cache_key)
      if result == _UNKNOWN:
        # A non-network failure mid-download means the artifact is suspect.
        return UnreadableArtifact(cache_key, e)
      return False
    finally:
      body.close()

  def delete(self, cache_key):
    """Delete the artifact from the local cache and, best-effort, from S3."""
    logger.debug("Delete {0}".format(cache_key))
    self._localcache.delete(cache_key)
    try:
      self._get_object(cache_key).delete()
    except Exception as e:
      _log_and_classify_error(e, 'DELETE', cache_key)

  def _get_object(self, cache_key):
    # Returns a boto3 Object handle; no network request is made here.
    return self._s3.Object(self._bucket, self._path_for_key(cache_key))

  def _path_for_key(self, cache_key):
    # Key layout: <prefix>/<cache_key.id>/<cache_key.hash>.tgz
    return '{0}/{1}/{2}.tgz'.format(self._path, cache_key.id, cache_key.hash)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
boto is for sure the way to go - not a concern for me.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have no problem with this dep.