Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More granular read-write lock requests in the run() and reproduce() methods of the Stage class #5815

Merged
7 commits merged on Aug 9, 2021
22 changes: 18 additions & 4 deletions dvc/stage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,7 @@ def changed_deps(self):

return self._changed_deps()

@rwlocked(read=["deps"])
def _changed_deps(self):
for dep in self.deps:
status = dep.status()
Expand All @@ -328,6 +329,7 @@ def _changed_deps(self):
return True
return False

@rwlocked(read=["outs"])
def changed_outs(self):
for out in self.outs:
status = out.status()
Expand Down Expand Up @@ -513,7 +515,7 @@ def commit(self, allow_missing=False, filter_info=None):
if link_failures:
raise CacheLinkError(link_failures)

@rwlocked(read=["deps"], write=["outs"])
@rwlocked(read=["deps", "outs"])
def run(
self,
dry=False,
Expand All @@ -527,16 +529,16 @@ def run(

if not self.frozen and self.is_import:
jobs = kwargs.get("jobs", None)
sync_import(self, dry, force, jobs)
self._sync_import(dry, force, jobs)
elif not self.frozen and self.cmd:
run_stage(self, dry, force, **kwargs)
self._run_stage(dry, force, **kwargs)
else:
args = (
("outputs", "frozen ") if self.frozen else ("data sources", "")
)
logger.info("Verifying %s in %s%s", *args, self)
if not dry:
check_missing_outputs(self)
self._check_missing_outputs()

if not dry:
if kwargs.get("checkpoint_func", None):
Expand All @@ -545,6 +547,18 @@ def run(
if not no_commit:
self.commit(allow_missing=allow_missing)

@rwlocked(read=["deps"], write=["outs"])
def _run_stage(self, dry, force, **kwargs):
# Thin wrapper around run_stage(): forwards this stage, `dry`, `force` and
# any extra keyword arguments unchanged, and returns whatever run_stage()
# returns.
# NOTE(review): this is a separate method so that it carries its own
# rwlocked request (read on "deps", write on "outs"); presumably the write
# request on "outs" is then only made while the command actually runs --
# confirm against the rwlocked decorator.
return run_stage(self, dry, force, **kwargs)

@rwlocked(read=["deps"], write=["outs"])
def _sync_import(self, dry, force, jobs):
# Thin wrapper around sync_import(): forwards this stage together with
# `dry`, `force` and `jobs`. The result of sync_import() is not returned,
# so this method always returns None.
# NOTE(review): this is a separate method so that it carries its own
# rwlocked request (read on "deps", write on "outs"); presumably the write
# request on "outs" is then only made while the import is synced --
# confirm against the rwlocked decorator.
sync_import(self, dry, force, jobs)

@rwlocked(read=["outs"])
def _check_missing_outputs(self):
# Thin wrapper around check_missing_outputs(): passes this stage to it and
# discards any return value.
# NOTE(review): this is a separate method so that it carries its own
# rwlocked request, which asks only for read access on "outs"; presumably
# verification does not modify outputs -- confirm against
# check_missing_outputs().
check_missing_outputs(self)

def filter_outs(self, path_info):
def _func(o):
return path_info.isin_or_eq(o.path_info)
Expand Down