Add a 'current' symlink to the task-versioned prefix of the workdir. #4220

Merged
merged 4 commits on Feb 1, 2017
Changes from 3 commits
@@ -82,7 +82,8 @@ def test_go_thrift_gen_simple(self):
self.assert_success(pants_run)

# Fetch the hash for task impl version.
go_thrift_contents = os.listdir(os.path.join(workdir, 'gen', 'go-thrift'))
go_thrift_contents = [p for p in os.listdir(os.path.join(workdir, 'gen', 'go-thrift'))
if p != 'current'] # Ignore the 'current' symlink.
self.assertEqual(len(go_thrift_contents), 1)
hash_dir = go_thrift_contents[0]

54 changes: 28 additions & 26 deletions src/python/pants/invalidation/cache_manager.py
@@ -13,7 +13,7 @@
from pants.build_graph.build_graph import sort_targets
from pants.build_graph.target import Target
from pants.invalidation.build_invalidator import BuildInvalidator, CacheKeyGenerator
from pants.util.dirutil import relative_symlink, safe_mkdir, safe_rmtree
from pants.util.dirutil import relative_symlink, safe_delete, safe_mkdir, safe_rmtree


class VersionedTargetSet(object):
@@ -157,8 +157,6 @@ class VersionedTarget(VersionedTargetSet):
:API: public
"""

_STABLE_DIR_NAME = 'current'

def __init__(self, cache_manager, target, cache_key):
"""
:API: public
@@ -172,26 +170,10 @@ def __init__(self, cache_manager, target, cache_key):
super(VersionedTarget, self).__init__(cache_manager, [self])
self.id = target.id

def _results_dir_path(self, root_dir, key, stable):
"""Return a results directory path for the given key.

:param key: A CacheKey to generate an id for.
:param stable: True to use a stable subdirectory, false to use a portion of the cache key to
generate a path unique to the key.
"""
task_version = self._cache_manager.task_version
# TODO: Shorten cache_key hashes in general?
return os.path.join(
root_dir,
sha1(task_version).hexdigest()[:12],
key.id,
self._STABLE_DIR_NAME if stable else sha1(key.hash).hexdigest()[:12]
)

def create_results_dir(self, root_dir):
def create_results_dir(self):
"""Ensure that the empty results directory and a stable symlink exist for these versioned targets."""
self._current_results_dir = self._results_dir_path(root_dir, self.cache_key, stable=False)
self._results_dir = self._results_dir_path(root_dir, self.cache_key, stable=True)
self._current_results_dir = self._cache_manager.results_dir_path(self.cache_key, stable=False)
self._results_dir = self._cache_manager.results_dir_path(self.cache_key, stable=True)

if not self.valid:
# Clean the workspace for invalid vts.
@@ -208,7 +190,7 @@ def copy_previous_results(self, root_dir):
# TODO(mateo): This should probably be managed by the task, which manages the rest of the incremental support.
if not self.previous_cache_key:
return None
previous_path = self._results_dir_path(root_dir, self.previous_cache_key, stable=False)
previous_path = self._cache_manager.results_dir_path(self.previous_cache_key, stable=False)
if os.path.isdir(previous_path):
self.is_incremental = True
safe_rmtree(self._current_results_dir)
@@ -253,7 +235,10 @@ class InvalidationCacheManager(object):
class CacheValidationError(Exception):
"""Indicates a problem accessing the cache."""

_STABLE_DIR_NAME = 'current'

def __init__(self,
results_dir_root,
cache_key_generator,
build_invalidator_dir,
invalidate_dependents,
@@ -274,6 +259,13 @@ def __init__(self,
self._artifact_write_callback = artifact_write_callback
self.invalidation_report = invalidation_report

# Create the task-versioned prefix of the results dir, and a stable symlink to it (useful when debugging).
self._results_dir_prefix = os.path.join(results_dir_root, sha1(self._task_version).hexdigest()[:12])
safe_mkdir(self._results_dir_prefix)
stable_prefix = os.path.join(results_dir_root, self._STABLE_DIR_NAME)
safe_delete(stable_prefix)
Member:
Obviously doesn't hurt, but I think relative_symlink will gracefully repoint the existing symlink without requiring the delete.

We had to be more anal about it in create_results_dirs because we explicitly wanted to catch places where the symlink had been replaced by a real dir.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, and I responded to your earlier question here: c509d80#r99345117, in case that is hard to find otherwise.
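
For context on the graceful-repoint behavior discussed above, here is a minimal sketch of that idea. It is not the actual pants.util.dirutil.relative_symlink implementation; the function name and error message are illustrative only:

import os

def repoint_relative_symlink(source_path, link_path):
    # Sketch only: repoint an existing symlink in place, but refuse to
    # clobber a real directory (the case create_results_dir wants to
    # catch loudly, per the comment above).
    if os.path.islink(link_path):
        os.unlink(link_path)  # a stale or dangling link is fine to replace
    elif os.path.isdir(link_path):
        raise ValueError('Path for link {} would overwrite an existing directory.'.format(link_path))
    rel_source = os.path.relpath(source_path, os.path.dirname(link_path))
    os.symlink(rel_source, link_path)

Either way, the end state is the same: the stable 'current' link always points at the prefix for the current task version.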

relative_symlink(self._results_dir_prefix, stable_prefix)

def update(self, vts):
"""Mark a changed or invalidated VersionedTargetSet as successfully processed."""
for vt in vts.versioned_targets:
@@ -316,9 +308,19 @@ def check(self,
def task_name(self):
return self._task_name

@property
def task_version(self):
return self._task_version
def results_dir_path(self, key, stable):
"""Return a results directory path for the given key.

:param key: A CacheKey to generate an id for.
:param stable: True to use a stable subdirectory, false to use a portion of the cache key to
generate a path unique to the key.
"""
# TODO: Shorten cache_key hashes in general?
return os.path.join(
self._results_dir_prefix,
key.id,
self._STABLE_DIR_NAME if stable else sha1(key.hash).hexdigest()[:12]
)

def wrap_targets(self, targets, topological_order=False):
"""Wrap targets and their computed cache keys in VersionedTargets.
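To make the layout produced by the results_dir_path method above easier to picture, here is a rough sketch. The workdir root, key id, and key hash are hypothetical, and .encode('utf-8') is only there so the snippet runs under Python 3 (the pants code of this era is Python 2 and hashes the strings directly):

import os
from hashlib import sha1

task_version = '1'                            # e.g. Task.implementation_version_str()
results_dir_root = '.pants.d/gen/go-thrift'   # hypothetical task workdir
results_dir_prefix = os.path.join(
    results_dir_root, sha1(task_version.encode('utf-8')).hexdigest()[:12])

key_id = 'src.thrift.example'                 # hypothetical CacheKey.id
key_hash = 'deadbeef'                         # hypothetical CacheKey.hash

stable_dir = os.path.join(results_dir_prefix, key_id, 'current')
unique_dir = os.path.join(
    results_dir_prefix, key_id, sha1(key_hash.encode('utf-8')).hexdigest()[:12])

# With this change there is also a top-level symlink next to the sha1 prefix:
#   .pants.d/gen/go-thrift/current -> .pants.d/gen/go-thrift/<task-version-hash>
print(stable_dir)
print(unique_dir)

As the comment in __init__ notes, the top-level 'current' link is mainly a debugging convenience, since the twelve-character sha1 prefix is otherwise opaque.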
33 changes: 10 additions & 23 deletions src/python/pants/task/task.py
@@ -261,26 +261,6 @@ def invalidate(self):
"""Invalidates all targets for this task."""
BuildInvalidator(self._build_invalidator_dir).force_invalidate_all()

def create_cache_manager(self, invalidate_dependents, fingerprint_strategy=None):
"""Creates a cache manager that can be used to invalidate targets on behalf of this task.

Use this if you need to check for invalid targets but can't use the contextmanager created by
invalidated(), e.g., because you don't want to mark the targets as valid when done.

invalidate_dependents: If True then any targets depending on changed targets are invalidated.
fingerprint_strategy: A FingerprintStrategy instance, which can do per task, finer grained
fingerprinting of a given Target.
"""

return InvalidationCacheManager(self._cache_key_generator,
self._build_invalidator_dir,
invalidate_dependents,
fingerprint_strategy=fingerprint_strategy,
invalidation_report=self.context.invalidation_report,
task_name=type(self).__name__,
task_version=self.implementation_version_str(),
artifact_write_callback=self.maybe_write_artifact)

@property
def create_target_dirs(self):
"""Whether to create a results_dir per VersionedTarget in the workdir of the Task.
@@ -357,8 +337,15 @@ def invalidated(self,
"""

fingerprint_strategy = fingerprint_strategy or TaskIdentityFingerprintStrategy(self)
cache_manager = self.create_cache_manager(invalidate_dependents,
fingerprint_strategy=fingerprint_strategy)
cache_manager = InvalidationCacheManager(self.workdir,
self._cache_key_generator,
self._build_invalidator_dir,
invalidate_dependents,
fingerprint_strategy=fingerprint_strategy,
invalidation_report=self.context.invalidation_report,
task_name=type(self).__name__,
task_version=self.implementation_version_str(),
artifact_write_callback=self.maybe_write_artifact)

invalidation_check = cache_manager.check(targets, topological_order=topological_order)

@@ -452,7 +439,7 @@ def _maybe_create_results_dirs(self, vts):
"""If `cache_target_dirs`, create results_dirs for the given versioned targets."""
if self.create_target_dirs:
for vt in vts:
vt.create_results_dir(self.workdir)
vt.create_results_dir()

def check_artifact_cache_for(self, invalidation_check):
"""Decides which VTS to check the artifact cache for.
@@ -144,13 +144,14 @@ class Main {}"""))

root = os.path.join(workdir, 'compile', 'zinc')

versioned_root = os.path.join(root, os.listdir(root)[0])
self.assertEqual(len(os.listdir(root)), 1, 'Expected 1 task version.')
task_versions = [p for p in os.listdir(root) if p != 'current']
self.assertEqual(len(task_versions), 1, 'Expected 1 task version.')
versioned_root = os.path.join(root, task_versions[0])

target_root = os.path.join(root, os.listdir(root)[0])
self.assertEqual(len(os.listdir(target_root)), 1, 'Expected 1 target.')
per_target_dirs = os.listdir(versioned_root)
self.assertEqual(len(per_target_dirs), 1, 'Expected 1 target.')
target_workdir_root = os.path.join(versioned_root, per_target_dirs[0])

target_workdir_root = os.path.join(versioned_root, os.listdir(versioned_root)[0])
target_workdirs = os.listdir(target_workdir_root)
self.assertEqual(len(target_workdirs), 3, 'Expected 3 workdirs (current, and two versioned).')
self.assertIn('current', target_workdirs)
11 changes: 6 additions & 5 deletions tests/python/pants_test/invalidation/test_cache_manager.py
@@ -36,8 +36,9 @@ def setUp(self):
super(InvalidationCacheManagerTest, self).setUp()
self._dir = tempfile.mkdtemp()
self.cache_manager = InvalidationCacheManager(
results_dir_root=os.path.join(self._dir, 'results'),
cache_key_generator=CacheKeyGenerator(),
build_invalidator_dir=self._dir,
build_invalidator_dir=os.path.join(self._dir, 'build_invalidator'),
invalidate_dependents=True,
)

@@ -56,7 +57,7 @@ def make_vt(self, invalid=False):
return vt

def task_execute(self, vt):
vt.create_results_dir(self._dir)
vt.create_results_dir()
task_output = os.path.join(vt.results_dir, 'a_file')
self.create_file(task_output, 'foo')

@@ -94,7 +95,7 @@ def test_invalid_vts_are_cleaned(self):
vt.force_invalidate()
self.assertFalse(self.is_empty(vt.results_dir))

vt.create_results_dir(self._dir)
vt.create_results_dir()
self.assertTrue(self.has_symlinked_result_dir(vt))
self.assertTrue(self.is_empty(vt.results_dir))

@@ -104,7 +105,7 @@ def test_valid_vts_are_not_cleaned(self):
self.assertFalse(self.is_empty(vt.results_dir))
file_names = os.listdir(vt.results_dir)

vt.create_results_dir(self._dir)
vt.create_results_dir()
self.assertFalse(self.is_empty(vt.results_dir))
self.assertTrue(self.has_symlinked_result_dir(vt))

@@ -145,7 +146,7 @@ def test_exception_for_invalid_vt_result_dirs(self):
# This is only caught here if the VT is still invalid for some reason; otherwise it's caught by the update() method.
vt.force_invalidate()
with self.assertRaisesRegexp(ValueError, r'Path for link.*overwrite an existing directory*'):
vt.create_results_dir(self._dir)
vt.create_results_dir()

def test_raises_for_clobbered_symlink(self):
vt = self.make_vt()