
Refactoring of scala_compile.py and related code.

It's much more readable now.

Auditors: markcc@foursquare.com.
1 parent 5087195 · commit 355d12ca572d042d67e18e78cbfbf5f3206123dc
Benjy committed Feb 14, 2013
@@ -17,7 +17,6 @@
import httplib
import os
import shutil
-import traceback
import urlparse

from twitter.common.contextutil import open_tar, temporary_file
@@ -216,34 +215,44 @@ def has(self, cache_key):
    return response is not None

  def use_cached_files(self, cache_key):
-    path = self._path_for_key(cache_key)
-    response = self._request('GET', path)
-    if response is None:
-      return False
-    expected_size = int(response.getheader('content-length', -1))
-    if expected_size == -1:
-      raise Exception, 'No content-length header in HTTP response'
-    read_size = 4 * 1024 * 1024  # 4 MB
-    done = False
-    if self.context:
-      self.context.log.info('Reading %d bytes' % expected_size)
-    with temporary_file() as outfile:
-      total_bytes = 0
-      while not done:
-        data = response.read(read_size)
-        outfile.write(data)
-        if len(data) < read_size:
-          done = True
-        total_bytes += len(data)
-        if self.context:
-          self.context.log.debug('Read %d bytes' % total_bytes)
-      outfile.close()
-      if total_bytes != expected_size:
-        raise Exception, 'Read only %d bytes from %d expected' % (total_bytes, expected_size)
-      mode = 'r:bz2' if self.compress else 'r'
-      with open_tar(outfile.name, mode) as tarfile:
-        tarfile.extractall(self.artifact_root)
-    return True
+    # This implementation fetches the appropriate tarball and extracts it.
+    try:
+      # Send an HTTP request for the tarball.
+      path = self._path_for_key(cache_key)
+      response = self._request('GET', path)
+      if response is None:
+        return False
+      expected_size = int(response.getheader('content-length', -1))
+      if expected_size == -1:
+        raise Exception, 'No content-length header in HTTP response'
+      read_size = 4 * 1024 * 1024  # 4 MB
+      done = False
+      if self.context:
+        self.context.log.info('Reading %d bytes' % expected_size)
+      # Read the data in a loop.
+      with temporary_file() as outfile:
+        total_bytes = 0
+        while not done:
+          data = response.read(read_size)
+          outfile.write(data)
+          if len(data) < read_size:
+            done = True
+          total_bytes += len(data)
+          if self.context:
+            self.context.log.debug('Read %d bytes' % total_bytes)
+        outfile.close()
+        # Check the size.
+        if total_bytes != expected_size:
+          raise Exception, 'Read only %d bytes from %d expected' % (total_bytes, expected_size)
+        # Extract the tarfile.
+        mode = 'r:bz2' if self.compress else 'r'
+        with open_tar(outfile.name, mode) as tarfile:
+          tarfile.extractall(self.artifact_root)
+      return True
+    except Exception, e:
+      if self.context:
+        self.context.log.warn('Error while reading from artifact cache: %s' % e)
+      return False

  def delete(self, cache_key):
    path = self._path_for_key(cache_key)
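
The new try/except means a corrupt or unreachable cache now degrades to a cache miss instead of failing the build. For orientation, a minimal sketch of the resulting call pattern (not from this commit; build_with_cache, build_fn and artifact_paths are hypothetical names standing in for whatever the caller has):

  def build_with_cache(cache, cache_key, build_fn, artifact_paths):
    """Fetch artifacts from the cache if possible; otherwise build and insert them."""
    # use_cached_files() returns True on a hit (after extracting the tarball under
    # artifact_root) and False on a miss or a read error, so the caller needs no
    # special error handling here.
    if cache.use_cached_files(cache_key):
      return
    build_fn()  # Cache miss: build the artifacts locally.
    cache.insert(cache_key, artifact_paths)  # Populate the cache for the next run.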
@@ -19,9 +19,10 @@
import os
import sys

+from twitter.common.collections.orderedset import OrderedSet
from twitter.pants.base.artifact_cache import create_artifact_cache
from twitter.pants.base.build_invalidator import CacheKeyGenerator
-from twitter.pants.tasks.cache_manager import CacheManager
+from twitter.pants.tasks.cache_manager import CacheManager, InvalidationCheck


class TaskError(Exception):
@@ -118,8 +119,41 @@ def invalidated(self,
    one VersionedTargetSet per target. It is up to the caller to do the right thing with whatever partitioning
    it asks for.

-    Yields an InvalidationCheck object reflecting the (partitioned) targets. If no exceptions are
-    thrown by work in the block, the cache is updated for the targets.
+    Yields an InvalidationCheck object reflecting the (partitioned) targets.
+
+    If no exceptions are thrown by work in the block, the build cache is updated for the targets.
+    """
+    with self.invalidated_with_artifact_cache_check(targets, only_buildfiles,
+                                                    invalidate_dependents, partition_size_hint) as check:
+      yield check[0]
+
+
+  @contextmanager
+  def invalidated_with_artifact_cache_check(self,
+                                            targets,
+                                            only_buildfiles = False,
+                                            invalidate_dependents = False,
+                                            partition_size_hint = sys.maxint):
+    """Checks targets for invalidation, first checking the artifact cache.
+    Subclasses call this to figure out what to work on.
+
+    targets: The targets to check for changes.
+
+    only_buildfiles: If True, then only the target's BUILD files are checked for changes, not its sources.
+
+    invalidate_dependents: If True then any targets depending on changed targets are invalidated.
+
+    partition_size_hint: Each VersionedTargetSet in the yielded list will represent targets containing roughly
+      this number of source files, if possible. Set to sys.maxint for a single VersionedTargetSet. Set to 0 for
+      one VersionedTargetSet per target. It is up to the caller to do the right thing with whatever partitioning
+      it asks for.
+
+    Yields a pair of (invalidation_check, cached_vts) where invalidation_check is an InvalidationCheck object
+    reflecting the (partitioned) targets, and cached_vts is a list of VersionedTargets that were satisfied
+    from the artifact cache.
+
+    If no exceptions are thrown by work in the block, the build cache is updated for the targets.
+    Note: the artifact cache is not updated, that must be done manually.
    """
    extra_data = []
    extra_data.append(self.invalidate_for())
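
To make the new contract concrete, here is a hedged sketch (not from this commit) of how a task subclass might drive it; compile_partition is a hypothetical build step:

  with self.invalidated_with_artifact_cache_check(targets) as (invalidation_check, cached_vts):
    # cached_vts were already populated from the artifact cache; build only the rest.
    for vts in invalidation_check.invalid_vts_partitioned:
      build_artifacts = compile_partition(vts)  # Hypothetical build step.
      # Per the docstring above, the artifact cache must be updated manually:
      self.update_artifact_cache(vts, build_artifacts)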
@@ -133,12 +167,26 @@ def invalidated(self,
    cache_manager = CacheManager(self._cache_key_generator, self._build_invalidator_dir,
                                 invalidate_dependents, extra_data, only_externaldeps=only_buildfiles)

-    invalidation_check = cache_manager.check(targets, partition_size_hint)
-
-    num_invalid_partitions = len(invalidation_check.invalid_vts_partitioned)
+    unpartitioned_invalidation_check = cache_manager.check(targets)
+    cached_vts = []
+    if self._artifact_cache and self.context.options.read_from_artifact_cache:
+      for vt in unpartitioned_invalidation_check.invalid_vts:
+        if self._artifact_cache.use_cached_files(vt.cache_key):
+          self.context.log.info('Using cached artifacts for %s' % vt.targets)
+          vt.update()
+          cached_vts.append(vt)
+        else:
+          self.context.log.info('No cached artifacts for %s' % vt.targets)
+
+    invalid_vts = list(OrderedSet(unpartitioned_invalidation_check.invalid_vts) - set(cached_vts))
+    # Now that we've checked the cache, partition whatever is still invalid.
+    partitioned_invalidation_check = \
+      InvalidationCheck(unpartitioned_invalidation_check.all_vts, invalid_vts, partition_size_hint)
+
+    num_invalid_partitions = len(partitioned_invalidation_check.invalid_vts_partitioned)
    num_invalid_targets = 0
    num_invalid_sources = 0
-    for vt in invalidation_check.invalid_vts:
+    for vt in partitioned_invalidation_check.invalid_vts:
      if not vt.valid:
        num_invalid_targets += len(vt.targets)
        num_invalid_sources += vt.cache_key.num_sources
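
The OrderedSet subtraction above is what keeps the surviving invalid targets in their original order; a plain set difference would not guarantee that. A small standalone illustration (values made up):

  from twitter.common.collections.orderedset import OrderedSet

  invalid = OrderedSet(['a', 'b', 'c', 'd'])  # Invalid targets, in order.
  cached = set(['b', 'd'])                    # Targets satisfied from the artifact cache.
  still_invalid = list(invalid - cached)      # ['a', 'c'] -- original order preserved.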
@@ -149,51 +197,22 @@ def invalidated(self,
                            (num_invalid_sources, num_invalid_targets, num_invalid_partitions))

    # Yield the result, and then update the cache.
-    yield invalidation_check
+    yield partitioned_invalidation_check, cached_vts
    if not self.dry_run:
-      for vt in invalidation_check.invalid_vts:
+      for vt in partitioned_invalidation_check.invalid_vts:
        vt.update()  # In case the caller doesn't update.

-  @contextmanager
-  def check_artifact_cache(self, versioned_targets, build_artifacts):
-    """See if we have required artifacts in the cache.
-
-    If we do (and reading from the artifact cache is enabled) then we copy the artifacts from the cache.
-    If we don't (and writing to the artifact cache is enabled) then we will copy the artifacts into
-    the cache when the context is exited.
-
-    Therefore the usage idiom is as follows:
-
-    with self.check_artifact_cache(...) as in_cache:
-      if not in_cache:
-        ... build the necessary artifacts ...
-
-    versioned_targets: a VersionedTargetSet representing a specific version of a set of targets.
-
-    build_artifacts: a list of paths to which the artifacts will be written. These must be under pants_workdir.
+  def update_artifact_cache(self, vts, build_artifacts):
+    """Write to the artifact cache, if we're configured to.

-    Returns False if the caller must build the artifacts, True otherwise.
+    vts - a single VersionedTargetSet.
+    build_artifacts - the paths to the artifacts for the VersionedTargetSet.
    """
-    if self._artifact_cache is None:
-      yield False
-      return
-
-    artifact_key = versioned_targets.cache_key
-    targets = versioned_targets.targets
-    using_cached = False
-    if self.context.options.read_from_artifact_cache:
-      if self._artifact_cache.use_cached_files(artifact_key):
-        self.context.log.info('Using cached artifacts for %s' % targets)
-        using_cached = True
-      else:
-        self.context.log.info('No cached artifacts for %s' % targets)
-
-    yield using_cached
-
-    if not using_cached and self.context.options.write_to_artifact_cache:
-      if self.context.options.verify_artifact_cache:
-        pass  # TODO: verification logic
-      self.context.log.info('Caching artifacts for %s' % str(targets))
-      self._artifact_cache.insert(artifact_key, build_artifacts)
+    if self._artifact_cache and self.context.options.write_to_artifact_cache:
+      if self.context.options.verify_artifact_cache:
+        pass  # TODO: Verify that the artifact we just built is identical to the cached one.
+      self.context.log.info('Caching artifacts for %s' % str(vts.targets))
+      self._artifact_cache.insert(vts.cache_key, build_artifacts)


__all__ = (
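
The net effect of this last hunk: the old read-and-write context manager is gone, reads now happen up front in invalidated_with_artifact_cache_check, and writes become an explicit call. A hedged before/after sketch (build is a hypothetical build step):

  # Old idiom (removed above): read check and deferred write in one context manager.
  with self.check_artifact_cache(vts, build_artifacts) as in_cache:
    if not in_cache:
      build(vts)

  # New idiom: the read already happened in invalidated_with_artifact_cache_check,
  # so the caller builds and then writes explicitly.
  build(vts)
  self.update_artifact_cache(vts, build_artifacts)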