Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Add S3 Cache to pants #4589

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions 3rdparty/python/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
ansicolors==1.0.2
beautifulsoup4>=4.3.2,<4.4
boto3==1.4.4
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

boto is for sure the way to go - not a concern for me.

Copy link
Sponsor Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have no problem with this dep.

cffi==1.7.0
coverage>=4.3.4,<4.4
docutils>=0.12,<0.13
Expand All @@ -8,13 +9,15 @@ futures==3.0.5
isort==4.2.5
Markdown==2.1.1
mock==2.0.0
moto==0.4.31
packaging==16.8
pathspec==0.5.0
pep8==1.6.2
pex==1.2.6
psutil==4.3.0
pyflakes==1.1.0
Pygments==1.4
pyjavaproperties==0.6
pystache==0.5.3
pytest-cov>=2.4,<2.5
pytest>=3.0.7,<4.0
Expand Down
2 changes: 2 additions & 0 deletions src/python/pants/cache/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

python_library(
dependencies = [
'3rdparty/python:boto3',
'3rdparty/python:pyjavaproperties',
'3rdparty/python:requests',
'3rdparty/python:six',
'src/python/pants/base:deprecated',
Expand Down
44 changes: 34 additions & 10 deletions src/python/pants/cache/cache_setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from pants.cache.pinger import BestUrlSelector, Pinger
from pants.cache.resolver import NoopResolver, Resolver, RESTfulResolver
from pants.cache.restful_artifact_cache import RESTfulArtifactCache
from pants.cache.s3_artifact_cache import S3ArtifactCache
from pants.subsystem.subsystem import Subsystem


Expand Down Expand Up @@ -59,7 +60,7 @@ def register_options(cls, register):
'instead of skipping them.')
register('--resolver', advanced=True, choices=['none', 'rest'], default='none',
help='Select which resolver strategy to use for discovering URIs that access '
'artifact caches. none: use URIs from static config options, i.e. '
'http(s) artifact caches. none: use URIs from static config options, i.e. '
'--read-from, --write-to. rest: look up URIs by querying a RESTful '
'URL, which is a remote address from --read-from, --write-to.')
register('--read-from', advanced=True, type=list, default=default_cache,
Expand All @@ -84,6 +85,12 @@ def register_options(cls, register):
help='number of times pinger tries a cache')
register('--write-permissions', advanced=True, type=str, default=None,
help='Permissions to use when writing artifacts to a local cache, in octal.')
register('--s3-credential-file', advanced=True, type=str,
default=os.path.expanduser('~/.pants/.s3credentials'),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a pants_configdir option that this should use instead.

(Pdb) self.get_options().pants_configdir
'/Users/mateo/.config/pants'

help='File containing s3 credentials to use for s3 cache, in java properties format.')
register('--s3-profile', advanced=True, type=str,
default=None,
help='Boto profile to use for accessing the S3 cache.')

@classmethod
def create_cache_factory_for_task(cls, task, pinger=None, resolver=None):
Expand Down Expand Up @@ -218,9 +225,14 @@ def is_local(string_spec):
return string_spec.startswith('/') or string_spec.startswith('~')

@staticmethod
def is_remote(string_spec):
def _is_s3(string_spec):
return string_spec.startswith('s3://')

@classmethod
def is_remote(cls, string_spec):
# both artifact cache and resolver use REST, add new protocols here once they are supported
return string_spec.startswith('http://') or string_spec.startswith('https://')
return (string_spec.startswith('http://') or string_spec.startswith('https://') or
cls._is_s3(string_spec))

def _baseurl(self, url):
parsed_url = urlparse.urlparse(url)
Expand Down Expand Up @@ -265,13 +277,25 @@ def create_local_cache(parent_path):
dereference=self._options.dereference_symlinks)

def create_remote_cache(remote_spec, local_cache):
urls = self.get_available_urls(remote_spec.split('|'))

if len(urls) > 0:
best_url_selector = BestUrlSelector(['{}/{}'.format(url.rstrip('/'), self._stable_name)
for url in urls])
local_cache = local_cache or TempLocalArtifactCache(artifact_root, compression)
return RESTfulArtifactCache(artifact_root, best_url_selector, local_cache)
urls = remote_spec.split('|')
if len(urls) == 0:
return

local_cache = local_cache or TempLocalArtifactCache(artifact_root, compression)
if any(map(self._is_s3, urls)):
if len(urls) != 1:
raise InvalidCacheSpecError('S3 Cache only supports a single entry, got: {0}'.format(
remote_spec))
return S3ArtifactCache(
self._options.s3_credential_file,
self._options.s3_profile, artifact_root, urls[0], local_cache)

available_urls = self.get_available_urls(urls)
if len(available_urls) == 0:
return
best_url_selector = BestUrlSelector(['{}/{}'.format(url.rstrip('/'), self._stable_name)
for url in available_urls])
return RESTfulArtifactCache(artifact_root, best_url_selector, local_cache)

local_cache = create_local_cache(spec.local) if spec.local else None
remote_cache = create_remote_cache(spec.remote, local_cache) if spec.remote else None
Expand Down
178 changes: 178 additions & 0 deletions src/python/pants/cache/s3_artifact_cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
# coding=utf-8
# Copyright 2014 Pants project contributors (see CONTRIBUTORS.md).
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: copyright year

# Licensed under the Apache License, Version 2.0 (see LICENSE).

from __future__ import (absolute_import, division, generators, nested_scopes, print_function,
unicode_literals, with_statement)

import logging
import os

import boto3
from botocore import exceptions
from botocore.config import Config
from botocore.vendored.requests import ConnectionError, Timeout
from botocore.vendored.requests.packages.urllib3.exceptions import ClosedPoolError
from pyjavaproperties import Properties
from six.moves.urllib.parse import urlparse

from pants.cache.artifact_cache import ArtifactCache, NonfatalArtifactCacheError, UnreadableArtifact


logger = logging.getLogger(__name__)

_NETWORK_ERRORS = [
ConnectionError, Timeout, ClosedPoolError,
exceptions.EndpointConnectionError, exceptions.ChecksumError
]

def _connect_to_s3(config_file, profile_name):
  """Create a boto3 S3 resource from an optional credentials file and profile.

  Credential resolution: explicit 'accessKey'/'secretKey' entries read from
  *config_file* (java properties format) take effect if present; otherwise
  boto's normal fallback chain (profile, environment variables, etc.) applies.

  :param string config_file: Path to a java-properties credentials file.
  :param string profile_name: Boto profile to use, or None for the default.
  :returns: A boto3 S3 service resource configured with short timeouts.
  """
  # Yeah, I know it's gross but it spams the logs without it:
  boto3.set_stream_logger(name='boto3.resources', level=logging.WARN)
  boto3.set_stream_logger(name='botocore', level=logging.WARN)

  boto_kwargs = {}
  if profile_name:
    boto_kwargs['profile_name'] = profile_name

  try:
    with open(config_file, 'r') as f:
      p = Properties()
      p.load(f)

      access_key = p.get('accessKey')
      if access_key:
        logger.debug('Reading access key from {0}'.format(config_file))
        boto_kwargs['aws_access_key_id'] = access_key

      secret_key = p.get('secretKey')
      if secret_key:
        # Fixed: this previously logged 'Reading access key' (copy-paste bug).
        logger.debug('Reading secret key from {0}'.format(config_file))
        boto_kwargs['aws_secret_access_key'] = secret_key
  except IOError as e:
    # A missing credentials file is expected in some setups; fall back to
    # boto's environment/profile chain. Include the error detail so users
    # can tell an intentionally-absent file from a permissions problem.
    logger.debug('Could not load {0}, using ENV vars: {1}'.format(config_file, e))

  session = boto3.Session(**boto_kwargs)
  # Short timeouts so an unreachable cache degrades quickly instead of
  # stalling builds.
  config = Config(connect_timeout=4, read_timeout=4)
  return session.resource('s3', config=config)


# Chunk size used when streaming an S3 response body.
_READ_SIZE_BYTES = 4 * 1024 * 1024


def iter_content(body):
  """Yield successive chunks read from a file-like S3 response body.

  :param body: An object exposing read(size), e.g. a botocore StreamingBody.
  """
  chunk = body.read(_READ_SIZE_BYTES)
  while chunk:
    yield chunk
    chunk = body.read(_READ_SIZE_BYTES)


def _not_found_error(e):
  """Return True if *e* is an S3 client error indicating a missing key."""
  if isinstance(e, exceptions.ClientError):
    code = e.response['Error']['Code']
    return code in ('404', 'NoSuchKey')
  return False


def _network_error(e):
  """Return True if *e* is one of the known transient network error types."""
  return isinstance(e, tuple(_NETWORK_ERRORS))


# Classification codes returned by _log_and_classify_error.
_NOT_FOUND = 0
_NETWORK = 1
_UNKNOWN = 2


def _log_and_classify_error(e, verb, cache_key):
  """Log *e* at debug level and classify it for the caller.

  :param e: The exception raised by an S3 operation.
  :param string verb: The operation attempted, e.g. 'GET' or 'DELETE'.
  :param cache_key: The cache key the operation was acting on.
  :returns: One of _NOT_FOUND, _NETWORK or _UNKNOWN.
  """
  if _not_found_error(e):
    logger.debug('Not Found During {0} {1}'.format(verb, cache_key))
    return _NOT_FOUND
  elif _network_error(e):
    logger.debug('Failed to {0} (network) {1}: {2}'.format(verb, cache_key, str(e)))
    return _NETWORK
  else:
    logger.debug('Failed to {0} (client) {1}: {2}'.format(verb, cache_key, str(e)))
    return _UNKNOWN


class S3ArtifactCache(ArtifactCache):
  """An artifact cache that stores the artifacts on S3.

  All artifact creation/extraction is delegated to a local cache instance;
  this class only moves tarballs to and from a single s3://bucket/prefix.
  """

  def __init__(self, config_file, profile_name, artifact_root, s3_url, local):
    """
    :param config_file: File containing s3 credentials, in java properties format.
    :param profile_name: Boto profile to use, or None for the default.
    :param artifact_root: The path under which cacheable products will be read/written
    :param s3_url: URL of the form s3://bucket/path/to/store/artifacts
    :param BaseLocalArtifactCache local: local cache instance for storing and creating artifacts
    """
    super(S3ArtifactCache, self).__init__(artifact_root)
    url = urlparse(s3_url)
    self._s3 = _connect_to_s3(config_file, profile_name)
    # S3 object keys must not start with '/', so strip a single leading slash
    # left over from the URL path.
    self._path = url.path
    if self._path.startswith('/'):
      self._path = self._path[1:]
    self._localcache = local
    self._bucket = url.netloc

  def try_insert(self, cache_key, paths):
    """Tar up *paths* via the local cache and upload the artifact to S3.

    :raises NonfatalArtifactCacheError: if the upload fails or S3 returns a
      non-2xx status.
    """
    logger.debug('Insert {0}'.format(cache_key))
    # Delegate creation of artifacts to the local cache.
    with self._localcache.insert_paths(cache_key, paths) as tarfile:
      with open(tarfile, 'rb') as infile:
        # Upload artifact to the remote cache.
        try:
          response = self._get_object(cache_key).put(Body=infile)
        except Exception as e:
          raise NonfatalArtifactCacheError(
            'Failed to PUT (core error) {0}: {1}'.format(cache_key, str(e)))
        # Check the HTTP status outside the try block: previously the
        # 'http error' NonfatalArtifactCacheError raised here was caught by
        # the surrounding `except Exception` and re-wrapped as a second
        # 'core error', obscuring the real failure.
        response_status = response['ResponseMetadata']['HTTPStatusCode']
        if response_status < 200 or response_status >= 300:
          raise NonfatalArtifactCacheError('Failed to PUT (http error) {0}: {1}'.format(
            cache_key, response_status))

  def has(self, cache_key):
    """Return True if the artifact exists locally or remotely on S3."""
    logger.debug('Has {0}'.format(cache_key))
    if self._localcache.has(cache_key):
      return True
    try:
      # load() issues a HEAD request; it raises if the object is missing.
      self._get_object(cache_key).load()
      return True
    except Exception as e:
      _log_and_classify_error(e, 'HEAD', cache_key)
      return False

  def use_cached_files(self, cache_key, results_dir=None):
    """Fetch the artifact from S3 (or the local cache) and extract it.

    :returns: True on success, False on a miss or network error, or an
      UnreadableArtifact if the downloaded artifact could not be read.
    """
    logger.debug('GET {0}'.format(cache_key))
    if self._localcache.has(cache_key):
      return self._localcache.use_cached_files(cache_key, results_dir)

    s3_object = self._get_object(cache_key)
    try:
      get_result = s3_object.get()
    except Exception as e:
      _log_and_classify_error(e, 'GET', cache_key)
      return False

    # Delegate storage and extraction to local cache.
    body = get_result['Body']
    try:
      return self._localcache.store_and_use_artifact(
        cache_key, iter_content(body), results_dir)
    except Exception as e:
      result = _log_and_classify_error(e, 'GET', cache_key)
      if result == _UNKNOWN:
        # A non-network failure means the artifact itself is bad.
        return UnreadableArtifact(cache_key, e)
      return False
    finally:
      body.close()

  def delete(self, cache_key):
    """Remove the artifact from both the local cache and S3 (best effort)."""
    logger.debug("Delete {0}".format(cache_key))
    self._localcache.delete(cache_key)
    try:
      self._get_object(cache_key).delete()
    except Exception as e:
      _log_and_classify_error(e, 'DELETE', cache_key)

  def _get_object(self, cache_key):
    # The boto3 Object is a lazy handle; no request is made until an
    # operation (get/put/load/delete) is invoked on it.
    return self._s3.Object(self._bucket, self._path_for_key(cache_key))

  def _path_for_key(self, cache_key):
    # S3 key layout: <prefix>/<cache_key.id>/<cache_key.hash>.tgz
    return '{0}/{1}/{2}.tgz'.format(self._path, cache_key.id, cache_key.hash)
13 changes: 13 additions & 0 deletions tests/python/pants_test/cache/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,19 @@ python_tests(
]
)

python_tests(
name = 's3_artifact_cache',
sources = ['test_s3_artifact_cache.py'],
dependencies = [
'3rdparty/python:boto3',
'3rdparty/python:moto',
'src/python/pants/cache',
'src/python/pants/invalidation',
'src/python/pants/util:contextutil',
'src/python/pants/util:dirutil',
]
)

python_tests(
name = 'cache_cleanup_integration',
sources = ['test_cache_cleanup_integration.py'],
Expand Down
10 changes: 10 additions & 0 deletions tests/python/pants_test/cache/test_cache_setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from pants.cache.local_artifact_cache import LocalArtifactCache
from pants.cache.resolver import Resolver
from pants.cache.restful_artifact_cache import RESTfulArtifactCache
from pants.cache.s3_artifact_cache import S3ArtifactCache
from pants.subsystem.subsystem import Subsystem
from pants.task.task import Task
from pants.util.contextutil import temporary_dir
Expand Down Expand Up @@ -164,6 +165,9 @@ def check(expected_type, spec, resolver=None):
check(RESTfulArtifactCache, [cachedir, 'http://localhost/bar'])
check(RESTfulArtifactCache, [cachedir, 'http://localhost/bar'], resolver=self.resolver)

check(S3ArtifactCache, ['s3://some-bucket/bar'])
check(S3ArtifactCache, [cachedir, 's3://some-bucket/bar'])

with self.assertRaises(CacheSpecFormatError):
mk_cache(['foo'])

Expand All @@ -179,6 +183,12 @@ def check(expected_type, spec, resolver=None):
with self.assertRaises(TooManyCacheSpecsError):
mk_cache([tmpdir, self.REMOTE_URI_1, self.REMOTE_URI_2])

with self.assertRaises(InvalidCacheSpecError):
mk_cache(['s3://some-bucket/bar|http://localhost/bar'])

with self.assertRaises(InvalidCacheSpecError):
mk_cache(['s3://some-bucket/bar|s3://some-other-bucket/foo'])

def test_read_cache_available(self):
self.assertEquals(None, self.cache_factory.read_cache_available())

Expand Down
Loading