Skip to content

Commit

Permalink
More granular read-write lock requests in the run() and reproduce() m…
Browse files Browse the repository at this point in the history
…ethods of the Stage class (#5815)

* Remove rwlock decorator for the run() method of the Stage class.

Instead of pre-emptively grabbing read locks on the dependencies
and write locks on the outputs when calling the run() method,
delegate the locking mechanism to other methods.
This results in more selective locking that is less likely
to cause a LockError when executing steps in parallel.

* Remove rwlocked decorator from reproduce() method of Stage class.

* Reinstate read-lock (not write) for run method of Stage class

* fix bugs and typos

Co-authored-by: Ruslan Kuprieiev <kupruser@gmail.com>
  • Loading branch information
maximerischard and efiop committed Aug 9, 2021
1 parent d97a19e commit 63b710b
Showing 1 changed file with 18 additions and 4 deletions.
22 changes: 18 additions & 4 deletions dvc/stage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,7 @@ def changed_deps(self):

return self._changed_deps()

@rwlocked(read=["deps"])
def _changed_deps(self):
for dep in self.deps:
status = dep.status()
Expand All @@ -328,6 +329,7 @@ def _changed_deps(self):
return True
return False

@rwlocked(read=["outs"])
def changed_outs(self):
for out in self.outs:
status = out.status()
Expand Down Expand Up @@ -513,7 +515,7 @@ def commit(self, allow_missing=False, filter_info=None):
if link_failures:
raise CacheLinkError(link_failures)

@rwlocked(read=["deps"], write=["outs"])
@rwlocked(read=["deps", "outs"])
def run(
self,
dry=False,
Expand All @@ -527,16 +529,16 @@ def run(

if not self.frozen and self.is_import:
jobs = kwargs.get("jobs", None)
sync_import(self, dry, force, jobs)
self._sync_import(dry, force, jobs)
elif not self.frozen and self.cmd:
run_stage(self, dry, force, **kwargs)
self._run_stage(dry, force, **kwargs)
else:
args = (
("outputs", "frozen ") if self.frozen else ("data sources", "")
)
logger.info("Verifying %s in %s%s", *args, self)
if not dry:
check_missing_outputs(self)
self._check_missing_outputs()

if not dry:
if kwargs.get("checkpoint_func", None):
Expand All @@ -545,6 +547,18 @@ def run(
if not no_commit:
self.commit(allow_missing=allow_missing)

@rwlocked(read=["deps"], write=["outs"])
def _run_stage(self, dry, force, **kwargs):
return run_stage(self, dry, force, **kwargs)

@rwlocked(read=["deps"], write=["outs"])
def _sync_import(self, dry, force, jobs):
sync_import(self, dry, force, jobs)

@rwlocked(read=["outs"])
def _check_missing_outputs(self):
check_missing_outputs(self)

def filter_outs(self, path_info):
def _func(o):
return path_info.isin_or_eq(o.path_info)
Expand Down

0 comments on commit 63b710b

Please sign in to comment.