Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions dvc/dependency/param.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,15 @@ def __init__(self, stage, path, params):
info=info,
)

def _dyn_load(self, values=None):
    """Load params values dynamically into ``self.info``.

    Every name listed in ``self.params`` that is present as a key in
    ``values`` has its value copied into ``self.info``. Names that are
    absent from ``values`` leave ``self.info`` untouched.

    Args:
        values: mapping of param name to value. ``None`` or an empty
            mapping makes this call a no-op.
    """
    if not values:
        return
    for param in self.params:
        # Check key presence, not truthiness: values such as 0, False,
        # "" or an empty list are legitimate and must not be dropped.
        if param in values:
            self.info[param] = values[param]

def save(self):
super().save()
self.info = self.save_info()
Expand Down
1 change: 1 addition & 0 deletions dvc/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,5 @@

COMPILED_SINGLE_STAGE_SCHEMA = Schema(SINGLE_STAGE_SCHEMA)
COMPILED_MULTI_STAGE_SCHEMA = Schema(MULTI_STAGE_SCHEMA)
COMPILED_LOCK_FILE_STAGE_SCHEMA = Schema(LOCK_FILE_STAGE_SCHEMA)
COMPILED_LOCKFILE_SCHEMA = Schema(LOCKFILE_SCHEMA)
10 changes: 7 additions & 3 deletions dvc/serialize.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,8 @@ def to_pipeline_file(stage: "PipelineStage"):
}


def to_lockfile(stage: "PipelineStage"):
def to_single_stage_lockfile(stage: "Stage") -> dict:
assert stage.cmd
assert stage.name

res = OrderedDict([("cmd", stage.cmd)])
params, deps = get_params_deps(stage)
Expand All @@ -112,7 +111,12 @@ def to_lockfile(stage: "PipelineStage"):
if outs:
res["outs"] = outs

return {stage.name: res}
return res


def to_lockfile(stage: "PipelineStage") -> dict:
    """Build the lockfile entry for a named pipeline stage.

    Returns a single-key dict mapping ``stage.name`` to the result of
    ``to_single_stage_lockfile(stage)``. The stage must have a truthy
    name; this is enforced with an assertion.
    """
    assert stage.name
    stage_name = stage.name
    stage_lock = to_single_stage_lockfile(stage)
    return {stage_name: stage_lock}


def to_single_stage_file(stage: "Stage"):
Expand Down
32 changes: 25 additions & 7 deletions dvc/stage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -648,19 +648,31 @@ def run(

else:
if not dry:
if not force and not ignore_build_cache:
self.repo.stage_cache.restore(self)

if (
stage_cache = self.repo.stage_cache
stage_cached = (
not force
and not self.is_callback
and not self.always_changed
and self._already_cached()
):
)
use_build_cache = False
if not stage_cached:
self._save_deps()
use_build_cache = (
not force
and not ignore_build_cache
and stage_cache.is_cached(self)
)

if use_build_cache:
# restore stage from build cache
self.repo.stage_cache.restore(self)
stage_cached = self._outs_cached()

if stage_cached:
logger.info("Stage is cached, skipping.")
self.checkout()
else:
self._save_deps()
logger.info("Running command:\n\t{}".format(self.cmd))
self._run()

Expand Down Expand Up @@ -744,7 +756,13 @@ def status(self, check_updates=False):
return {}

def _already_cached(self):
return all(not dep.changed() for dep in self.deps) and all(
return self._deps_cached() and self._outs_cached()

def _deps_cached(self):
    """Return True when no dependency in ``self.deps`` reports a change.

    Evaluation stops at the first dependency whose ``changed()`` is truthy.
    An empty ``self.deps`` yields True.
    """
    return not any(dependency.changed() for dependency in self.deps)

def _outs_cached(self):
    """Return True when every output in ``self.outs`` is unchanged.

    For an output with a truthy ``use_cache`` the check is
    ``changed_cache()``; otherwise it is ``changed()``. The scan stops at
    the first output that reports a change. An empty ``self.outs`` yields
    True.
    """
    for output in self.outs:
        if output.use_cache:
            is_changed = output.changed_cache()
        else:
            is_changed = output.changed()
        if is_changed:
            return False
    return True
Expand Down
59 changes: 19 additions & 40 deletions dvc/stage/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,53 +2,37 @@
import yaml
import logging

from voluptuous import Schema, Required, Invalid
from voluptuous import Invalid

from dvc.schema import COMPILED_LOCK_FILE_STAGE_SCHEMA
from dvc.serialize import to_single_stage_lockfile
from dvc.stage.loader import StageLoader
from dvc.utils.fs import makedirs
from dvc.utils import relpath, dict_sha256
from dvc.utils.stage import dump_stage_file

logger = logging.getLogger(__name__)

SCHEMA = Schema(
{
Required("cmd"): str,
Required("deps"): {str: str},
Required("outs"): {str: str},
}
)


def _get_cache_hash(cache, key=False):
return dict_sha256(
{
"cmd": cache["cmd"],
"deps": cache["deps"],
"outs": list(cache["outs"].keys()) if key else cache["outs"],
}
)
if key:
cache["outs"] = [out["path"] for out in cache.get("outs", [])]
return dict_sha256(cache)


def _get_stage_hash(stage):
if not stage.cmd or not stage.deps or not stage.outs:
if not (stage.cmd and stage.deps and stage.outs):
return None

for dep in stage.deps:
if dep.scheme != "local" or not dep.def_path or not dep.get_checksum():
if not (dep.scheme == "local" and dep.def_path and dep.get_checksum()):
return None

for out in stage.outs:
if out.scheme != "local" or not out.def_path or out.persist:
return None

return _get_cache_hash(_create_cache(stage), key=True)


def _create_cache(stage):
return {
"cmd": stage.cmd,
"deps": {dep.def_path: dep.get_checksum() for dep in stage.deps},
"outs": {out.def_path: out.get_checksum() for out in stage.outs},
}
return _get_cache_hash(to_single_stage_lockfile(stage), key=True)


class StageCache:
Expand All @@ -66,7 +50,7 @@ def _load_cache(self, key, value):

try:
with open(path, "r") as fobj:
return SCHEMA(yaml.safe_load(fobj))
return COMPILED_LOCK_FILE_STAGE_SCHEMA(yaml.safe_load(fobj))
except FileNotFoundError:
return None
except (yaml.error.YAMLError, Invalid):
Expand Down Expand Up @@ -95,30 +79,25 @@ def save(self, stage):
if not cache_key:
return

cache = _create_cache(stage)
cache = to_single_stage_lockfile(stage)
cache_value = _get_cache_hash(cache)

if self._load_cache(cache_key, cache_value):
return

# sanity check
SCHEMA(cache)
COMPILED_LOCK_FILE_STAGE_SCHEMA(cache)

path = self._get_cache_path(cache_key, cache_value)
dpath = os.path.dirname(path)
makedirs(dpath, exist_ok=True)
with open(path, "w+") as fobj:
yaml.dump(cache, fobj)
dump_stage_file(path, cache)

def is_cached(self, stage):
    """Report whether ``self._load(stage)`` returns a truthy cache entry."""
    cached_entry = self._load(stage)
    return True if cached_entry else False

def restore(self, stage):
cache = self._load(stage)
if not cache:
return

deps = {dep.def_path: dep for dep in stage.deps}
for def_path, checksum in cache["deps"].items():
deps[def_path].checksum = checksum

outs = {out.def_path: out for out in stage.outs}
for def_path, checksum in cache["outs"].items():
outs[def_path].checksum = checksum
StageLoader.fill_from_lock(stage, cache)
34 changes: 16 additions & 18 deletions dvc/stage/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def filter(self, item=None):
return self.__class__(self.dvcfile, data, self.lockfile_data)

@staticmethod
def _fill_lock_checksums(stage, lock_data):
def fill_from_lock(stage, lock_data):
from .params import StageParams

items = chain(
Expand All @@ -63,14 +63,20 @@ def _fill_lock_checksums(stage, lock_data):
for key in [StageParams.PARAM_DEPS, StageParams.PARAM_OUTS]
}
for key, item in items:
if isinstance(item, ParamsDependency):
# load the params with values inside lock dynamically
params = lock_data.get("params", {}).get(item.def_path, {})
item._dyn_load(params)
continue

item.checksum = (
checksums.get(key, {})
.get(item.def_path, {})
.get(item.checksum_type)
)

@classmethod
def _load_params(cls, stage, pipeline_params, lock_params=None):
def _load_params(cls, stage, pipeline_params):
"""
File in pipeline file is expected to be in following format:
```
Expand All @@ -94,33 +100,25 @@ def _load_params(cls, stage, pipeline_params, lock_params=None):
- 15000
- 123
```

So, here, we merge these two formats into one (ignoring one's only
specified on lockfile but missing on pipeline file), and load the
`ParamsDependency` for the given stage.

In the list of `params` inside pipeline file, if any of the item is
dict-like, the key will be treated as separate params file and it's
values to be part of that params file, else, the item is considered
as part of the `params.yaml` which is a default file.

(From example above: `lr` is considered to be part of `params.yaml`
whereas `process.bow` to be part of `params2.yaml`.)
"""
res = defaultdict(lambda: defaultdict(dict))
lock_params = lock_params or {}

def get_value(file, param):
return lock_params.get(file, {}).get(param)

We only load the keys here, lockfile bears the values which are used
to compare between the actual params from the file in the workspace.
"""
res = defaultdict(list)
for key in pipeline_params:
if isinstance(key, str):
path = DEFAULT_PARAMS_FILE
res[path][key] = get_value(path, key)
res[path].append(key)
elif isinstance(key, dict):
path = first(key)
for k in key[path]:
res[path][k] = get_value(path, k)
res[path].extend(key[path])

stage.deps += dependency.loadd_from(
stage,
Expand All @@ -139,12 +137,12 @@ def load_stage(cls, dvcfile, name, stage_data, lock_data):
params = stage_data.pop("params", {})
stage._fill_stage_dependencies(**stage_data)
stage._fill_stage_outputs(**stage_data)
cls._load_params(stage, params, lock_data.get("params"))
cls._load_params(stage, params)
if lock_data:
stage.cmd_changed = lock_data.get(
Stage.PARAM_CMD
) != stage_data.get(Stage.PARAM_CMD)
cls._fill_lock_checksums(stage, lock_data)
cls.fill_from_lock(stage, lock_data)

return stage

Expand Down
8 changes: 5 additions & 3 deletions tests/func/test_lockfile.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import os
from collections import OrderedDict
from operator import itemgetter
from textwrap import dedent
Expand Down Expand Up @@ -175,8 +174,11 @@ def test_params_dump(tmp_dir, dvc, run_head):
assert not dvc.reproduce(stage.addressing)

(tmp_dir / PIPELINE_LOCK).unlink()
# XXX: temporary workaround due to lack of params support in build cache
remove(os.path.join(dvc.cache.local.cache_dir, "stages"))
assert dvc.reproduce(stage.addressing) == [stage]
assert_eq_lockfile(initial_content, read_lock_file())

# remove build-cache and check if the same structure is built
for item in [dvc.stage_cache.cache_dir, PIPELINE_LOCK]:
remove(item)
assert dvc.reproduce(stage.addressing) == [stage]
assert_eq_lockfile(initial_content, read_lock_file())
6 changes: 3 additions & 3 deletions tests/unit/test_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,12 +116,12 @@ def test_stage_cache(tmp_dir, dvc, run_copy, mocker):

cache_dir = os.path.join(
dvc.stage_cache.cache_dir,
"ec",
"ec5b6d8dea9136dbb62d93a95c777f87e6c54b0a6bee839554acb99fdf23d2b1",
"75",
"75f8a9097d76293ff4b3684d52e4ad0e83686d31196f27eb0b2ea9fd5085565e",
)
cache_file = os.path.join(
cache_dir,
"09f9eb17fdb1ee7f8566b3c57394cee060eaf28075244bc6058612ac91fdf04a",
"c1747e52065bc7801262fdaed4d63f5775e5da304008bd35e2fea4e6b1ccb272",
)

assert os.path.isdir(cache_dir)
Expand Down