diff --git a/3rdparty/python/requirements.txt b/3rdparty/python/requirements.txt
index e31739052c1..4a5936c777a 100644
--- a/3rdparty/python/requirements.txt
+++ b/3rdparty/python/requirements.txt
@@ -1,5 +1,6 @@
 ansicolors==1.0.2
 beautifulsoup4>=4.3.2,<4.4
+boto3==1.4.4
 cffi==1.7.0
 coverage>=4.3.4,<4.4
 docutils>=0.12,<0.13
@@ -8,6 +9,7 @@ futures==3.0.5
 isort==4.2.5
 Markdown==2.1.1
 mock==2.0.0
+moto==0.4.31
 packaging==16.8
 pathspec==0.5.0
 pep8==1.6.2
@@ -15,6 +17,7 @@ pex==1.2.6
 psutil==4.3.0
 pyflakes==1.1.0
 Pygments==1.4
+pyjavaproperties==0.6
 pystache==0.5.3
 pytest-cov>=2.4,<2.5
 pytest>=3.0.7,<4.0
diff --git a/src/python/pants/cache/BUILD b/src/python/pants/cache/BUILD
index ef33925985f..c52bbf96bdc 100644
--- a/src/python/pants/cache/BUILD
+++ b/src/python/pants/cache/BUILD
@@ -3,6 +3,8 @@
 python_library(
   dependencies = [
+    '3rdparty/python:boto3',
+    '3rdparty/python:pyjavaproperties',
     '3rdparty/python:requests',
     '3rdparty/python:six',
     'src/python/pants/base:deprecated',
diff --git a/src/python/pants/cache/cache_setup.py b/src/python/pants/cache/cache_setup.py
index baf3ba3fe4e..50aae36a0b8 100644
--- a/src/python/pants/cache/cache_setup.py
+++ b/src/python/pants/cache/cache_setup.py
@@ -19,6 +19,7 @@
 from pants.cache.pinger import BestUrlSelector, Pinger
 from pants.cache.resolver import NoopResolver, Resolver, RESTfulResolver
 from pants.cache.restful_artifact_cache import RESTfulArtifactCache
+from pants.cache.s3_artifact_cache import S3ArtifactCache
 from pants.subsystem.subsystem import Subsystem
 
 
@@ -59,7 +60,7 @@ def register_options(cls, register):
                   'instead of skipping them.')
     register('--resolver', advanced=True, choices=['none', 'rest'], default='none',
              help='Select which resolver strategy to use for discovering URIs that access '
-                  'artifact caches. none: use URIs from static config options, i.e. '
+                  'http(s) artifact caches. none: use URIs from static config options, i.e. '
                  '--read-from, --write-to. rest: look up URIs by querying a RESTful '
                  'URL, which is a remote address from --read-from, --write-to.')
     register('--read-from', advanced=True, type=list, default=default_cache,
@@ -84,6 +85,12 @@ def register_options(cls, register):
              help='number of times pinger tries a cache')
     register('--write-permissions', advanced=True, type=str, default=None,
              help='Permissions to use when writing artifacts to a local cache, in octal.')
+    register('--s3-credential-file', advanced=True, type=str,
+             default=os.path.expanduser('~/.pants/.s3credentials'),
+             help='File containing S3 credentials to use for the S3 cache, in Java properties format.')
+    register('--s3-profile', advanced=True, type=str,
+             default=None,
+             help='Boto profile to use for accessing the S3 cache.')
 
   @classmethod
   def create_cache_factory_for_task(cls, task, pinger=None, resolver=None):
@@ -218,9 +225,14 @@ def is_local(string_spec):
     return string_spec.startswith('/') or string_spec.startswith('~')
 
   @staticmethod
-  def is_remote(string_spec):
+  def _is_s3(string_spec):
+    return string_spec.startswith('s3://')
+
+  @classmethod
+  def is_remote(cls, string_spec):
     # both artifact cache and resolver use REST, add new protocols here once they are supported
-    return string_spec.startswith('http://') or string_spec.startswith('https://')
+    return (string_spec.startswith('http://') or string_spec.startswith('https://') or
+            cls._is_s3(string_spec))
 
   def _baseurl(self, url):
     parsed_url = urlparse.urlparse(url)
@@ -265,13 +277,25 @@ def create_local_cache(parent_path):
                                dereference=self._options.dereference_symlinks)
 
     def create_remote_cache(remote_spec, local_cache):
-      urls = self.get_available_urls(remote_spec.split('|'))
-
-      if len(urls) > 0:
-        best_url_selector = BestUrlSelector(['{}/{}'.format(url.rstrip('/'), self._stable_name)
-                                             for url in urls])
-        local_cache = local_cache or TempLocalArtifactCache(artifact_root, compression)
-        return RESTfulArtifactCache(artifact_root, best_url_selector, local_cache)
+      urls = remote_spec.split('|')
+      if len(urls) == 0:
+        return
+
+      local_cache = local_cache or TempLocalArtifactCache(artifact_root, compression)
+      if any(map(self._is_s3, urls)):
+        if len(urls) != 1:
+          raise InvalidCacheSpecError('S3 Cache only supports a single entry, got: {0}'.format(
+            remote_spec))
+        return S3ArtifactCache(
+          self._options.s3_credential_file,
+          self._options.s3_profile, artifact_root, urls[0], local_cache)
+
+      available_urls = self.get_available_urls(urls)
+      if len(available_urls) == 0:
+        return
+      best_url_selector = BestUrlSelector(['{}/{}'.format(url.rstrip('/'), self._stable_name)
+                                           for url in available_urls])
+      return RESTfulArtifactCache(artifact_root, best_url_selector, local_cache)
 
     local_cache = create_local_cache(spec.local) if spec.local else None
     remote_cache = create_remote_cache(spec.remote, local_cache) if spec.remote else None
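
For context (illustrative, not introduced by this change): an s3:// entry in --read-from/--write-to now selects the new S3 cache, and credentials are taken from the Java-properties file named by --s3-credential-file (keys accessKey and secretKey) when it exists, otherwise from the boto profile named by --s3-profile or the usual boto environment fallbacks. Note that, per the new check in create_remote_cache, an s3:// entry cannot be pipe-combined with other remote URIs. A minimal configuration sketch, assuming the global [cache] scope and an illustrative bucket name:

    # pants.ini (illustrative values)
    [cache]
    read_from: ["s3://my-pants-cache/artifacts"]
    write_to: ["s3://my-pants-cache/artifacts"]

    # ~/.pants/.s3credentials (Java properties format, as read by _connect_to_s3)
    accessKey=<aws access key id>
    secretKey=<aws secret access key>
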
diff --git a/src/python/pants/cache/s3_artifact_cache.py b/src/python/pants/cache/s3_artifact_cache.py
new file mode 100644
index 00000000000..1898d5023cd
--- /dev/null
+++ b/src/python/pants/cache/s3_artifact_cache.py
@@ -0,0 +1,178 @@
+# coding=utf-8
+# Copyright 2014 Pants project contributors (see CONTRIBUTORS.md).
+# Licensed under the Apache License, Version 2.0 (see LICENSE).
+
+from __future__ import (absolute_import, division, generators, nested_scopes, print_function,
+                        unicode_literals, with_statement)
+
+import logging
+import os
+
+import boto3
+from botocore import exceptions
+from botocore.config import Config
+from botocore.vendored.requests import ConnectionError, Timeout
+from botocore.vendored.requests.packages.urllib3.exceptions import ClosedPoolError
+from pyjavaproperties import Properties
+from six.moves.urllib.parse import urlparse
+
+from pants.cache.artifact_cache import ArtifactCache, NonfatalArtifactCacheError, UnreadableArtifact
+
+
+logger = logging.getLogger(__name__)
+
+_NETWORK_ERRORS = [
+  ConnectionError, Timeout, ClosedPoolError,
+  exceptions.EndpointConnectionError, exceptions.ChecksumError
+]
+
+def _connect_to_s3(config_file, profile_name):
+  # boto3/botocore are very chatty by default; quiet them down so they don't spam the log.
+  boto3.set_stream_logger(name='boto3.resources', level=logging.WARN)
+  boto3.set_stream_logger(name='botocore', level=logging.WARN)
+
+  boto_kwargs = {}
+  if profile_name:
+    boto_kwargs['profile_name'] = profile_name
+
+  try:
+    with open(config_file, 'r') as f:
+      p = Properties()
+      p.load(f)
+
+      access_key = p.get('accessKey')
+      if access_key:
+        logger.debug('Reading access key from {0}'.format(config_file))
+        boto_kwargs['aws_access_key_id'] = access_key
+
+      secret_key = p.get('secretKey')
+      if secret_key:
+        logger.debug('Reading secret key from {0}'.format(config_file))
+        boto_kwargs['aws_secret_access_key'] = secret_key
+  except IOError:
+    logger.debug('Could not load {0}, using ENV vars'.format(config_file))
+
+  session = boto3.Session(**boto_kwargs)
+  config = Config(connect_timeout=4, read_timeout=4)
+  return session.resource('s3', config=config)
+
+
+_READ_SIZE_BYTES = 4 * 1024 * 1024
+
+
+def iter_content(body):
+  while True:
+    chunk = body.read(_READ_SIZE_BYTES)
+    if not chunk:
+      break
+    yield chunk
+
+
+def _not_found_error(e):
+  if not isinstance(e, exceptions.ClientError):
+    return False
+  return e.response['Error']['Code'] in ('404', 'NoSuchKey')
+
+
+def _network_error(e):
+  return any(isinstance(e, cls) for cls in _NETWORK_ERRORS)
+
+_NOT_FOUND = 0
+_NETWORK = 1
+_UNKNOWN = 2
+
+
+def _log_and_classify_error(e, verb, cache_key):
+  if _not_found_error(e):
+    logger.debug('Not found during {0} {1}'.format(verb, cache_key))
+    return _NOT_FOUND
+  if _network_error(e):
+    logger.debug('Failed to {0} (network) {1}: {2}'.format(verb, cache_key, str(e)))
+    return _NETWORK
+  logger.debug('Failed to {0} (client) {1}: {2}'.format(verb, cache_key, str(e)))
+  return _UNKNOWN
+
+
+class S3ArtifactCache(ArtifactCache):
+  """An artifact cache that stores the artifacts on S3."""
+
+  def __init__(self, config_file, profile_name, artifact_root, s3_url, local):
+    """
+    :param config_file: Path to a Java-properties file holding S3 credentials (accessKey/secretKey)
+    :param profile_name: Boto profile to use, or None to rely on the credential file/environment
+    :param artifact_root: The path under which cacheable products will be read/written
+    :param s3_url: URL of the form s3://bucket/path/to/store/artifacts
+    :param BaseLocalArtifactCache local: local cache instance for storing and creating artifacts
+    """
+    super(S3ArtifactCache, self).__init__(artifact_root)
+    url = urlparse(s3_url)
+    self._s3 = _connect_to_s3(config_file, profile_name)
+    self._path = url.path
+    if self._path.startswith('/'):
+      self._path = self._path[1:]
+    self._localcache = local
+    self._bucket = url.netloc
+
+  def try_insert(self, cache_key, paths):
+    logger.debug('Insert {0}'.format(cache_key))
+    # Delegate creation of artifacts to the local cache
+    with self._localcache.insert_paths(cache_key, paths) as tarfile:
+      with open(tarfile, 'rb') as infile:
+        # Upload artifact to the remote cache.
+        try:
+          response = self._get_object(cache_key).put(Body=infile)
+          response_status = response['ResponseMetadata']['HTTPStatusCode']
+          if response_status < 200 or response_status >= 300:
+            raise NonfatalArtifactCacheError('Failed to PUT (http error) {0}: {1}'.format(
+              cache_key, response_status))
+        except Exception as e:
+          raise NonfatalArtifactCacheError(
+            'Failed to PUT (core error) {0}: {1}'.format(cache_key, str(e)))
+
+  def has(self, cache_key):
+    logger.debug('Has {0}'.format(cache_key))
+    if self._localcache.has(cache_key):
+      return True
+    try:
+      self._get_object(cache_key).load()
+      return True
+    except Exception as e:
+      _log_and_classify_error(e, 'HEAD', cache_key)
+      return False
+
+  def use_cached_files(self, cache_key, results_dir=None):
+    logger.debug('GET {0}'.format(cache_key))
+    if self._localcache.has(cache_key):
+      return self._localcache.use_cached_files(cache_key, results_dir)
+
+    s3_object = self._get_object(cache_key)
+    try:
+      get_result = s3_object.get()
+    except Exception as e:
+      _log_and_classify_error(e, 'GET', cache_key)
+      return False
+
+    # Delegate storage and extraction to local cache
+    body = get_result['Body']
+    try:
+      return self._localcache.store_and_use_artifact(
+        cache_key, iter_content(body), results_dir)
+    except Exception as e:
+      result = _log_and_classify_error(e, 'GET', cache_key)
+      if result == _UNKNOWN:
+        return UnreadableArtifact(cache_key, e)
+      return False
+    finally:
+      body.close()
+
+  def delete(self, cache_key):
+    logger.debug('Delete {0}'.format(cache_key))
+    self._localcache.delete(cache_key)
+    try:
+      self._get_object(cache_key).delete()
+    except Exception as e:
+      _log_and_classify_error(e, 'DELETE', cache_key)
+
+  def _get_object(self, cache_key):
+    return self._s3.Object(self._bucket, self._path_for_key(cache_key))
+
+  def _path_for_key(self, cache_key):
+    return '{0}/{1}/{2}.tgz'.format(self._path, cache_key.id, cache_key.hash)
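
For reference, a minimal sketch of how the new class is constructed; this roughly mirrors what create_remote_cache in cache_setup.py does, and the paths, bucket and credential file below are illustrative only:

    from pants.cache.local_artifact_cache import LocalArtifactCache
    from pants.cache.s3_artifact_cache import S3ArtifactCache

    # A local cache backs the S3 cache; artifacts fetched from S3 are backfilled into it.
    local = LocalArtifactCache('/tmp/artifact_root', '/tmp/local_cache', compression=1)

    cache = S3ArtifactCache(
      config_file='/home/me/.pants/.s3credentials',  # Java-properties file with accessKey/secretKey
      profile_name=None,                             # or a boto profile name, per --s3-profile
      artifact_root='/tmp/artifact_root',
      s3_url='s3://my-pants-cache/artifacts',
      local=local)

Objects land in the bucket under <path>/<cache_key.id>/<cache_key.hash>.tgz, per _path_for_key above.
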
diff --git a/tests/python/pants_test/cache/BUILD b/tests/python/pants_test/cache/BUILD
index 2830c88b5c6..e253bbeb02e 100644
--- a/tests/python/pants_test/cache/BUILD
+++ b/tests/python/pants_test/cache/BUILD
@@ -100,6 +100,19 @@ python_tests(
   ]
 )
 
+python_tests(
+  name = 's3_artifact_cache',
+  sources = ['test_s3_artifact_cache.py'],
+  dependencies = [
+    '3rdparty/python:boto3',
+    '3rdparty/python:moto',
+    'src/python/pants/cache',
+    'src/python/pants/invalidation',
+    'src/python/pants/util:contextutil',
+    'src/python/pants/util:dirutil',
+  ]
+)
+
 python_tests(
   name = 'cache_cleanup_integration',
   sources = ['test_cache_cleanup_integration.py'],
diff --git a/tests/python/pants_test/cache/test_cache_setup.py b/tests/python/pants_test/cache/test_cache_setup.py
index 2a85331bbfa..1eee8409d07 100644
--- a/tests/python/pants_test/cache/test_cache_setup.py
+++ b/tests/python/pants_test/cache/test_cache_setup.py
@@ -16,6 +16,7 @@
 from pants.cache.local_artifact_cache import LocalArtifactCache
 from pants.cache.resolver import Resolver
 from pants.cache.restful_artifact_cache import RESTfulArtifactCache
+from pants.cache.s3_artifact_cache import S3ArtifactCache
 from pants.subsystem.subsystem import Subsystem
 from pants.task.task import Task
 from pants.util.contextutil import temporary_dir
@@ -164,6 +165,9 @@ def check(expected_type, spec, resolver=None):
     check(RESTfulArtifactCache, [cachedir, 'http://localhost/bar'])
     check(RESTfulArtifactCache, [cachedir, 'http://localhost/bar'], resolver=self.resolver)
 
+    check(S3ArtifactCache, ['s3://some-bucket/bar'])
+    check(S3ArtifactCache, [cachedir, 's3://some-bucket/bar'])
+
     with self.assertRaises(CacheSpecFormatError):
       mk_cache(['foo'])
 
@@ -179,6 +183,12 @@ def check(expected_type, spec, resolver=None):
     with self.assertRaises(TooManyCacheSpecsError):
       mk_cache([tmpdir, self.REMOTE_URI_1, self.REMOTE_URI_2])
 
+    with self.assertRaises(InvalidCacheSpecError):
+      mk_cache(['s3://some-bucket/bar|http://localhost/bar'])
+
+    with self.assertRaises(InvalidCacheSpecError):
+      mk_cache(['s3://some-bucket/bar|s3://some-other-bucket/foo'])
+
   def test_read_cache_available(self):
     self.assertEquals(None, self.cache_factory.read_cache_available())
 
diff --git a/tests/python/pants_test/cache/test_s3_artifact_cache.py b/tests/python/pants_test/cache/test_s3_artifact_cache.py
new file mode 100644
index 00000000000..49b978aea69
--- /dev/null
+++ b/tests/python/pants_test/cache/test_s3_artifact_cache.py
@@ -0,0 +1,198 @@
+# coding=utf-8
+# Copyright 2014 Pants project contributors (see CONTRIBUTORS.md).
+# Licensed under the Apache License, Version 2.0 (see LICENSE).
+
+from __future__ import (absolute_import, division, generators, nested_scopes, print_function,
+                        unicode_literals, with_statement)
+
+import os
+from contextlib import contextmanager
+
+import boto3
+import pytest
+from moto import mock_s3
+
+from pants.cache.artifact_cache import UnreadableArtifact
+from pants.cache.local_artifact_cache import LocalArtifactCache, TempLocalArtifactCache
+from pants.cache.s3_artifact_cache import S3ArtifactCache
+from pants.invalidation.build_invalidator import CacheKey
+from pants.util.contextutil import temporary_dir, temporary_file
+from pants.util.dirutil import safe_mkdir
+
+
+_TEST_CONTENT1 = b'fraggle'
+_TEST_CONTENT2 = b'gobo'
+
+_TEST_BUCKET = 'verst-test-bucket'
+
+
+@pytest.yield_fixture(scope="function")
+def local_artifact_root():
+  with temporary_dir() as artifact_root:
+    yield artifact_root
+
+
+@pytest.yield_fixture(scope="function")
+def local_cache(local_artifact_root):
+  with temporary_dir() as cache_root:
+    yield LocalArtifactCache(
+      local_artifact_root, cache_root, compression=1)
+
+
+@pytest.fixture(
+  scope="function",
+  params=[True, False],
+  ids=['temp-local-cache', 'local-cache']
+)
+def tmp_and_local_cache(request, local_artifact_root, local_cache):
+  if request.param:
+    return TempLocalArtifactCache(local_artifact_root, 0)
+  else:
+    return local_cache
+
+
+@pytest.yield_fixture(scope="function", autouse=True)
+def s3_fixture():
+  # Hold on to a single mock instance so stop() tears down the same mock we started.
+  mock = mock_s3()
+  mock.start()
+  try:
+    s3 = boto3.resource('s3')
+    s3.create_bucket(Bucket=_TEST_BUCKET)
+    yield s3
+  finally:
+    mock.stop()
+
+
+@pytest.fixture(scope="function")
+def s3_cache_instance(local_artifact_root, local_cache):
+  with temporary_dir() as config_root:
+    return S3ArtifactCache(
+      os.path.join(config_root, 'non-existant'), None,
+      local_artifact_root, 's3://' + _TEST_BUCKET, local_cache)
+
+
+@pytest.fixture(scope="function")
+def tmp_and_local_s3_cache_instance(
+    local_artifact_root, tmp_and_local_cache):
+  with temporary_dir() as config_root:
+    return S3ArtifactCache(
+      os.path.join(config_root, 'non-existant'), None,
+      local_artifact_root, 's3://' + _TEST_BUCKET, tmp_and_local_cache)
+
+
+@pytest.fixture()
+def cache_key():
+  return CacheKey('some_test_key', '1dfa0d08e47406038dda4ca5019c05c7977cb28c')
+
+
+@pytest.yield_fixture()
+def artifact_path(local_artifact_root):
+  with setup_test_file(local_artifact_root) as path:
+    yield path
+
+
+@pytest.yield_fixture(scope="function")
+def other_machine_cache():
+  with temporary_dir() as artifact_root:
+    with temporary_dir() as cache_root:
+      local_cache = LocalArtifactCache(
+        artifact_root, cache_root, compression=1)
+
+      with temporary_dir() as config_root:
+        yield S3ArtifactCache(
+          os.path.join(config_root, 'non-existant'), None,
+          artifact_root, 's3://' + _TEST_BUCKET, local_cache)
+
+
+@contextmanager
+def setup_test_file(parent):
+  with temporary_file(parent) as f:
+    # Write the file.
+    f.write(_TEST_CONTENT1)
+    path = f.name
+    f.close()
+    yield path
+
+
+def test_basic_combined_cache(
+    tmp_and_local_s3_cache_instance, cache_key, artifact_path):
+  instance = tmp_and_local_s3_cache_instance
+  assert not instance.has(cache_key)
+  assert not instance.use_cached_files(cache_key)
+
+  instance.insert(cache_key, [artifact_path])
+  assert instance.has(cache_key)
+
+  # Stomp it.
+  with open(artifact_path, 'w') as outfile:
+    outfile.write(_TEST_CONTENT2)
+
+  # Recover it from the cache.
+  assert instance.use_cached_files(cache_key)
+
+  # Check that it was recovered correctly.
+  with open(artifact_path, 'r') as infile:
+    content = infile.read()
+  assert content == _TEST_CONTENT1
+
+  # Delete it.
+  instance.delete(cache_key)
+  assert not instance.has(cache_key)
+
+
+def test_multi_machine_combined(
+    s3_cache_instance, other_machine_cache, cache_key,
+    local_cache, artifact_path):
+  # Insert it into S3 from the other machine.
+  other_machine_cache.insert(cache_key, [artifact_path])
+
+  # Our machine doesn't have it ...
+  assert not local_cache.has(cache_key)
+  assert not local_cache.use_cached_files(cache_key)
+
+  # But can use it.
+  assert s3_cache_instance.has(cache_key)
+  assert s3_cache_instance.use_cached_files(cache_key)
+
+  # And now we should have backfilled local:
+  assert local_cache.has(cache_key)
+  assert local_cache.use_cached_files(cache_key)
+
+
+def test_corrupted_cached_file_cleaned_up(
+    local_artifact_root,
+    s3_fixture, s3_cache_instance, other_machine_cache,
+    cache_key):
+  local_results_dir = os.path.join(local_artifact_root, 'a/sub/dir')
+  remote_results_dir = os.path.join(
+    other_machine_cache.artifact_root, 'a/sub/dir')
+  safe_mkdir(local_results_dir)
+  safe_mkdir(remote_results_dir)
+  assert os.path.exists(local_results_dir)
+  assert os.path.exists(remote_results_dir)
+
+  with setup_test_file(remote_results_dir) as path:
+    other_machine_cache.insert(cache_key, [path])
+
+  s3_object = s3_fixture.Object(_TEST_BUCKET, s3_cache_instance._path_for_key(cache_key))
+  s3_object.put(Body=b'not a valid tgz any more')
+
+  result = s3_cache_instance.use_cached_files(
+    cache_key, results_dir=local_results_dir)
+
+  assert isinstance(result, UnreadableArtifact)
+
+  # The local artifact should not have been stored, and the results_dir should exist,
+  # but be empty.
+  assert os.path.exists(local_results_dir)
+  assert len(os.listdir(local_results_dir)) == 0
+
+
+def test_delete_non_existant_key(s3_cache_instance):
+  # Should not raise an exception.
+  s3_cache_instance.delete(CacheKey('foo', 'bar'))
+
+
+def test_use_cached_files_non_existant_key(s3_cache_instance):
+  assert not s3_cache_instance.use_cached_files(CacheKey('foo', 'bar'))
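
With the new BUILD target above, these tests should be runnable in isolation with something like:

    ./pants test tests/python/pants_test/cache:s3_artifact_cache

moto's mock_s3 intercepts the boto3 calls in-process, so no real AWS credentials or network access should be needed.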