From 03869f44c764b278d2cccbafc3ff823d8d6c9ee8 Mon Sep 17 00:00:00 2001 From: daavoo Date: Fri, 2 Jun 2023 16:55:58 +0200 Subject: [PATCH] repro: Check for hash mismatch between deleted dependencies and upstream outputs. Only if `--allow-missing` is passed. - Create `tests/func/repro` and extract `pull` and `allow_missing` to separate test files. Closes #9530 --- dvc/repo/reproduce.py | 3 +- dvc/stage/__init__.py | 29 +++++++--- tests/func/repro/__init__.py | 0 tests/func/{ => repro}/test_repro.py | 0 tests/func/repro/test_repro_allow_missing.py | 57 +++++++++++++++++++ tests/func/repro/test_repro_pull.py | 58 ++++++++++++++++++++ 6 files changed, 139 insertions(+), 8 deletions(-) create mode 100644 tests/func/repro/__init__.py rename tests/func/{ => repro}/test_repro.py (100%) create mode 100644 tests/func/repro/test_repro_allow_missing.py create mode 100644 tests/func/repro/test_repro_pull.py diff --git a/dvc/repo/reproduce.py b/dvc/repo/reproduce.py index aa4de4c9e7..66c3038931 100644 --- a/dvc/repo/reproduce.py +++ b/dvc/repo/reproduce.py @@ -163,7 +163,7 @@ def _reproduce_stages( # noqa: C901 steps = _get_steps(graph, stages, downstream, single_item) force_downstream = kwargs.pop("force_downstream", False) - result = [] + result: List["Stage"] = [] unchanged: List["Stage"] = [] # `ret` is used to add a cosmetic newline.
ret: List["Stage"] = [] @@ -173,6 +173,7 @@ def _reproduce_stages( # noqa: C901 logger.info("") try: + kwargs["upstream"] = result + unchanged ret = _reproduce_stage(stage, **kwargs) if len(ret) == 0: diff --git a/dvc/stage/__init__.py b/dvc/stage/__init__.py index 050c38e190..24e1bf6a2a 100644 --- a/dvc/stage/__init__.py +++ b/dvc/stage/__init__.py @@ -294,22 +294,33 @@ def env(self) -> Env: return env - def changed_deps(self, allow_missing: bool = False) -> bool: + def changed_deps( + self, allow_missing: bool = False, upstream: Optional[List] = None + ) -> bool: if self.frozen: return False if self.is_callback or self.always_changed: return True - return self._changed_deps(allow_missing=allow_missing) + return self._changed_deps(allow_missing=allow_missing, upstream=upstream) @rwlocked(read=["deps"]) - def _changed_deps(self, allow_missing: bool = False) -> bool: + def _changed_deps( + self, allow_missing: bool = False, upstream: Optional[List] = None + ) -> bool: for dep in self.deps: status = dep.status() if status: if allow_missing and status[str(dep)] == "deleted": - continue + if upstream and any( + dep.fs_path == out.fs_path and dep.hash_info != out.hash_info + for stage in upstream + for out in stage.outs + ): + status[str(dep)] = "modified" + else: + continue logger.debug( "Dependency '%s' of %s changed because it is '%s'.", dep, @@ -343,12 +354,14 @@ def changed_stage(self) -> bool: return changed @rwlocked(read=["deps", "outs"]) - def changed(self, allow_missing: bool = False) -> bool: + def changed( + self, allow_missing: bool = False, upstream: Optional[List] = None + ) -> bool: is_changed = ( # Short-circuit order: stage md5 is fast, # deps are expected to change self.changed_stage() - or self.changed_deps(allow_missing=allow_missing) + or self.changed_deps(allow_missing=allow_missing, upstream=upstream) or self.changed_outs(allow_missing=allow_missing) ) if is_changed: @@ -403,7 +416,9 @@ def transfer( def reproduce(self, interactive=False, 
**kwargs) -> Optional["Stage"]: if not ( kwargs.get("force", False) - or self.changed(kwargs.get("allow_missing", False)) + or self.changed( + kwargs.get("allow_missing", False), kwargs.pop("upstream", None) + ) ): if not isinstance(self, PipelineStage) and self.is_data_source: logger.info("'%s' didn't change, skipping", self.addressing) diff --git a/tests/func/repro/__init__.py b/tests/func/repro/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/func/test_repro.py b/tests/func/repro/test_repro.py similarity index 100% rename from tests/func/test_repro.py rename to tests/func/repro/test_repro.py diff --git a/tests/func/repro/test_repro_allow_missing.py b/tests/func/repro/test_repro_allow_missing.py new file mode 100644 index 0000000000..26041781cf --- /dev/null +++ b/tests/func/repro/test_repro_allow_missing.py @@ -0,0 +1,57 @@ +from dvc.utils.fs import remove + + +def test_repro_allow_missing(tmp_dir, dvc): + tmp_dir.gen("fixed", "fixed") + dvc.stage.add(name="create-foo", cmd="echo foo > foo", deps=["fixed"], outs=["foo"]) + dvc.stage.add(name="copy-foo", cmd="cp foo bar", deps=["foo"], outs=["bar"]) + (create_foo, copy_foo) = dvc.reproduce() + + remove("foo") + remove(create_foo.outs[0].cache_path) + remove(dvc.stage_cache.cache_dir) + + ret = dvc.reproduce(allow_missing=True) + # both stages are skipped + assert not ret + + +def test_repro_allow_missing_and_pull(tmp_dir, dvc, mocker, local_remote): + tmp_dir.gen("fixed", "fixed") + dvc.stage.add(name="create-foo", cmd="echo foo > foo", deps=["fixed"], outs=["foo"]) + dvc.stage.add(name="copy-foo", cmd="cp foo bar", deps=["foo"], outs=["bar"]) + (create_foo,) = dvc.reproduce("create-foo") + + dvc.push() + + remove("foo") + remove(create_foo.outs[0].cache_path) + remove(dvc.stage_cache.cache_dir) + + ret = dvc.reproduce(pull=True, allow_missing=True) + # create-foo is skipped ; copy-foo pulls missing dep + assert len(ret) == 1 + + +def 
test_repro_allow_missing_upstream_stage_modified( + tmp_dir, dvc, mocker, local_remote +): + """https://github.com/iterative/dvc/issues/9530""" + tmp_dir.gen("params.yaml", "param: 1") + dvc.stage.add( + name="create-foo", cmd="echo ${param} > foo", params=["param"], outs=["foo"] + ) + dvc.stage.add(name="copy-foo", cmd="cp foo bar", deps=["foo"], outs=["bar"]) + dvc.reproduce() + + dvc.push() + + tmp_dir.gen("params.yaml", "param: 2") + (create_foo,) = dvc.reproduce("create-foo") + dvc.push() + remove("foo") + remove(create_foo.outs[0].cache_path) + + ret = dvc.reproduce(pull=True, allow_missing=True) + # create-foo is skipped ; copy-foo pulls modified dep + assert len(ret) == 1 diff --git a/tests/func/repro/test_repro_pull.py b/tests/func/repro/test_repro_pull.py new file mode 100644 index 0000000000..8d4dbbe5c3 --- /dev/null +++ b/tests/func/repro/test_repro_pull.py @@ -0,0 +1,58 @@ +import os + +from dvc.stage.cache import RunCacheNotSupported +from dvc.utils.fs import remove + + +def test_repro_pulls_mising_data_source(tmp_dir, dvc, mocker, local_remote): + (foo,) = tmp_dir.dvc_gen("foo", "foo") + + dvc.push() + + dvc.stage.add(name="copy-foo", cmd="cp foo bar", deps=["foo"], outs=["bar"]) + remove("foo") + remove(foo.outs[0].cache_path) + + assert dvc.reproduce(pull=True) + + +def test_repro_pulls_mising_import(tmp_dir, dvc, mocker, erepo_dir, local_remote): + with erepo_dir.chdir(): + erepo_dir.dvc_gen("foo", "foo", commit="first") + + foo_import = dvc.imp(os.fspath(erepo_dir), "foo") + + dvc.push() + + dvc.stage.add(name="copy-foo", cmd="cp foo bar", deps=["foo"], outs=["bar"]) + remove("foo") + remove(foo_import.outs[0].cache_path) + + assert dvc.reproduce(pull=True) + + +def test_repro_pulls_continue_without_run_cache(tmp_dir, dvc, mocker, local_remote): + (foo,) = tmp_dir.dvc_gen("foo", "foo") + + dvc.push() + mocker.patch.object( + dvc.stage_cache, "pull", side_effect=RunCacheNotSupported("foo") + ) + dvc.stage.add(name="copy-foo", cmd="cp foo bar", 
deps=["foo"], outs=["bar"]) + remove("foo") + remove(foo.outs[0].cache_path) + + assert dvc.reproduce(pull=True) + + +def test_repro_skip_pull_if_no_run_cache_is_passed(tmp_dir, dvc, mocker, local_remote): + (foo,) = tmp_dir.dvc_gen("foo", "foo") + + dvc.push() + spy_pull = mocker.spy(dvc.stage_cache, "pull") + dvc.stage.add(name="copy-foo", cmd="cp foo bar", deps=["foo"], outs=["bar"]) + remove("foo") + remove(foo.outs[0].cache_path) + + assert dvc.reproduce(pull=True, run_cache=False) + assert not spy_pull.called