diff --git a/dvc/dependency/param.py b/dvc/dependency/param.py
index 6e01a65ffe..d982295f97 100644
--- a/dvc/dependency/param.py
+++ b/dvc/dependency/param.py
@@ -41,6 +41,15 @@ def __init__(self, stage, path, params):
             info=info,
         )
 
+    def _dyn_load(self, values=None):
+        """Load param values dynamically."""
+        if not values:
+            return
+        for param in self.params:
+            value = values.get(param)
+            if value:
+                self.info[param] = value
+
     def save(self):
         super().save()
         self.info = self.save_info()
diff --git a/dvc/schema.py b/dvc/schema.py
index d6a24d1473..4e06197873 100644
--- a/dvc/schema.py
+++ b/dvc/schema.py
@@ -42,4 +42,5 @@
 
 COMPILED_SINGLE_STAGE_SCHEMA = Schema(SINGLE_STAGE_SCHEMA)
 COMPILED_MULTI_STAGE_SCHEMA = Schema(MULTI_STAGE_SCHEMA)
+COMPILED_LOCK_FILE_STAGE_SCHEMA = Schema(LOCK_FILE_STAGE_SCHEMA)
 COMPILED_LOCKFILE_SCHEMA = Schema(LOCKFILE_SCHEMA)
diff --git a/dvc/serialize.py b/dvc/serialize.py
index ec004e5860..09172f7765 100644
--- a/dvc/serialize.py
+++ b/dvc/serialize.py
@@ -90,9 +90,8 @@ def to_pipeline_file(stage: "PipelineStage"):
     }
 
 
-def to_lockfile(stage: "PipelineStage"):
+def to_single_stage_lockfile(stage: "Stage") -> dict:
     assert stage.cmd
-    assert stage.name
 
     res = OrderedDict([("cmd", stage.cmd)])
     params, deps = get_params_deps(stage)
@@ -112,7 +111,12 @@ def to_lockfile(stage: "PipelineStage"):
     if outs:
         res["outs"] = outs
 
-    return {stage.name: res}
+    return res
+
+
+def to_lockfile(stage: "PipelineStage") -> dict:
+    assert stage.name
+    return {stage.name: to_single_stage_lockfile(stage)}
 
 
 def to_single_stage_file(stage: "Stage"):
diff --git a/dvc/stage/__init__.py b/dvc/stage/__init__.py
index e1c629abd8..e6afdeaa74 100644
--- a/dvc/stage/__init__.py
+++ b/dvc/stage/__init__.py
@@ -648,19 +648,31 @@ def run(
 
         else:
             if not dry:
-                if not force and not ignore_build_cache:
-                    self.repo.stage_cache.restore(self)
-
-                if (
+                stage_cache = self.repo.stage_cache
+                stage_cached = (
                     not force
                     and not self.is_callback
                     and not self.always_changed
                     and self._already_cached()
-                ):
+                )
+                use_build_cache = False
+                if not stage_cached:
+                    self._save_deps()
+                    use_build_cache = (
+                        not force
+                        and not ignore_build_cache
+                        and stage_cache.is_cached(self)
+                    )
+
+                if use_build_cache:
+                    # restore stage from build cache
+                    self.repo.stage_cache.restore(self)
+                    stage_cached = self._outs_cached()
+
+                if stage_cached:
                     logger.info("Stage is cached, skipping.")
                     self.checkout()
                 else:
-                    self._save_deps()
                     logger.info("Running command:\n\t{}".format(self.cmd))
                     self._run()
 
@@ -744,7 +756,13 @@ def status(self, check_updates=False):
         return {}
 
     def _already_cached(self):
-        return all(not dep.changed() for dep in self.deps) and all(
+        return self._deps_cached() and self._outs_cached()
+
+    def _deps_cached(self):
+        return all(not dep.changed() for dep in self.deps)
+
+    def _outs_cached(self):
+        return all(
             not out.changed_cache() if out.use_cache else not out.changed()
             for out in self.outs
         )
diff --git a/dvc/stage/cache.py b/dvc/stage/cache.py
index e4ceacb2c0..a18920f473 100644
--- a/dvc/stage/cache.py
+++ b/dvc/stage/cache.py
@@ -2,53 +2,37 @@
 import yaml
 import logging
 
-from voluptuous import Schema, Required, Invalid
+from voluptuous import Invalid
 
+from dvc.schema import COMPILED_LOCK_FILE_STAGE_SCHEMA
+from dvc.serialize import to_single_stage_lockfile
+from dvc.stage.loader import StageLoader
 from dvc.utils.fs import makedirs
 from dvc.utils import relpath, dict_sha256
+from dvc.utils.stage import dump_stage_file
 
 logger = logging.getLogger(__name__)
 
-SCHEMA = Schema(
-    {
-        Required("cmd"): str,
-        Required("deps"): {str: str},
-        Required("outs"): {str: str},
-    }
-)
-
 
 def _get_cache_hash(cache, key=False):
-    return dict_sha256(
-        {
-            "cmd": cache["cmd"],
-            "deps": cache["deps"],
-            "outs": list(cache["outs"].keys()) if key else cache["outs"],
-        }
-    )
+    if key:
+        cache["outs"] = [out["path"] for out in cache.get("outs", [])]
+    return dict_sha256(cache)
 
 
 def _get_stage_hash(stage):
-    if not stage.cmd or not stage.deps or not stage.outs:
+    if not (stage.cmd and stage.deps and stage.outs):
         return None
 
     for dep in stage.deps:
-        if dep.scheme != "local" or not dep.def_path or not dep.get_checksum():
+        if not (dep.scheme == "local" and dep.def_path and dep.get_checksum()):
             return None
 
     for out in stage.outs:
         if out.scheme != "local" or not out.def_path or out.persist:
             return None
 
-    return _get_cache_hash(_create_cache(stage), key=True)
-
-
-def _create_cache(stage):
-    return {
-        "cmd": stage.cmd,
-        "deps": {dep.def_path: dep.get_checksum() for dep in stage.deps},
-        "outs": {out.def_path: out.get_checksum() for out in stage.outs},
-    }
+    return _get_cache_hash(to_single_stage_lockfile(stage), key=True)
 
 
 class StageCache:
@@ -66,7 +50,7 @@ def _load_cache(self, key, value):
 
         try:
             with open(path, "r") as fobj:
-                return SCHEMA(yaml.safe_load(fobj))
+                return COMPILED_LOCK_FILE_STAGE_SCHEMA(yaml.safe_load(fobj))
         except FileNotFoundError:
             return None
         except (yaml.error.YAMLError, Invalid):
@@ -95,30 +79,25 @@ def save(self, stage):
         if not cache_key:
             return
 
-        cache = _create_cache(stage)
+        cache = to_single_stage_lockfile(stage)
         cache_value = _get_cache_hash(cache)
 
         if self._load_cache(cache_key, cache_value):
             return
 
         # sanity check
-        SCHEMA(cache)
+        COMPILED_LOCK_FILE_STAGE_SCHEMA(cache)
 
         path = self._get_cache_path(cache_key, cache_value)
         dpath = os.path.dirname(path)
         makedirs(dpath, exist_ok=True)
-        with open(path, "w+") as fobj:
-            yaml.dump(cache, fobj)
+        dump_stage_file(path, cache)
+
+    def is_cached(self, stage):
+        return bool(self._load(stage))
 
     def restore(self, stage):
         cache = self._load(stage)
         if not cache:
             return
-
-        deps = {dep.def_path: dep for dep in stage.deps}
-        for def_path, checksum in cache["deps"].items():
-            deps[def_path].checksum = checksum
-
-        outs = {out.def_path: out for out in stage.outs}
-        for def_path, checksum in cache["outs"].items():
-            outs[def_path].checksum = checksum
+        StageLoader.fill_from_lock(stage, cache)
diff --git a/dvc/stage/loader.py b/dvc/stage/loader.py
index 02912842d5..f2e24cd695 100644
--- a/dvc/stage/loader.py
+++ b/dvc/stage/loader.py
@@ -50,7 +50,7 @@ def filter(self, item=None):
         return self.__class__(self.dvcfile, data, self.lockfile_data)
 
     @staticmethod
-    def _fill_lock_checksums(stage, lock_data):
+    def fill_from_lock(stage, lock_data):
         from .params import StageParams
 
         items = chain(
@@ -63,6 +63,12 @@ def _fill_lock_checksums(stage, lock_data):
             for key in [StageParams.PARAM_DEPS, StageParams.PARAM_OUTS]
         }
         for key, item in items:
+            if isinstance(item, ParamsDependency):
+                # load the param values recorded in the lockfile dynamically
+                params = lock_data.get("params", {}).get(item.def_path, {})
+                item._dyn_load(params)
+                continue
+
             item.checksum = (
                 checksums.get(key, {})
                 .get(item.def_path, {})
@@ -70,7 +76,7 @@ def _fill_lock_checksums(stage, lock_data):
             )
 
     @classmethod
-    def _load_params(cls, stage, pipeline_params, lock_params=None):
+    def _load_params(cls, stage, pipeline_params):
         """
         File in pipeline file is expected to be in following format:
         ```
@@ -94,11 +100,6 @@ def _load_params(cls, stage, pipeline_params, lock_params=None):
             - 15000
             - 123
         ```
-
-        So, here, we merge these two formats into one (ignoring one's only
-        specified on lockfile but missing on pipeline file), and load the
-        `ParamsDependency` for the given stage.
-
         In the list of `params` inside pipeline file, if any of the item is
         dict-like, the key will be treated as separate params file and it's
         values to be part of that params file, else, the item is considered
@@ -106,21 +107,18 @@ def _load_params(cls, stage, pipeline_params, lock_params=None):
 
         (From example above: `lr` is considered to be part of `params.yaml`
         whereas `process.bow` to be part of `params2.yaml`.)
-        """
-        res = defaultdict(lambda: defaultdict(dict))
-        lock_params = lock_params or {}
-
-        def get_value(file, param):
-            return lock_params.get(file, {}).get(param)
+        We only load the keys here; the lockfile holds the values, which are
+        compared against the actual params file in the workspace.
+        """
+        res = defaultdict(list)
 
         for key in pipeline_params:
             if isinstance(key, str):
                 path = DEFAULT_PARAMS_FILE
-                res[path][key] = get_value(path, key)
+                res[path].append(key)
             elif isinstance(key, dict):
                 path = first(key)
-                for k in key[path]:
-                    res[path][k] = get_value(path, k)
+                res[path].extend(key[path])
 
         stage.deps += dependency.loadd_from(
             stage,
@@ -139,12 +137,12 @@ def load_stage(cls, dvcfile, name, stage_data, lock_data):
         params = stage_data.pop("params", {})
         stage._fill_stage_dependencies(**stage_data)
         stage._fill_stage_outputs(**stage_data)
-        cls._load_params(stage, params, lock_data.get("params"))
+        cls._load_params(stage, params)
 
         if lock_data:
             stage.cmd_changed = lock_data.get(
                 Stage.PARAM_CMD
             ) != stage_data.get(Stage.PARAM_CMD)
-            cls._fill_lock_checksums(stage, lock_data)
+            cls.fill_from_lock(stage, lock_data)
 
         return stage
diff --git a/tests/func/test_lockfile.py b/tests/func/test_lockfile.py
index f91f03c188..909ed299d9 100644
--- a/tests/func/test_lockfile.py
+++ b/tests/func/test_lockfile.py
@@ -1,4 +1,3 @@
-import os
 from collections import OrderedDict
 from operator import itemgetter
 from textwrap import dedent
@@ -175,8 +174,11 @@ def test_params_dump(tmp_dir, dvc, run_head):
     assert not dvc.reproduce(stage.addressing)
 
     (tmp_dir / PIPELINE_LOCK).unlink()
-    # XXX: temporary workaround due to lack of params support in build cache
-    remove(os.path.join(dvc.cache.local.cache_dir, "stages"))
+    assert dvc.reproduce(stage.addressing) == [stage]
+    assert_eq_lockfile(initial_content, read_lock_file())
+    # remove build-cache and check if the same structure is built
+    for item in [dvc.stage_cache.cache_dir, PIPELINE_LOCK]:
+        remove(item)
     assert dvc.reproduce(stage.addressing) == [stage]
     assert_eq_lockfile(initial_content, read_lock_file())
 
diff --git a/tests/unit/test_stage.py b/tests/unit/test_stage.py
index 982d549a35..c7d41a6ef0 100644
--- a/tests/unit/test_stage.py
+++ b/tests/unit/test_stage.py
@@ -116,12 +116,12 @@ def test_stage_cache(tmp_dir, dvc, run_copy, mocker):
 
     cache_dir = os.path.join(
         dvc.stage_cache.cache_dir,
-        "ec",
-        "ec5b6d8dea9136dbb62d93a95c777f87e6c54b0a6bee839554acb99fdf23d2b1",
+        "75",
+        "75f8a9097d76293ff4b3684d52e4ad0e83686d31196f27eb0b2ea9fd5085565e",
     )
     cache_file = os.path.join(
         cache_dir,
-        "09f9eb17fdb1ee7f8566b3c57394cee060eaf28075244bc6058612ac91fdf04a",
+        "c1747e52065bc7801262fdaed4d63f5775e5da304008bd35e2fea4e6b1ccb272",
     )
 
     assert os.path.isdir(cache_dir)
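Note: with this patch a stage (build) cache entry reuses the single-stage lockfile format produced by `to_single_stage_lockfile` and validated by `COMPILED_LOCK_FILE_STAGE_SCHEMA`, which is what lets param values survive a round trip through the cache. As a rough, hypothetical sketch (the command, file names and the `lr` param are illustrative only, the hash values are placeholders, and key order may differ; local deps/outs are assumed, whose checksum field is `md5`), a cached entry stored under `dvc.stage_cache.cache_dir` could look like:

```yaml
cmd: python train.py
deps:
- path: data.csv
  md5: <md5 of data.csv>
params:
  params.yaml:
    lr: 0.0041
outs:
- path: model.pkl
  md5: <md5 of model.pkl>
```

Per the diff, the cache key (the directory name) comes from `_get_cache_hash(entry, key=True)`, which hashes the entry with `outs` reduced to just the output paths, while the file name inside that directory is the hash of the full entry; `StageCache.restore` then feeds a matching entry back into the stage through `StageLoader.fill_from_lock`.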