From 381346492dcb1eb2e0e4c1583e76b4c0350c02fc Mon Sep 17 00:00:00 2001 From: Ruslan Kuprieiev Date: Tue, 21 Apr 2020 03:05:14 +0300 Subject: [PATCH 1/2] dvc: introduce local build cache This patch introduces `.dvc/cache/stages` that is used to store previous runs and their results, which could then be reused later when we stumble upon the same command with the same deps and outs. Format of build cache entries is single-line json, which is readable by humans and might also be used for lock files discussed in #1871. Related to #1871 Local part of #1234 --- dvc/output/base.py | 5 +- dvc/repo/reproduce.py | 3 +- dvc/repo/run.py | 5 +- dvc/stage/__init__.py | 14 ++++-- dvc/stage/cache.py | 75 +++++++++++++++++++++++++++++ tests/func/__pycache__/tmpvpl_3i8b | Bin 0 -> 1955 bytes tests/func/test_repro.py | 4 +- 7 files changed, 98 insertions(+), 8 deletions(-) create mode 100644 dvc/stage/cache.py create mode 100644 tests/func/__pycache__/tmpvpl_3i8b diff --git a/dvc/output/base.py b/dvc/output/base.py index 2e555119cb..87cfd224c3 100644 --- a/dvc/output/base.py +++ b/dvc/output/base.py @@ -155,6 +155,9 @@ def checksum(self): def checksum(self, checksum): self.info[self.remote.PARAM_CHECKSUM] = checksum + def get_checksum(self): + return self.remote.get_checksum(self.path_info) + @property def is_dir_checksum(self): return self.remote.is_dir_checksum(self.checksum) @@ -167,7 +170,7 @@ def save_info(self): return self.remote.save_info(self.path_info) def changed_checksum(self): - return self.checksum != self.remote.get_checksum(self.path_info) + return self.checksum != self.get_checksum() def changed_cache(self, filter_info=None): if not self.use_cache or not self.checksum: diff --git a/dvc/repo/reproduce.py b/dvc/repo/reproduce.py index 8aa95af4a2..4d2fa5dde1 100644 --- a/dvc/repo/reproduce.py +++ b/dvc/repo/reproduce.py @@ -102,7 +102,6 @@ def _reproduce_stages( G, stages, downstream=False, - ignore_build_cache=False, single_item=False, **kwargs ): @@ -172,7 +171,7 @@ def _reproduce_stages( try: ret = _reproduce_stage(stage, **kwargs) - if len(ret) != 0 and ignore_build_cache: + if len(ret) != 0 and kwargs.get("ignore_build_cache", False): # NOTE: we are walking our pipeline from the top to the # bottom. If one stage is changed, it will be reproduced, # which tells us that we should force reproducing all of diff --git a/dvc/repo/run.py b/dvc/repo/run.py index 806cc8da65..7de4c4c698 100644 --- a/dvc/repo/run.py +++ b/dvc/repo/run.py @@ -57,6 +57,9 @@ def run(self, fname=None, no_exec=False, **kwargs): self.check_modified_graph([stage], self.pipeline_stages) if not no_exec: - stage.run(no_commit=kwargs.get("no_commit", False)) + stage.run( + no_commit=kwargs.get("no_commit", False), + ignore_build_cache=kwargs.get("ignore_build_cache", False), + ) dvcfile.dump(stage, update_dvcfile=True) return stage diff --git a/dvc/stage/__init__.py b/dvc/stage/__init__.py index 1120772ed6..17989497ab 100644 --- a/dvc/stage/__init__.py +++ b/dvc/stage/__init__.py @@ -24,7 +24,7 @@ MissingDep, MissingDataSource, ) -from . import params +from . import params, cache as stage_cache from dvc.utils import dict_md5 from dvc.utils import fix_env from dvc.utils import relpath @@ -482,6 +482,8 @@ def save(self): self.md5 = self._compute_md5() + stage_cache.save(self) + @staticmethod def _changed_entries(entries): return [ @@ -608,7 +610,9 @@ def _run(self): raise StageCmdFailedError(self) @rwlocked(read=["deps"], write=["outs"]) - def run(self, dry=False, no_commit=False, force=False): + def run( + self, dry=False, no_commit=False, force=False, ignore_build_cache=False + ): if (self.cmd or self.is_import) and not self.locked and not dry: self.remove_outs(ignore_remove=False, force=False) @@ -643,16 +647,20 @@ def run(self, dry=False, no_commit=False, force=False): self.check_missing_outputs() else: - logger.info("Running command:\n\t{}".format(self.cmd)) if not dry: + if not force and not ignore_build_cache: + stage_cache.restore(self) + if ( not force and not self.is_callback and not self.always_changed and self._already_cached() ): + logger.info("Stage is cached, skipping.") self.checkout() else: + logger.info("Running command:\n\t{}".format(self.cmd)) self._run() if not dry: diff --git a/dvc/stage/cache.py b/dvc/stage/cache.py new file mode 100644 index 0000000000..6f567a847f --- /dev/null +++ b/dvc/stage/cache.py @@ -0,0 +1,75 @@ +import os +import json +import hashlib + +from dvc.utils.fs import makedirs + + +def _sha256(string): + return hashlib.sha256(string.encode()).hexdigest() + + +def _get_hash(stage): + if not stage.cmd or not stage.deps or not stage.outs: + return None + + string = _sha256(stage.cmd) + for dep in stage.deps: + if not dep.def_path or not dep.get_checksum(): + return None + + string += _sha256(dep.def_path) + string += _sha256(dep.get_checksum()) + + for out in stage.outs: + if not out.def_path or out.persist: + return None + + string += _sha256(out.def_path) + + return _sha256(string) + + +def _get_cache(stage): + return { + "cmd": stage.cmd, + "deps": {dep.def_path: dep.get_checksum() for dep in stage.deps}, + "outs": {out.def_path: out.get_checksum() for out in stage.outs}, + } + + +def _get_cache_path(stage): + sha = _get_hash(stage) + if not sha: + return None + + cache_dir = os.path.join(stage.repo.cache.local.cache_dir, "stages") + + return os.path.join(cache_dir, sha[:2], sha) + + +def save(stage): + path = _get_cache_path(stage) + if not path or os.path.exists(path): + return + + dpath = os.path.dirname(path) + makedirs(dpath, exist_ok=True) + with open(path, "w+") as fobj: + json.dump(_get_cache(stage), fobj) + + +def restore(stage): + path = _get_cache_path(stage) + if not path or not os.path.exists(path): + return + + with open(path, "r") as fobj: + cache = json.load(fobj) + + outs = {out.def_path: out for out in stage.outs} + for def_path, checksum in cache["outs"].items(): + outs[def_path].checksum = checksum + + for dep in stage.deps: + dep.save() diff --git a/tests/func/__pycache__/tmpvpl_3i8b b/tests/func/__pycache__/tmpvpl_3i8b new file mode 100644 index 0000000000000000000000000000000000000000..7bc8bc6420380458a07651d5a8ed3c473b9228a2 GIT binary patch literal 1955 zcmb7F&5s*36t`z4lgZ>`cl)ul1yxZ|QAa9Cv&&MbSk!{bjiqAChoXwnGX_l(R2zSgf9-bwtl z-me2+VJ@pOkJXsZ>a4+&`sD6@_~ETPz*qKFG~$6ZTbJ86y50NJ!2ws@NuJ`vG7zdRAYT#jAm3N16!gVFd z2T^h?zNhkgC>AwvA+WTwGMTv@vt+4*=_+(q?YyXl`3w5iG&{)hhZ&fM2AEw2kWKU9 zScU#Rbh!yc5k(7#X@R*k?Z}1L(!IbE2o??yFwkZX2HL?5eH4G z?kHAWKy($CN_L0StYpJ{swPv_nH*{lbA8N z(2boRP_(lk1wFFnHrQ()(Yd3n!a7gpmBJ}12iDvzpym!d@hPKFd3Nf|t59K-*-0gJ z=TL=(S5%pEgnpX3`_=)O`-I9ZRekC)uW*Ny)o{0=wQ!iPYES(IIdJ7Spsy>spyCN| z4dgNlEA|BOgFz5L{A3Wo0Rk%e1n~^3VFdEJu_qvykY9b_&LJl2 zmXutmkNHCpEB;v|vN#)ow)qg)kUyFv@gP>SoyfpI*H&ZReh?*7-rmbIezgsEB$B*6 zg?_0-wPR^?muCO3M4nwl%WUlpBE?6!fSSQukwq!jZW3oa%LDq{+T9K4?DNZ4CWqGp z>3_9k+PiJ!t_AW6jyuOHV3w-hQneMxs|LFr$iSb&uf1C??!jv9sKW_2g(B)Uv$duFXh08+G2*RYt=#P;*iB%4>$Bm+#J+<;hM0zU zq!jC@^OiCxHgM|SETLf}^hO8|ND>M@5%4HYBEfY7)ZzVUoTxaH5F~tJq&^qQaXw9$ zfrg_bzaJ%`31cmk(U9W~x>aUA&a>M>FFxQ1`1F#dV(q&b@={9U|} W@--kHIYm6HL7_Po{jcUap7S^1ks~<( literal 0 HcmV?d00001 diff --git a/tests/func/test_repro.py b/tests/func/test_repro.py index eecc3e4001..901a04a961 100644 --- a/tests/func/test_repro.py +++ b/tests/func/test_repro.py @@ -1290,7 +1290,9 @@ def test(self): ["repro", self._get_stage_target(self.stage), "--no-commit"] ) self.assertEqual(ret, 0) - self.assertFalse(os.path.exists(self.dvc.cache.local.cache_dir)) + self.assertEqual( + os.listdir(self.dvc.cache.local.cache_dir), ["stages"] + ) class TestReproAlreadyCached(TestRepro): From a3fc0b0dd47953464848c7ad5b2a092d4ec66ed9 Mon Sep 17 00:00:00 2001 From: "Restyled.io" Date: Wed, 22 Apr 2020 15:49:55 +0000 Subject: [PATCH 2/2] Restyled by black --- dvc/repo/reproduce.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/dvc/repo/reproduce.py b/dvc/repo/reproduce.py index 4d2fa5dde1..0436a697bf 100644 --- a/dvc/repo/reproduce.py +++ b/dvc/repo/reproduce.py @@ -99,11 +99,7 @@ def reproduce( def _reproduce_stages( - G, - stages, - downstream=False, - single_item=False, - **kwargs + G, stages, downstream=False, single_item=False, **kwargs ): r"""Derive the evaluation of the given node for the given graph.