Skip to content

Commit

Permalink
Ensure that invalid vts have results_dir cleaned before passing to task (
Browse files Browse the repository at this point in the history
pantsbuild#4139)

Tasks that sometimes fail due to outside factors (download failures,
resolve issues, etc) often would call safe_mkdir(vt.results_dir, clean=True)
in order to wipe possibly truncated or crufty state.

But since vt.results_dir is a symlink, that replaced it with a real dir.
That ended up breaking caching, since:
    * Product pipeline was consuming from the vt.results_dir
    * But the artifacts in the cache were created using vt.current_results_dir,
       which no longer got the output.

If a target is invalid, the cache_manager will now wipe the results_dir prior
to passing to the task, removing the need for tasks to clean it themselves.
It also now checks to make sure that the results_dir is legal before marking
a VT valid and passing to the artifact_cache.

Also:
* Raise an Exception if relative_symlink would overwrite an existing real directory
* Copy over previous results *only* after checking for cache hits
    * Before this change every file in compile.zinc results_dir was copied twice
       per cache hit, and then thrown away. Now it only copies after a cache miss.

The majority of the change is added test coverage to cover the original
and added behavior.
  • Loading branch information
mateor authored and stuhood committed Dec 20, 2016
1 parent b2768c0 commit b1d7d08
Show file tree
Hide file tree
Showing 12 changed files with 375 additions and 114 deletions.
4 changes: 4 additions & 0 deletions src/python/pants/cache/local_artifact_cache.py
Expand Up @@ -67,6 +67,10 @@ def store_and_use_artifact(self, cache_key, src, results_dir=None):
tarball = self._store_tarball(cache_key, tmp.name)
artifact = self._artifact(tarball)

# NOTE(mateo): The two clean=True args passed in this method are likely safe, since the cache will by
# definition be dealing with unique results_dir, as opposed to the stable vt.results_dir (aka 'current').
# But if by chance it's passed the stable results_dir, safe_mkdir(clean=True) will silently convert it
# from a symlink to a real dir and cause mysterious 'Operation not permitted' errors until the workdir is cleaned.
if results_dir is not None:
safe_mkdir(results_dir, clean=True)

Expand Down
85 changes: 50 additions & 35 deletions src/python/pants/invalidation/cache_manager.py
Expand Up @@ -13,7 +13,7 @@
from pants.build_graph.build_graph import sort_targets
from pants.build_graph.target import Target
from pants.invalidation.build_invalidator import BuildInvalidator, CacheKeyGenerator
from pants.util.dirutil import relative_symlink, safe_mkdir
from pants.util.dirutil import relative_symlink, safe_mkdir, safe_rmtree


class VersionedTargetSet(object):
Expand All @@ -25,6 +25,9 @@ class VersionedTargetSet(object):
built together into a single artifact.
"""

class IllegalResultsDir(Exception):
"""Indicate a problem interacting with a versioned target results directory."""

@staticmethod
def from_versioned_targets(versioned_targets):
"""
Expand Down Expand Up @@ -83,7 +86,7 @@ def has_results_dir(self):

@property
def has_previous_results_dir(self):
return self._previous_results_dir is not None
return self._previous_results_dir is not None and os.path.isdir(self._previous_results_dir)

@property
def results_dir(self):
Expand Down Expand Up @@ -113,12 +116,29 @@ def previous_results_dir(self):
TODO: Exposing old results is a bit of an abstraction leak, because ill-behaved Tasks could
mutate them.
"""
if self._previous_results_dir is None:
if not self.has_previous_results_dir:
raise ValueError('There is no previous_results_dir for: {}'.format(self))
return self._previous_results_dir

def ensure_legal(self):
"""Return True as long as the state does not break any internal contracts."""
# Do our best to provide complete feedback, it's easy to imagine the frustration of flipping between error states.
if self._results_dir:
errors = ''
if not os.path.islink(self._results_dir):
errors += '\nThe results_dir is no longer a symlink:\n\t* {}'.format(self._results_dir)
if not os.path.isdir(self._current_results_dir):
errors += '\nThe current_results_dir directory was not found\n\t* {}'.format(self._current_results_dir)
if errors:
raise self.IllegalResultsDir(
'\nThe results_dirs state should not be manually cleaned or recreated by tasks.\n{}'.format(errors)
)
return True

def live_dirs(self):
"""Yields directories that must exist for this VersionedTarget to function."""
# The only caller of this function is the workdir cleaning pipeline. It is not clear that the previous_results_dir
# should be returned for that purpose. And, by the time this is called, the contents have already been copied.
if self.has_results_dir:
yield self.results_dir
yield self.current_results_dir
Expand Down Expand Up @@ -168,42 +188,35 @@ def _results_dir_path(self, root_dir, key, stable):
self._STABLE_DIR_NAME if stable else sha1(key.hash).hexdigest()[:12]
)

def create_results_dir(self, root_dir, allow_incremental):
"""Ensures that a results_dir exists under the given root_dir for this versioned target.
def create_results_dir(self, root_dir):
"""Ensure that the empty results directory and a stable symlink exist for these versioned targets."""
self._current_results_dir = self._results_dir_path(root_dir, self.cache_key, stable=False)
self._results_dir = self._results_dir_path(root_dir, self.cache_key, stable=True)

If incremental=True, attempts to clone the results_dir for the previous version of this target
to the new results dir. Otherwise, simply ensures that the results dir exists.
"""
# Generate unique and stable directory paths for this cache key.
current_dir = self._results_dir_path(root_dir, self.cache_key, stable=False)
self._current_results_dir = current_dir
stable_dir = self._results_dir_path(root_dir, self.cache_key, stable=True)
self._results_dir = stable_dir
if self.valid:
# If the target is valid, both directories can be assumed to exist.
return

# Clone from the previous results_dir if incremental, or initialize.
previous_dir = self._use_previous_dir(allow_incremental, root_dir, current_dir)
if previous_dir is not None:
self.is_incremental = True
self._previous_results_dir = previous_dir
shutil.copytree(previous_dir, current_dir)
else:
safe_mkdir(current_dir)
if not self.valid:
# Clean the workspace for invalid vts.
safe_mkdir(self._current_results_dir, clean=True)
relative_symlink(self._current_results_dir, self._results_dir)
self.ensure_legal()

# Finally, create the stable symlink.
relative_symlink(current_dir, stable_dir)
def copy_previous_results(self, root_dir):
"""Use the latest valid results_dir as the starting contents of the current results_dir.
def _use_previous_dir(self, allow_incremental, root_dir, current_dir):
if not allow_incremental or not self.previous_cache_key:
# Not incremental.
return None
previous_dir = self._results_dir_path(root_dir, self.previous_cache_key, stable=False)
if not os.path.isdir(previous_dir) or os.path.isdir(current_dir):
# Could be useful, but no previous results are present.
Should be called after the cache is checked, since previous_results are not useful if there is a cached artifact.
"""
# TODO(mateo): An immediate followup removes the root_dir param, it is identical to the task.workdir.
# TODO(mateo): This should probably be managed by the task, which manages the rest of the incremental support.
if not self.previous_cache_key:
return None
return previous_dir
previous_path = self._results_dir_path(root_dir, self.previous_cache_key, stable=False)
if os.path.isdir(previous_path):
self.is_incremental = True
safe_rmtree(self._current_results_dir)
shutil.copytree(previous_path, self._current_results_dir)
safe_mkdir(self._current_results_dir)
relative_symlink(self._current_results_dir, self.results_dir)
# Set the self._previous last, so that it is only True after the copy completed.
self._previous_results_dir = previous_path

def __repr__(self):
return 'VT({}, {})'.format(self.target.id, 'valid' if self.valid else 'invalid')
Expand Down Expand Up @@ -264,11 +277,13 @@ def __init__(self,
def update(self, vts):
"""Mark a changed or invalidated VersionedTargetSet as successfully processed."""
for vt in vts.versioned_targets:
vt.ensure_legal()
if not vt.valid:
self._invalidator.update(vt.cache_key)
vt.valid = True
self._artifact_write_callback(vt)
if not vts.valid:
vts.ensure_legal()
self._invalidator.update(vts.cache_key)
vts.valid = True
self._artifact_write_callback(vts)
Expand Down
13 changes: 7 additions & 6 deletions src/python/pants/task/task.py
Expand Up @@ -418,13 +418,17 @@ def invalidated(self,
invalidation_report.add_vts(cache_manager, vts.targets, vts.cache_key, vts.valid,
phase='pre-check')

# Cache has been checked to create the full list of invalid VTs. Only copy previous_results for this subset of VTs.
for vts in invalidation_check.invalid_vts:
if self.incremental:
vts.copy_previous_results(self.workdir)

# Yield the result, and then mark the targets as up to date.
yield invalidation_check

if invalidation_report:
for vts in invalidation_check.all_vts:
invalidation_report.add_vts(cache_manager, vts.targets, vts.cache_key, vts.valid,
phase='post-check')
invalidation_report.add_vts(cache_manager, vts.targets, vts.cache_key, vts.valid, phase='post-check')

for vt in invalidation_check.invalid_vts:
vt.update()
Expand Down Expand Up @@ -464,7 +468,7 @@ def _maybe_create_results_dirs(self, vts):
"""If `cache_target_dirs`, create results_dirs for the given versioned targets."""
if self.create_target_dirs:
for vt in vts:
vt.create_results_dir(self.workdir, allow_incremental=self.incremental)
vt.create_results_dir(self.workdir)

def check_artifact_cache_for(self, invalidation_check):
"""Decides which VTS to check the artifact cache for.
Expand Down Expand Up @@ -496,11 +500,8 @@ def do_check_artifact_cache(self, vts, post_process_cached_vts=None):
read_cache = self._cache_factory.get_read_cache()
items = [(read_cache, vt.cache_key, vt.current_results_dir if self.cache_target_dirs else None)
for vt in vts]

res = self.context.subproc_map(call_use_cached_files, items)

self._maybe_create_results_dirs(vts)

cached_vts = []
uncached_vts = []
uncached_causes = []
Expand Down
2 changes: 1 addition & 1 deletion src/python/pants/task/testrunner_task_mixin.py
Expand Up @@ -24,7 +24,7 @@ def register_options(cls, register):
super(TestRunnerTaskMixin, cls).register_options(register)
register('--skip', type=bool, help='Skip running tests.')
register('--timeouts', type=bool, default=True,
help='Enable test target timeouts. If timeouts are enabled then tests with a'
help='Enable test target timeouts. If timeouts are enabled then tests with a '
'timeout= parameter set on their target will time out after the given number of '
'seconds if not completed. If no timeout is set, then either the default timeout '
'is used or no timeout is configured. In the current implementation, all the '
Expand Down
4 changes: 4 additions & 0 deletions src/python/pants/util/dirutil.py
Expand Up @@ -290,6 +290,10 @@ def relative_symlink(source_path, link_path):
raise ValueError("Path for link:{} must be absolute".format(link_path))
if source_path == link_path:
raise ValueError("Path for link is identical to source:{}".format(source_path))
# The failure state below had a long life as an uncaught error. No behavior was changed here, it just adds a catch.
# Raising an exception does differ from absolute_symlink, which takes the liberty of deleting existing directories.
if os.path.isdir(link_path) and not os.path.islink(link_path):
raise ValueError("Path for link would overwrite an existing directory: {}".format(link_path))
try:
if os.path.lexists(link_path):
os.unlink(link_path)
Expand Down
3 changes: 2 additions & 1 deletion tests/python/pants_test/cache/BUILD
Expand Up @@ -46,7 +46,6 @@ python_tests(
'src/python/pants/base:payload',
'src/python/pants/build_graph:build_graph',
'src/python/pants/cache:cache',
'src/python/pants/util:dirutil',
'tests/python/pants_test/tasks:task_test_base',
],
)
Expand All @@ -67,6 +66,8 @@ python_library(
sources = ['cache_server.py'],
dependencies = [
'3rdparty/python:six',
'src/python/pants/util:contextutil',
'src/python/pants/util:dirutil',
'tests/python/pants_test/testutils:file_test_util',
]
)
Expand Down
6 changes: 2 additions & 4 deletions tests/python/pants_test/cache/test_caching.py
Expand Up @@ -11,7 +11,6 @@
from pants.build_graph.target import Target
from pants.cache.cache_setup import CacheSetup
from pants.task.task import Task
from pants.util.dirutil import safe_rmtree
from pants_test.tasks.task_test_base import TaskTestBase


Expand All @@ -25,7 +24,6 @@ def __init__(self, address, source, *args, **kwargs):


class DummyTask(Task):
"""A task that appends the content of a DummyLibrary's source into its results_dir."""
options_scope = 'dummy'

@property
Expand Down Expand Up @@ -75,8 +73,8 @@ def test_cache_read_from(self):
# Executing the task for the first time the vt is expected to be in the invalid_vts list
self.assertGreater(len(invalid_vts), 0)
first_vt = invalid_vts[0]
# Delete .pants.d
safe_rmtree(self.task._workdir)
# Mark the target invalid.
self.target.mark_invalidation_hash_dirty()
all_vts2, invalid_vts2 = self.task.execute()
# Check that running the task a second time results in a valid vt,
# implying the artifact cache was hit.
Expand Down
1 change: 1 addition & 0 deletions tests/python/pants_test/invalidation/BUILD
Expand Up @@ -7,6 +7,7 @@ python_tests(
sources = ['test_cache_manager.py'],
dependencies = [
'src/python/pants/invalidation',
'src/python/pants/util:dirutil',
'tests/python/pants_test/testutils:mock_logger',
'tests/python/pants_test/tasks:task_test_base',
]
Expand Down

0 comments on commit b1d7d08

Please sign in to comment.