Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 2 additions & 18 deletions dvc/data_cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,7 @@ def _init_remote(self, name):
return Remote(self.repo, name=name)

def push(
self,
cache,
jobs=None,
remote=None,
show_checksums=False,
run_cache=False,
self, cache, jobs=None, remote=None, show_checksums=False,
):
"""Push data items in a cloud-agnostic way.

Expand All @@ -67,20 +62,12 @@ def push(
"""
remote = self.get_remote(remote, "push")

if run_cache:
self.repo.stage_cache.push(remote)

return self.repo.cache.local.push(
cache, jobs=jobs, remote=remote, show_checksums=show_checksums,
)

def pull(
self,
cache,
jobs=None,
remote=None,
show_checksums=False,
run_cache=False,
self, cache, jobs=None, remote=None, show_checksums=False,
):
"""Pull data items in a cloud-agnostic way.

Expand All @@ -94,9 +81,6 @@ def pull(
"""
remote = self.get_remote(remote, "pull")

if run_cache:
self.repo.stage_cache.pull(remote)

downloaded_items_num = self.repo.cache.local.pull(
cache, jobs=jobs, remote=remote, show_checksums=show_checksums
)
Expand Down
7 changes: 7 additions & 0 deletions dvc/repo/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ def used_cache(
force=False,
jobs=None,
recursive=False,
used_run_cache=None,
):
"""Get the stages related to the given target and collect
the `info` of its outputs.
Expand Down Expand Up @@ -291,6 +292,12 @@ def used_cache(
)
cache.update(used_cache, suffix=suffix)

if used_run_cache:
used_cache = self.stage_cache.get_used_cache(
used_run_cache, remote=remote, force=force, jobs=jobs,
)
cache.update(used_cache)

return cache

def _collect_graph(self, stages):
Expand Down
10 changes: 5 additions & 5 deletions dvc/repo/fetch.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ def _fetch(
config.NoRemoteError: thrown when downloading only local files and no
remote is configured
"""

used_run_cache = self.stage_cache.pull(remote) if run_cache else []
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shame NamedCache relies on checksums so heavily(or rather push/pull itself), otherwise we could've fed stage_cache to it directly. But for now, this works fine.


used = self.used_cache(
targets,
all_branches=all_branches,
Expand All @@ -44,18 +47,15 @@ def _fetch(
remote=remote,
jobs=jobs,
recursive=recursive,
used_run_cache=used_run_cache,
)

downloaded = 0
failed = 0

try:
downloaded += self.cloud.pull(
used,
jobs,
remote=remote,
show_checksums=show_checksums,
run_cache=run_cache,
used, jobs, remote=remote, show_checksums=show_checksums,
)
except NoRemoteError:
if not used.external and used["local"]:
Expand Down
5 changes: 4 additions & 1 deletion dvc/repo/push.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ def push(
all_commits=False,
run_cache=False,
):
used_run_cache = self.stage_cache.push(remote) if run_cache else []

used = self.used_cache(
targets,
all_branches=all_branches,
Expand All @@ -24,6 +26,7 @@ def push(
remote=remote,
jobs=jobs,
recursive=recursive,
used_run_cache=used_run_cache,
)

return self.cloud.push(used, jobs, remote=remote, run_cache=run_cache)
return self.cloud.push(used, jobs, remote=remote)
31 changes: 30 additions & 1 deletion dvc/stage/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,11 @@ def restore(self, stage):

@staticmethod
def _transfer(func, from_remote, to_remote):
ret = []

runs = from_remote.path_info / "runs"
if not from_remote.exists(runs):
return
return []

for src in from_remote.walk_files(runs):
rel = src.relative_to(from_remote.path_info)
Expand All @@ -118,9 +120,36 @@ def _transfer(func, from_remote, to_remote):
if to_remote.exists(key) and first(to_remote.walk_files(key)):
continue
func(src, dst)
ret.append((src.parent.name, src.name))

return ret

def push(self, remote):
remote = self.repo.cloud.get_remote(remote)
return self._transfer(remote.upload, self.repo.cache.local, remote)

def pull(self, remote):
remote = self.repo.cloud.get_remote(remote)
return self._transfer(remote.download, remote, self.repo.cache.local)

def get_used_cache(self, used_run_cache, *args, **kwargs):
from dvc.cache import NamedCache
from dvc.stage import create_stage, PipelineStage

cache = NamedCache()

for key, value in used_run_cache:
entry = self._load_cache(key, value)
if not entry:
continue
stage = create_stage(
Copy link
Contributor Author

@efiop efiop May 8, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, this is hacky. But run cache has a very good control over what it can contain (see _get_stage_hash above), so we can do such things without any problems for now.

PipelineStage,
repo=self.repo,
path="dvc.yaml",
cmd=entry["cmd"],
deps=[dep["path"] for dep in entry["deps"]],
outs=[out["path"] for out in entry["outs"]],
)
StageLoader.fill_from_lock(stage, entry)
cache.update(stage.get_used_cache(*args, **kwargs))
return cache