From 14436269d89e8c1ea2d5f594b3ff275f98da43eb Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Mon, 13 Apr 2020 16:46:10 +0900 Subject: [PATCH 01/12] repo: separate dir cache and file cache in memory - `used_cache()`/`get_used_cache()` in repo/stage/output now return tuples of (dir_cache, file_cache) instead of one flat/merged cache --- dvc/data_cloud.py | 46 ++++++++++++++++++++++++++----------------- dvc/external_repo.py | 2 +- dvc/output/base.py | 21 ++++++++++++-------- dvc/remote/local.py | 12 +++++------ dvc/repo/__init__.py | 29 ++++++++++++++++++++------- dvc/repo/fetch.py | 41 ++++++++++++++++++++++++++++---------- dvc/repo/gc.py | 5 ++--- dvc/stage/__init__.py | 7 +++---- 8 files changed, 106 insertions(+), 57 deletions(-) diff --git a/dvc/data_cloud.py b/dvc/data_cloud.py index ea375b65ce..a3976f1452 100644 --- a/dvc/data_cloud.py +++ b/dvc/data_cloud.py @@ -48,11 +48,12 @@ def get_remote(self, remote=None, command=""): def _init_remote(self, remote): return Remote(self.repo, name=remote) - def push(self, cache, jobs=None, remote=None, show_checksums=False): + def push(self, caches, jobs=None, remote=None, show_checksums=False): """Push data items in a cloud-agnostic way. Args: - cache (NamedCache): named checksums to push to the cloud. + caches (list): list of (dir_cache, file_cache) tuples containing + named checksums to push to the cloud. jobs (int): number of jobs that can be running simultaneously. remote (dvc.remote.base.RemoteBASE): optional remote to push to. By default remote from core.remote config option is used. @@ -60,17 +61,18 @@ def push(self, cache, jobs=None, remote=None, show_checksums=False): information messages. """ return self.repo.cache.local.push( - cache, + caches, jobs=jobs, remote=self.get_remote(remote, "push"), show_checksums=show_checksums, ) - def pull(self, cache, jobs=None, remote=None, show_checksums=False): + def pull(self, caches, jobs=None, remote=None, show_checksums=False): """Pull data items in a cloud-agnostic way. Args: - cache (NamedCache): named checksums to pull from the cloud. + caches (list): list of (dir_cache, file_cache) tuples containing + named checksums to pull from the cloud. jobs (int): number of jobs that can be running simultaneously. remote (dvc.remote.base.RemoteBASE): optional remote to pull from. By default remote from core.remote config option is used. 
@@ -79,28 +81,36 @@ def pull(self, cache, jobs=None, remote=None, show_checksums=False): """ remote = self.get_remote(remote, "pull") downloaded_items_num = self.repo.cache.local.pull( - cache, jobs=jobs, remote=remote, show_checksums=show_checksums + caches, jobs=jobs, remote=remote, show_checksums=show_checksums ) if not remote.verify: - self._save_pulled_checksums(cache) + self._save_pulled_checksums(caches) return downloaded_items_num def _save_pulled_checksums(self, cache): - for checksum in cache["local"].keys(): - cache_file = self.repo.cache.local.checksum_to_path_info(checksum) - if self.repo.cache.local.exists(cache_file): - # We can safely save here, as existing corrupted files will be - # removed upon status, while files corrupted during download - # will not be moved from tmp_file (see `RemoteBASE.download()`) - self.repo.state.save(cache_file, checksum) - - def status(self, cache, jobs=None, remote=None, show_checksums=False): + for dir_cache, file_cache in cache: + checksums = set(file_cache["local"].keys()) + if dir_cache is not None: + checksums.update(dir_cache["local"].keys()) + for checksum in checksums: + cache_file = self.repo.cache.local.checksum_to_path_info( + checksum + ) + if self.repo.cache.local.exists(cache_file): + # We can safely save here, as existing corrupted files will + # be removed upon status, while files corrupted during + # download will not be moved from tmp_file + # (see `RemoteBASE.download()`) + self.repo.state.save(cache_file, checksum) + + def status(self, caches, jobs=None, remote=None, show_checksums=False): """Check status of data items in a cloud-agnostic way. Args: - cache (NamedCache): named checksums to check status for. + caches (list): list of (dir_cache, file_cache) tuples containing + named checksums to check status for. jobs (int): number of jobs that can be running simultaneously. remote (dvc.remote.base.RemoteBASE): optional remote to compare cache to. By default remote from core.remote config option @@ -110,5 +120,5 @@ def status(self, cache, jobs=None, remote=None, show_checksums=False): """ remote = self.get_remote(remote, "status") return self.repo.cache.local.status( - cache, jobs=jobs, remote=remote, show_checksums=show_checksums + caches, jobs=jobs, remote=remote, show_checksums=show_checksums ) diff --git a/dvc/external_repo.py b/dvc/external_repo.py index a598eb6692..142fcfc9d2 100644 --- a/dvc/external_repo.py +++ b/dvc/external_repo.py @@ -104,7 +104,7 @@ def _pull_cached(self, out, path_info, dest): # Only pull unless all needed cache is present if out.changed_cache(filter_info=src): - self.cloud.pull(out.get_used_cache(filter_info=src)) + self.cloud.pull([out.get_used_cache(filter_info=src)]) try: out.checkout(filter_info=src) diff --git a/dvc/output/base.py b/dvc/output/base.py index 879dd9a713..345e055af0 100644 --- a/dvc/output/base.py +++ b/dvc/output/base.py @@ -363,8 +363,9 @@ def _collect_used_dir_cache( if self.cache.changed_cache_file(self.checksum): try: + cache = NamedCache.make("local", self.checksum, str(self)) self.repo.cloud.pull( - NamedCache.make("local", self.checksum, str(self)), + [(None, cache)], jobs=jobs, remote=remote, show_checksums=False, @@ -401,16 +402,22 @@ def get_used_cache(self, **kwargs): In case that the given output is a directory, it will also include the `info` of its files. + + Returns: + 2-tuple of NamedCache objects in the form of + (directory `info`, file `info`). + If the given output is not a directory, the first tuple entry will + be None.
""" if not self.use_cache: - return NamedCache() + return None, NamedCache() if self.stage.is_repo_import: cache = NamedCache() (dep,) = self.stage.deps cache.external[dep.repo_pair].add(dep.def_path) - return cache + return None, cache if not self.checksum: msg = ( @@ -429,16 +436,14 @@ def get_used_cache(self, **kwargs): ) ) logger.warning(msg) - return NamedCache() + return None, NamedCache() ret = NamedCache.make(self.scheme, self.checksum, str(self)) if not self.is_dir_checksum: - return ret + return None, ret - ret.update(self._collect_used_dir_cache(**kwargs)) - - return ret + return ret, self._collect_used_dir_cache(**kwargs) @classmethod def _validate_output_path(cls, path): diff --git a/dvc/remote/local.py b/dvc/remote/local.py index 7643c3556b..3df92a034f 100644 --- a/dvc/remote/local.py +++ b/dvc/remote/local.py @@ -320,7 +320,7 @@ def _get_plans(self, download, remote, status_info, status): def _process( self, - named_cache, + named_caches, remote, jobs=None, show_checksums=False, @@ -348,7 +348,7 @@ def _process( jobs = remote.JOBS status_info = self.status( - named_cache, + named_caches, remote, jobs=jobs, show_checksums=show_checksums, @@ -373,18 +373,18 @@ def _process( return len(plans[0]) - def push(self, named_cache, remote, jobs=None, show_checksums=False): + def push(self, named_caches, remote, jobs=None, show_checksums=False): return self._process( - named_cache, + named_caches, remote, jobs=jobs, show_checksums=show_checksums, download=False, ) - def pull(self, named_cache, remote, jobs=None, show_checksums=False): + def pull(self, named_caches, remote, jobs=None, show_checksums=False): return self._process( - named_cache, + named_caches, remote, jobs=jobs, show_checksums=show_checksums, diff --git a/dvc/repo/__init__.py b/dvc/repo/__init__.py index 8d50c54ded..657e45054c 100644 --- a/dvc/repo/__init__.py +++ b/dvc/repo/__init__.py @@ -253,12 +253,18 @@ def used_cache( `all_branches`/`all_tags`/`all_commits` to expand the scope. Returns: - A dictionary with Schemes (representing output's location) as keys, + A list of 2-tuples in the form (dir_cache, file_cache). + Each NamedCache object is a dictionary with Schemes + (representing output's location) as keys, and a list with the outputs' `dumpd` as values. + If the given output is not a directory, the first tuple entry + will be None. 
""" from dvc.cache import NamedCache - cache = NamedCache() + used_caches = [] + # group together file caches which do not have an associated directory + file_caches = NamedCache() for branch in self.brancher( all_branches=all_branches, @@ -276,15 +282,24 @@ def used_cache( suffix = "({})".format(branch) if branch else "" for stage, filter_info in pairs: - used_cache = stage.get_used_cache( + for dir_cache, file_cache in stage.get_used_cache( remote=remote, force=force, jobs=jobs, filter_info=filter_info, - ) - cache.update(used_cache, suffix=suffix) - - return cache + ): + if dir_cache is None: + file_caches.update(file_cache, suffix=suffix) + else: + used_dir = NamedCache() + used_dir.update(dir_cache, suffix=suffix) + used_file = NamedCache() + used_file.update(file_cache, suffix=suffix) + used_caches.append((used_dir, used_file)) + + if file_caches._items or file_caches.external: + used_caches.append((None, file_caches)) + return used_caches def _collect_graph(self, stages=None): """Generate a graph by using the given stages on the given directory diff --git a/dvc/repo/fetch.py b/dvc/repo/fetch.py index f77491bb79..7fdf5d6b86 100644 --- a/dvc/repo/fetch.py +++ b/dvc/repo/fetch.py @@ -1,12 +1,12 @@ import logging -from dvc.cache import NamedCache +from funcy import concat + from dvc.config import NoRemoteError from dvc.exceptions import DownloadError, OutputNotFoundError from dvc.scm.base import CloneError from dvc.path_info import PathInfo - logger = logging.getLogger(__name__) @@ -54,15 +54,34 @@ def _fetch( used, jobs, remote=remote, show_checksums=show_checksums ) except NoRemoteError: - if not used.external and used["local"]: + external = False + local = False + for dir_cache, file_cache in used: + if dir_cache: + if dir_cache.external: + external = True + if dir_cache["local"]: + local = True + if file_cache.external: + external = True + if file_cache["local"]: + local = True + if not external and local: raise except DownloadError as exc: failed += exc.amount - for (repo_url, repo_rev), files in used.external.items(): - d, f = _fetch_external(self, repo_url, repo_rev, files, jobs) - downloaded += d - failed += f + for dir_cache, file_cache in used: + if dir_cache is None: + items = file_cache.external.items() + else: + items = concat( + dir_cache.external.items(), file_cache.external.items() + ) + for (repo_url, repo_rev), files in items: + d, f = _fetch_external(self, repo_url, repo_rev, files, jobs) + downloaded += d + failed += f if failed: raise DownloadError(failed) @@ -82,7 +101,7 @@ def _fetch_external(self, repo_url, repo_rev, files, jobs): if is_dvc_repo: repo.cache.local.cache_dir = self.cache.local.cache_dir with repo.state: - cache = NamedCache() + used_cache = [] for name in files: try: out = repo.find_out_by_relpath(name) @@ -90,10 +109,12 @@ def _fetch_external(self, repo_url, repo_rev, files, jobs): # try to add to cache if they are git-tracked files git_files.append(name) else: - cache.update(out.get_used_cache()) + used_cache.append(out.get_used_cache()) try: - downloaded += repo.cloud.pull(cache, jobs=jobs) + downloaded += repo.cloud.pull( + used_cache, jobs=jobs + ) except DownloadError as exc: failed += exc.amount diff --git a/dvc/repo/gc.py b/dvc/repo/gc.py index 8f1222fd84..5cd5d2b645 100644 --- a/dvc/repo/gc.py +++ b/dvc/repo/gc.py @@ -1,7 +1,6 @@ import logging from . 
import locked -from dvc.cache import NamedCache from dvc.exceptions import InvalidArgumentError @@ -60,9 +59,9 @@ def gc( stack.enter_context(repo.lock) stack.enter_context(repo.state) - used = NamedCache() + used = [] for repo in all_repos + [self]: - used.update( + used.extend( repo.used_cache( all_branches=all_branches, with_deps=with_deps, diff --git a/dvc/stage/__init__.py b/dvc/stage/__init__.py index c2c1c1d9c6..09de5fd6f7 100644 --- a/dvc/stage/__init__.py +++ b/dvc/stage/__init__.py @@ -725,10 +725,9 @@ def get_all_files_number(self, filter_info=None): ) def get_used_cache(self, *args, **kwargs): - from dvc.cache import NamedCache - cache = NamedCache() + ret = [] for out in self._filter_outs(kwargs.get("filter_info")): - cache.update(out.get_used_cache(*args, **kwargs)) + ret.append(out.get_used_cache(*args, **kwargs)) - return cache + return ret From 5a77d6a130bb02b5e4e1a22a0251e2f6e57d7b18 Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Mon, 13 Apr 2020 17:01:44 +0900 Subject: [PATCH 02/12] update tests for new get_used_cache behavior --- tests/func/test_data_cloud.py | 6 ++++-- tests/unit/output/test_output.py | 5 ++++- tests/unit/remote/test_local.py | 2 +- tests/unit/repo/test_repo.py | 15 +++++++++++---- 4 files changed, 20 insertions(+), 8 deletions(-) diff --git a/tests/func/test_data_cloud.py b/tests/func/test_data_cloud.py index 1bf93ab7fb..2d17f27616 100644 --- a/tests/func/test_data_cloud.py +++ b/tests/func/test_data_cloud.py @@ -112,7 +112,7 @@ def _test_cloud(self): out = stage.outs[0] cache = out.cache_path md5 = out.checksum - info = out.get_used_cache() + info = [out.get_used_cache()] stages = self.dvc.add(self.DATA_DIR) self.assertEqual(len(stages), 1) @@ -122,7 +122,9 @@ def _test_cloud(self): cache_dir = out_dir.cache_path name_dir = str(out_dir) md5_dir = out_dir.checksum - info_dir = NamedCache.make(out_dir.scheme, md5_dir, name_dir) + info_dir = [ + (NamedCache.make(out_dir.scheme, md5_dir, name_dir), NamedCache()) + ] with self.cloud.repo.state: # Check status diff --git a/tests/unit/output/test_output.py b/tests/unit/output/test_output.py index fb9325471b..42e503ec83 100644 --- a/tests/unit/output/test_output.py +++ b/tests/unit/output/test_output.py @@ -86,5 +86,8 @@ def test_get_used_cache(exists, expected_message, mocker, caplog): ).return_value = exists with caplog.at_level(logging.WARNING, logger="dvc"): - assert isinstance(output.get_used_cache(), NamedCache) + used = output.get_used_cache() + assert isinstance(used, tuple) + assert used[0] is None + assert isinstance(used[1], NamedCache) assert first(caplog.messages) == expected_message diff --git a/tests/unit/remote/test_local.py b/tests/unit/remote/test_local.py index a5d653e971..fb3478416d 100644 --- a/tests/unit/remote/test_local.py +++ b/tests/unit/remote/test_local.py @@ -26,7 +26,7 @@ def test_status_download_optimization(mocker, dvc): other_remote.url = "other_remote" other_remote.cache_exists.return_value = [] - remote.status(infos, other_remote, download=True) + remote.status([(None, infos)], other_remote, download=True) assert other_remote.cache_exists.call_count == 0 diff --git a/tests/unit/repo/test_repo.py b/tests/unit/repo/test_repo.py index 985c113ef1..447487f5b0 100644 --- a/tests/unit/repo/test_repo.py +++ b/tests/unit/repo/test_repo.py @@ -37,10 +37,10 @@ def test_used_cache(tmp_dir, dvc, path): from dvc.cache import NamedCache tmp_dir.dvc_gen({"dir": {"subdir": {"file": "file"}, "other": "other"}}) - expected = NamedCache.make( + expected_dir = NamedCache.make( "local", 
"70922d6bf66eb073053a82f77d58c536.dir", "dir" ) - expected.add( + expected_file = NamedCache.make( "local", "8c7dd922ad47494fc02c388e12c00eac", os.path.join("dir", "subdir", "file"), @@ -48,9 +48,16 @@ def test_used_cache(tmp_dir, dvc, path): with dvc.state: used_cache = dvc.used_cache([path]) + assert isinstance(used_cache, list) + assert len(used_cache) == 1 + assert isinstance(used_cache[0], tuple) + used_dir = used_cache[0][0] + used_file = used_cache[0][1] assert ( - used_cache._items == expected._items - and used_cache.external == expected.external + used_dir._items == expected_dir._items + and used_dir.external == expected_dir.external + and used_file._items == expected_file._items + and used_file.external == expected_file.external ) From ad68c27b1e5be60fca397da7fbb8f926191b0fab Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Mon, 13 Apr 2020 17:02:39 +0900 Subject: [PATCH 03/12] remote: if .dir checksum exists on remote, assume contents also exists - affects all commands which use `cache_exists()` (remote status) --- dvc/remote/base.py | 18 ++++++++++------ dvc/remote/local.py | 51 +++++++++++++++++++++++++++++++++++++-------- 2 files changed, 54 insertions(+), 15 deletions(-) diff --git a/dvc/remote/base.py b/dvc/remote/base.py index c349a7a12b..111bc1e455 100644 --- a/dvc/remote/base.py +++ b/dvc/remote/base.py @@ -731,12 +731,14 @@ def all(self, jobs=None, name=None): remote_size, remote_checksums, jobs, name ) - def gc(self, named_cache, jobs=None): - logger.debug("named_cache: {} jobs: {}".format(named_cache, jobs)) - used = self.extract_used_local_checksums(named_cache) + def gc(self, named_caches, jobs=None): + used = self.extract_used_local_checksums(named_caches) if self.scheme != "": - used.update(named_cache[self.scheme]) + for dir_cache, file_cache in named_caches: + if dir_cache: + used.update(dir_cache[self.scheme]) + used.update(file_cache[self.scheme]) removed = False for checksum in self.all(jobs, str(self.path_info)): @@ -1246,8 +1248,12 @@ def unprotect(path_info): def _get_unpacked_dir_names(self, checksums): return set() - def extract_used_local_checksums(self, named_cache): - used = set(named_cache["local"]) + def extract_used_local_checksums(self, named_caches): + used = set() + for dir_cache, file_cache in named_caches: + if dir_cache: + used.update(dir_cache["local"]) + used.update(file_cache["local"]) unpacked = self._get_unpacked_dir_names(used) return used | unpacked diff --git a/dvc/remote/local.py b/dvc/remote/local.py index 3df92a034f..9a254a9c48 100644 --- a/dvc/remote/local.py +++ b/dvc/remote/local.py @@ -5,8 +5,11 @@ from concurrent.futures import ThreadPoolExecutor from functools import partial +from funcy import first + from shortuuid import uuid +from dvc.cache import NamedCache from dvc.compat import fspath_py35 from dvc.exceptions import DvcException, DownloadError, UploadError from dvc.path_info import PathInfo @@ -249,7 +252,7 @@ def open(path_info, mode="r", encoding=None): def status( self, - named_cache, + named_caches, remote, jobs=None, show_checksums=False, @@ -258,28 +261,58 @@ def status( logger.debug( "Preparing to collect status from {}".format(remote.path_info) ) - md5s = list(named_cache[self.scheme]) + cache = NamedCache() + dir_contents = {} + md5s = set() + for dir_cache, file_cache in named_caches: + cache.update(file_cache) + md5s.update(file_cache[self.scheme]) + if dir_cache is not None: + cache.update(dir_cache) + dir_checksum = first(dir_cache[self.scheme].keys()) + md5s.add(dir_checksum) + 
From ad68c27b1e5be60fca397da7fbb8f926191b0fab Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Mon, 13 Apr 2020 17:02:39 +0900 Subject: [PATCH 03/12] remote: if .dir checksum exists on remote, assume contents also exist - affects all commands which use `cache_exists()` (remote status) --- dvc/remote/base.py | 18 ++++++++++------ dvc/remote/local.py | 51 +++++++++++++++++++++++++++++++++++++-------- 2 files changed, 54 insertions(+), 15 deletions(-) diff --git a/dvc/remote/base.py b/dvc/remote/base.py index c349a7a12b..111bc1e455 100644 --- a/dvc/remote/base.py +++ b/dvc/remote/base.py @@ -731,12 +731,14 @@ def all(self, jobs=None, name=None): remote_size, remote_checksums, jobs, name ) - def gc(self, named_cache, jobs=None): - logger.debug("named_cache: {} jobs: {}".format(named_cache, jobs)) - used = self.extract_used_local_checksums(named_cache) + def gc(self, named_caches, jobs=None): + used = self.extract_used_local_checksums(named_caches) if self.scheme != "": - used.update(named_cache[self.scheme]) + for dir_cache, file_cache in named_caches: + if dir_cache: + used.update(dir_cache[self.scheme]) + used.update(file_cache[self.scheme]) removed = False for checksum in self.all(jobs, str(self.path_info)): @@ -1246,8 +1248,12 @@ def unprotect(path_info): def _get_unpacked_dir_names(self, checksums): return set() - def extract_used_local_checksums(self, named_cache): - used = set(named_cache["local"]) + def extract_used_local_checksums(self, named_caches): + used = set() + for dir_cache, file_cache in named_caches: + if dir_cache: + used.update(dir_cache["local"]) + used.update(file_cache["local"]) unpacked = self._get_unpacked_dir_names(used) return used | unpacked diff --git a/dvc/remote/local.py b/dvc/remote/local.py index 3df92a034f..9a254a9c48 100644 --- a/dvc/remote/local.py +++ b/dvc/remote/local.py @@ -5,8 +5,11 @@ from concurrent.futures import ThreadPoolExecutor from functools import partial +from funcy import first + from shortuuid import uuid +from dvc.cache import NamedCache from dvc.compat import fspath_py35 from dvc.exceptions import DvcException, DownloadError, UploadError from dvc.path_info import PathInfo @@ -249,7 +252,7 @@ def open(path_info, mode="r", encoding=None): def status( self, - named_cache, + named_caches, remote, jobs=None, show_checksums=False, @@ -258,28 +261,58 @@ def status( logger.debug( "Preparing to collect status from {}".format(remote.path_info) ) - md5s = list(named_cache[self.scheme]) + cache = NamedCache() + dir_contents = {} + md5s = set() + for dir_cache, file_cache in named_caches: + cache.update(file_cache) + md5s.update(file_cache[self.scheme]) + if dir_cache is not None: + cache.update(dir_cache) + dir_checksum = first(dir_cache[self.scheme].keys()) + md5s.add(dir_checksum) + dir_contents[dir_checksum] = file_cache[self.scheme].keys() logger.debug("Collecting information from local cache...") - local_exists = self.cache_exists(md5s, jobs=jobs, name=self.cache_dir) + local_exists = frozenset( + self.cache_exists(md5s, jobs=jobs, name=self.cache_dir) + ) # This is a performance optimization. We can safely assume that, # if the resources that we want to fetch are already cached, # there's no need to check the remote storage for the existence of # those files. - if download and sorted(local_exists) == sorted(md5s): + if download and local_exists == md5s: remote_exists = local_exists else: logger.debug("Collecting information from remote cache...") - remote_exists = list( - remote.cache_exists( - md5s, jobs=jobs, name=str(remote.path_info) + remote_exists = set() + dir_md5s = set(dir_contents.keys()) + if dir_md5s: + # If .dir checksum exists on the remote, assume directory + # contents also exist on the remote + for dir_checksum in remote._cache_object_exists(dir_md5s): + file_checksums = dir_contents[dir_checksum] + logger.debug( + "'{}' exists on remote, " + "assuming '{}' files also exist".format( + dir_checksum, len(file_checksums) + ) + ) + md5s.remove(dir_checksum) + remote_exists.add(dir_checksum) + md5s.difference_update(file_checksums) + remote_exists.update(file_checksums) + if md5s: + remote_exists.update( + remote.cache_exists( + md5s, jobs=jobs, name=str(remote.path_info) + ) ) - ) ret = { checksum: {"name": checksum if show_checksums else " ".join(names)} - for checksum, names in named_cache[self.scheme].items() + for checksum, names in cache[self.scheme].items() } self._fill_statuses(ret, local_exists, remote_exists) From edf2ec12fbe381a4403e356ce28fa223a51b6d7c Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Wed, 8 Apr 2020 16:02:36 +0900 Subject: [PATCH 04/12] push: only upload .dir file after full file contents have been uploaded --- dvc/remote/local.py | 128 ++++++++++++++++++++++++++++++++++++-------- 1 file changed, 105 insertions(+), 23 deletions(-) diff --git a/dvc/remote/local.py b/dvc/remote/local.py index 9a254a9c48..39a8d73293 100644 --- a/dvc/remote/local.py +++ b/dvc/remote/local.py @@ -2,10 +2,10 @@ import logging import os import stat -from concurrent.futures import ThreadPoolExecutor +from concurrent.futures import as_completed, ThreadPoolExecutor from functools import partial -from funcy import first +from funcy import concat, first from shortuuid import uuid @@ -258,17 +258,43 @@ def status( show_checksums=False, download=False, ): + # Return flattened dict containing all status info + dir_status, file_status, _ = self._status( + named_caches, + remote, + jobs=jobs, + show_checksums=show_checksums, + download=download, + ) + return dict(dir_status, **file_status) + + def _status( + self, + named_caches, + remote, + jobs=None, + show_checksums=False, + download=False, + ): + """Return a tuple of (dir_status_info, file_status_info, dir_mapping). + + dir_status_info contains status for .dir files, file_status_info + contains status for all other files, and dir_mapping is a dict of + {dir_path_info: set(file_path_info...)} which can be used to map + a .dir file to its file contents.
+ """ logger.debug( "Preparing to collect status from {}".format(remote.path_info) ) - cache = NamedCache() + merged_dir_cache = NamedCache() + merged_file_cache = NamedCache() dir_contents = {} md5s = set() for dir_cache, file_cache in named_caches: - cache.update(file_cache) + merged_file_cache.update(file_cache) md5s.update(file_cache[self.scheme]) if dir_cache is not None: - cache.update(dir_cache) + merged_dir_cache.update(dir_cache) dir_checksum = first(dir_cache[self.scheme].keys()) md5s.add(dir_checksum) dir_contents[dir_checksum] = file_cache[self.scheme].keys() @@ -310,15 +336,27 @@ def status( ) ) - ret = { - checksum: {"name": checksum if show_checksums else " ".join(names)} - for checksum, names in cache[self.scheme].items() - } - self._fill_statuses(ret, local_exists, remote_exists) - - self._log_missing_caches(ret) - - return ret + def cache_to_dict(cache): + return { + checksum: { + "name": checksum if show_checksums else " ".join(names) + } + for checksum, names in cache[self.scheme].items() + } + + dir_status = cache_to_dict(merged_dir_cache) + file_status = cache_to_dict(merged_file_cache) + self._fill_statuses(dir_status, local_exists, remote_exists) + self._fill_statuses(file_status, local_exists, remote_exists) + + self._log_missing_caches(dict(dir_status, **file_status)) + + dir_paths = {} + for dir_checksum, file_checksums in dir_contents.items(): + dir_paths[remote.checksum_to_path_info(dir_checksum)] = frozenset( + map(remote.checksum_to_path_info, file_checksums) + ) + return dir_status, file_status, dir_paths @staticmethod def _fill_statuses(checksum_info_dir, local_exists, remote_exists): @@ -380,7 +418,7 @@ def _process( if jobs is None: jobs = remote.JOBS - status_info = self.status( + dir_status, file_status, dir_paths = self._status( named_caches, remote, jobs=jobs, @@ -388,23 +426,67 @@ def _process( download=download, ) - plans = self._get_plans(download, remote, status_info, status) + dir_plans = self._get_plans(download, remote, dir_status, status) + file_plans = self._get_plans(download, remote, file_status, status) - if len(plans[0]) == 0: + if len(dir_plans[0]) + len(file_plans[0]) == 0: return 0 - if jobs > 1: - with ThreadPoolExecutor(max_workers=jobs) as executor: - fails = sum(executor.map(func, *plans)) - else: - fails = sum(map(func, *plans)) + with ThreadPoolExecutor(max_workers=jobs) as executor: + if download: + fails = sum(executor.map(func, *dir_plans)) + fails += sum(executor.map(func, *file_plans)) + else: + # for uploads, push files first, and any .dir files last + + file_futures = {} + for from_info, to_info, name in zip(*file_plans): + file_futures[to_info] = executor.submit( + func, from_info, to_info, name + ) + dir_futures = {} + for from_info, to_info, name in zip(*dir_plans): + wait_futures = { + future + for file_path, future in file_futures.items() + if file_path in dir_paths[to_info] + } + dir_futures[to_info] = executor.submit( + self._dir_upload, + func, + wait_futures, + from_info, + to_info, + name, + ) + fails = sum( + future.result() + for future in concat( + file_futures.values(), dir_futures.values() + ) + ) if fails: if download: raise DownloadError(fails) raise UploadError(fails) - return len(plans[0]) + return len(dir_plans[0]) + len(file_plans[0]) + + def _dir_upload(self, func, futures, from_info, to_info, name): + for future in as_completed(futures): + if future.result(): + # do not upload this .dir file if any file in this + # directory failed to upload + logger.debug( + "failed to upload full contents of '{}', 
" + "aborting .dir file upload".format(name) + ) + logger.error( + "failed to upload '{}' to '{}'".format(from_info, to_info) + ) + return 1 + return func(from_info, to_info, name) def push(self, named_caches, remote, jobs=None, show_checksums=False): return self._process( From b5e557465eabaadf8ba8f4ea1385ba08656f332e Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Wed, 8 Apr 2020 16:14:49 +0900 Subject: [PATCH 05/12] gc: always remove .dir checksums first --- dvc/remote/base.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/dvc/remote/base.py b/dvc/remote/base.py index 111bc1e455..13885bc71e 100644 --- a/dvc/remote/base.py +++ b/dvc/remote/base.py @@ -741,7 +741,12 @@ def gc(self, named_caches, jobs=None): used.update(file_cache[self.scheme]) removed = False - for checksum in self.all(jobs, str(self.path_info)): + # checksums must be sorted to ensure we always remove .dir files first + for checksum in sorted( + self.all(jobs, str(self.path_info)), + key=self.is_dir_checksum, + reverse=True, + ): if checksum in used: continue path_info = self.checksum_to_path_info(checksum) From 60ff14c4ce93db9fe770a5abf4fe53f5c4a94652 Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Mon, 13 Apr 2020 20:03:17 +0900 Subject: [PATCH 06/12] functional tests for push/gc --- tests/func/test_gc.py | 31 +++++++++++++++++++++++++++++++ tests/func/test_remote.py | 22 +++++++++++++++++++--- 2 files changed, 50 insertions(+), 3 deletions(-) diff --git a/tests/func/test_gc.py b/tests/func/test_gc.py index acb3bb94e3..d383c01084 100644 --- a/tests/func/test_gc.py +++ b/tests/func/test_gc.py @@ -1,5 +1,6 @@ import logging import os +from mock import patch import configobj import pytest @@ -304,3 +305,33 @@ def test_gc_cloud_positive(tmp_dir, dvc, tmp_path_factory): for flag in ["-cw", "-ca", "-cT", "-caT", "-cwT"]: assert main(["gc", "-vf", flag]) == 0 + + +def test_gc_cloud_remove_order(tmp_dir, scm, dvc, tmp_path_factory, mocker): + storage = fspath(tmp_path_factory.mktemp("test_remote_base")) + dvc.config["remote"]["local_remote"] = {"url": storage} + dvc.config["core"]["remote"] = "local_remote" + + (standalone, dir1, dir2) = tmp_dir.dvc_gen( + { + "file1": "standalone", + "dir1": {"file2": "file2"}, + "dir2": {"file3": "file3", "file4": "file4"}, + } + ) + dvc.push() + dvc.remove(standalone.relpath) + dvc.remove(dir1.relpath) + dvc.remove(dir2.relpath) + dvc.gc(workspace=True) + + with patch.object(RemoteLOCAL, "remove", autospec=True) as remove: + dvc.gc(workspace=True, cloud=True) + assert len(remove.mock_calls) == 8 + # dir (and unpacked dir) should be first 4 checksums removed from + # the remote + for args in remove.call_args_list[:4]: + checksum = str(args.args[1]) + assert checksum.endswith(".dir") or checksum.endswith( + ".dir.unpacked" + ) diff --git a/tests/func/test_remote.py b/tests/func/test_remote.py index f1ebd35068..b594f3d383 100644 --- a/tests/func/test_remote.py +++ b/tests/func/test_remote.py @@ -177,23 +177,25 @@ def test_partial_push_n_pull(tmp_dir, dvc, tmp_path_factory): foo = tmp_dir.dvc_gen({"foo": "foo content"})[0].outs[0] bar = tmp_dir.dvc_gen({"bar": "bar content"})[0].outs[0] + baz = tmp_dir.dvc_gen({"baz": {"foo": "baz content"}})[0].outs[0] # Faulty upload version, failing on foo original = RemoteLOCAL._upload def unreliable_upload(self, from_file, to_info, name=None, **kwargs): - if name == "foo": + if "foo" in name: raise Exception("stop foo") return original(self, from_file, to_info, name, **kwargs) with patch.object(RemoteLOCAL, "_upload", 
From b5e557465eabaadf8ba8f4ea1385ba08656f332e Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Wed, 8 Apr 2020 16:14:49 +0900 Subject: [PATCH 05/12] gc: always remove .dir checksums first --- dvc/remote/base.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/dvc/remote/base.py b/dvc/remote/base.py index 111bc1e455..13885bc71e 100644 --- a/dvc/remote/base.py +++ b/dvc/remote/base.py @@ -741,7 +741,12 @@ def gc(self, named_caches, jobs=None): used.update(file_cache[self.scheme]) removed = False - for checksum in self.all(jobs, str(self.path_info)): + # checksums must be sorted to ensure we always remove .dir files first + for checksum in sorted( + self.all(jobs, str(self.path_info)), + key=self.is_dir_checksum, + reverse=True, + ): if checksum in used: continue path_info = self.checksum_to_path_info(checksum) From 60ff14c4ce93db9fe770a5abf4fe53f5c4a94652 Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Mon, 13 Apr 2020 20:03:17 +0900 Subject: [PATCH 06/12] functional tests for push/gc --- tests/func/test_gc.py | 31 +++++++++++++++++++++++++++++++ tests/func/test_remote.py | 22 +++++++++++++++++++--- 2 files changed, 50 insertions(+), 3 deletions(-) diff --git a/tests/func/test_gc.py b/tests/func/test_gc.py index acb3bb94e3..d383c01084 100644 --- a/tests/func/test_gc.py +++ b/tests/func/test_gc.py @@ -1,5 +1,6 @@ import logging import os +from mock import patch import configobj import pytest @@ -304,3 +305,33 @@ def test_gc_cloud_positive(tmp_dir, dvc, tmp_path_factory): for flag in ["-cw", "-ca", "-cT", "-caT", "-cwT"]: assert main(["gc", "-vf", flag]) == 0 + + +def test_gc_cloud_remove_order(tmp_dir, scm, dvc, tmp_path_factory, mocker): + storage = fspath(tmp_path_factory.mktemp("test_remote_base")) + dvc.config["remote"]["local_remote"] = {"url": storage} + dvc.config["core"]["remote"] = "local_remote" + + (standalone, dir1, dir2) = tmp_dir.dvc_gen( + { + "file1": "standalone", + "dir1": {"file2": "file2"}, + "dir2": {"file3": "file3", "file4": "file4"}, + } + ) + dvc.push() + dvc.remove(standalone.relpath) + dvc.remove(dir1.relpath) + dvc.remove(dir2.relpath) + dvc.gc(workspace=True) + + with patch.object(RemoteLOCAL, "remove", autospec=True) as remove: + dvc.gc(workspace=True, cloud=True) + assert len(remove.mock_calls) == 8 + # dir (and unpacked dir) should be first 4 checksums removed from + # the remote + for args in remove.call_args_list[:4]: + checksum = str(args.args[1]) + assert checksum.endswith(".dir") or checksum.endswith( + ".dir.unpacked" + ) diff --git a/tests/func/test_remote.py b/tests/func/test_remote.py index f1ebd35068..b594f3d383 100644 --- a/tests/func/test_remote.py +++ b/tests/func/test_remote.py @@ -177,23 +177,25 @@ def test_partial_push_n_pull(tmp_dir, dvc, tmp_path_factory): foo = tmp_dir.dvc_gen({"foo": "foo content"})[0].outs[0] bar = tmp_dir.dvc_gen({"bar": "bar content"})[0].outs[0] + baz = tmp_dir.dvc_gen({"baz": {"foo": "baz content"}})[0].outs[0] # Faulty upload version, failing on foo original = RemoteLOCAL._upload def unreliable_upload(self, from_file, to_info, name=None, **kwargs): - if name == "foo": + if "foo" in name: raise Exception("stop foo") return original(self, from_file, to_info, name, **kwargs) with patch.object(RemoteLOCAL, "_upload", unreliable_upload): with pytest.raises(UploadError) as upload_error_info: dvc.push() - assert upload_error_info.value.amount == 1 + assert upload_error_info.value.amount == 3 remote = dvc.cloud.get_remote("upstream") assert not remote.exists(remote.checksum_to_path_info(foo.checksum)) assert remote.exists(remote.checksum_to_path_info(bar.checksum)) + assert not remote.exists(remote.checksum_to_path_info(baz.checksum)) # Push everything and delete local cache dvc.push() @@ -202,7 +204,7 @@ def unreliable_upload(self, from_file, to_info, name=None, **kwargs): with patch.object(RemoteLOCAL, "_download", side_effect=Exception): with pytest.raises(DownloadError) as download_error_info: dvc.pull() - assert download_error_info.value.amount == 2 + assert download_error_info.value.amount == 4 def test_raise_on_too_many_open_files(tmp_dir, dvc, tmp_path_factory, mocker): @@ -236,3 +238,17 @@ def test_external_dir_resource_on_no_cache(tmp_dir, dvc, tmp_path_factory): dvc.cache.local = None with pytest.raises(RemoteCacheRequiredError): dvc.run(deps=[fspath(external_dir)]) + + +def test_push_order(tmp_dir, dvc, tmp_path_factory): + url = fspath(tmp_path_factory.mktemp("upstream")) + dvc.config["remote"]["upstream"] = {"url": url} + dvc.config["core"]["remote"] = "upstream" + + tmp_dir.dvc_gen({"foo": {"bar": "bar content"}}) + tmp_dir.dvc_gen({"baz": "baz content"}) + + with patch.object(RemoteLOCAL, "_upload", return_value=0) as upload: + dvc.push() + # last uploaded file should be dir checksum + assert upload.call_args.args[0].endswith(".dir") From 45f2debc61455799dab99e672c0c1a508582c11a Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Tue, 14 Apr 2020 14:51:13 +0900 Subject: [PATCH 07/12] repo: support nesting dir caches in NamedCache --- dvc/cache.py | 76 ++++++++++++++++++++++++++++++++---- dvc/output/base.py | 23 +++++------ dvc/repo/__init__.py | 32 +++++---------- dvc/repo/fetch.py | 40 +++++-------------- dvc/repo/gc.py | 5 ++- dvc/stage/__init__.py | 7 ++-- tests/unit/repo/test_repo.py | 24 +++++------- 7 files changed, 114 insertions(+), 93 deletions(-) diff --git a/dvc/cache.py b/dvc/cache.py index 0a5b355765..70ba6a05b2 100644 --- a/dvc/cache.py +++ b/dvc/cache.py @@ -71,9 +71,39 @@ def __init__(self, repo): azure = _make_remote_property("azure") +class NamedCacheItem(object): + def __init__(self): + self.names = set() + self.children = defaultdict(NamedCacheItem) + + def __eq__(self, other): + return self.names == other.names and self.children == other.children + + def child_keys(self): + for key, child in self.children.items(): + yield key + yield from child.child_keys() + + def child_names(self): + for key, child in self.children.items(): + yield key, child.names + yield from child.child_names() + + def add(self, checksum, item): + self.children[checksum].update(item) + + def update(self, item, suffix=""): + if suffix: + self.names.update(n + suffix for n in item.names) + else: + self.names.update(item.names) + for checksum, item in item.children.items(): + self.children[checksum].update(item) + + class NamedCache(object): def __init__(self): - self._items = defaultdict(lambda: defaultdict(set)) + self._items = defaultdict(lambda: defaultdict(NamedCacheItem)) self.external = defaultdict(set) @classmethod @@ -86,7 +116,18 @@ def __getitem__(self, key): return self._items[key] def add(self, scheme, checksum, name): - self._items[scheme][checksum].add(name) + """Add a mapped name for the specified checksum.""" + self._items[scheme][checksum].names.add(name) + + def
add_child_cache(self, checksum, cache, suffix=""): + """Add/update child cache for the specified checksum.""" + for scheme, src in cache._items.items(): + dst = self._items[scheme][checksum].children + for child_checksum, item in src.items(): + dst[child_checksum].update(item, suffix=suffix) + + for repo_pair, files in cache.external.items(): + self.external[repo_pair].update(files) def add_external(self, url, rev, path): self.external[url, rev].add(path) @@ -94,11 +135,32 @@ def add_external(self, url, rev, path): def update(self, cache, suffix=""): for scheme, src in cache._items.items(): dst = self._items[scheme] - for checksum, names in src.items(): - if suffix: - dst[checksum].update(n + suffix for n in names) - else: - dst[checksum].update(names) + for checksum, item in src.items(): + dst[checksum].update(item, suffix=suffix) for repo_pair, files in cache.external.items(): self.external[repo_pair].update(files) + + def scheme_keys(self, scheme): + """Iterate over a flat list of all keys for the specified scheme, + including children. + """ + for key, item in self._items[scheme].items(): + yield key + yield from item.child_keys() + + def scheme_names(self, scheme): + """Iterate over a flat list of checksum, names items for the specified + scheme, including children. + """ + for key, item in self._items[scheme].items(): + yield key, item.names + yield from item.child_names() + + def dir_keys(self, scheme): + return ( + key for key, item in self._items[scheme].items() if item.children + ) + + def child_keys(self, scheme, checksum): + return self._items[scheme][checksum].child_keys() diff --git a/dvc/output/base.py b/dvc/output/base.py index 345e055af0..2948c117f5 100644 --- a/dvc/output/base.py +++ b/dvc/output/base.py @@ -363,9 +363,8 @@ def _collect_used_dir_cache( if self.cache.changed_cache_file(self.checksum): try: - cache = NamedCache.make("local", self.checksum, str(self)) self.repo.cloud.pull( - [(None, cache)], + NamedCache.make("local", self.checksum, str(self)), jobs=jobs, remote=remote, show_checksums=False, @@ -402,22 +401,16 @@ def get_used_cache(self, **kwargs): In case that the given output is a directory, it will also include the `info` of its files. - - Returns: - 2-tuple of NamedCache objects in the form of - (directory `info`, file `info`). - If the given output is not a directory, the first tuple entry will - be None. """ if not self.use_cache: - return None, NamedCache() + return NamedCache() if self.stage.is_repo_import: cache = NamedCache() (dep,) = self.stage.deps cache.external[dep.repo_pair].add(dep.def_path) - return None, cache + return cache if not self.checksum: msg = ( @@ -436,14 +429,18 @@ def get_used_cache(self, **kwargs): ) ) logger.warning(msg) - return None, NamedCache() + return NamedCache() ret = NamedCache.make(self.scheme, self.checksum, str(self)) if not self.is_dir_checksum: - return None, ret + return ret - return ret, self._collect_used_dir_cache(**kwargs) + ret.add_child_cache( + self.checksum, self._collect_used_dir_cache(**kwargs), + ) + + return ret @classmethod def _validate_output_path(cls, path): diff --git a/dvc/repo/__init__.py b/dvc/repo/__init__.py index 657e45054c..79414c4e59 100644 --- a/dvc/repo/__init__.py +++ b/dvc/repo/__init__.py @@ -253,18 +253,13 @@ def used_cache( `all_branches`/`all_tags`/`all_commits` to expand the scope. Returns: - A list of 2-tuples in the form (dir_cache, file_cache). 
- Each NamedCache object is a dictionary with Schemes - (representing output's location) as keys, - and a list with the outputs' `dumpd` as values. - If the given output is not a directory, the first tuple entry - will be None. + A dictionary with Schemes (representing output's location) mapped + to items containing the output's `dumpd` names and the output's + children (if the given output is a directory). """ from dvc.cache import NamedCache - used_caches = [] - # group together file caches which do not have an associated directory - file_caches = NamedCache() + cache = NamedCache() for branch in self.brancher( all_branches=all_branches, @@ -282,24 +277,15 @@ def used_cache( suffix = "({})".format(branch) if branch else "" for stage, filter_info in pairs: - for dir_cache, file_cache in stage.get_used_cache( + used_cache = stage.get_used_cache( remote=remote, force=force, jobs=jobs, filter_info=filter_info, - ): - if dir_cache is None: - file_caches.update(file_cache, suffix=suffix) - else: - used_dir = NamedCache() - used_dir.update(dir_cache, suffix=suffix) - used_file = NamedCache() - used_file.update(file_cache, suffix=suffix) - used_caches.append((used_dir, used_file)) - - if file_caches._items or file_caches.external: - used_caches.append((None, file_caches)) - return used_caches + ) + cache.update(used_cache, suffix=suffix) + + return cache def _collect_graph(self, stages=None): """Generate a graph by using the given stages on the given directory diff --git a/dvc/repo/fetch.py b/dvc/repo/fetch.py index 7fdf5d6b86..888f552da0 100644 --- a/dvc/repo/fetch.py +++ b/dvc/repo/fetch.py @@ -1,7 +1,6 @@ import logging -from funcy import concat - +from dvc.cache import NamedCache from dvc.config import NoRemoteError from dvc.exceptions import DownloadError, OutputNotFoundError from dvc.scm.base import CloneError @@ -54,34 +53,15 @@ def _fetch( used, jobs, remote=remote, show_checksums=show_checksums ) except NoRemoteError: - external = False - local = False - for dir_cache, file_cache in used: - if dir_cache: - if dir_cache.external: - external = True - if dir_cache["local"]: - local = True - if file_cache.external: - external = True - if file_cache["local"]: - local = True - if not external and local: + if not used.external and used["local"]: raise except DownloadError as exc: failed += exc.amount - for dir_cache, file_cache in used: - if dir_cache is None: - items = file_cache.external.items() - else: - items = concat( - dir_cache.external.items(), file_cache.external.items() - ) - for (repo_url, repo_rev), files in items: - d, f = _fetch_external(self, repo_url, repo_rev, files, jobs) - downloaded += d - failed += f + for (repo_url, repo_rev), files in used.external.items(): + d, f = _fetch_external(self, repo_url, repo_rev, files, jobs) + downloaded += d + failed += f if failed: raise DownloadError(failed) @@ -101,7 +81,7 @@ def _fetch_external(self, repo_url, repo_rev, files, jobs): if is_dvc_repo: repo.cache.local.cache_dir = self.cache.local.cache_dir with repo.state: - used_cache = [] + cache = NamedCache() for name in files: try: out = repo.find_out_by_relpath(name) @@ -109,12 +89,10 @@ def _fetch_external(self, repo_url, repo_rev, files, jobs): # try to add to cache if they are git-tracked files git_files.append(name) else: - used_cache.append(out.get_used_cache()) + cache.update(out.get_used_cache()) try: - downloaded += repo.cloud.pull( - used_cache, jobs=jobs - ) + downloaded += repo.cloud.pull(cache, jobs=jobs) except DownloadError as exc: failed += exc.amount diff --git 
a/dvc/repo/gc.py b/dvc/repo/gc.py index 5cd5d2b645..8f1222fd84 100644 --- a/dvc/repo/gc.py +++ b/dvc/repo/gc.py @@ -1,6 +1,7 @@ import logging from . import locked +from dvc.cache import NamedCache from dvc.exceptions import InvalidArgumentError @@ -59,9 +60,9 @@ def gc( stack.enter_context(repo.lock) stack.enter_context(repo.state) - used = [] + used = NamedCache() for repo in all_repos + [self]: - used.extend( + used.update( repo.used_cache( all_branches=all_branches, with_deps=with_deps, diff --git a/dvc/stage/__init__.py b/dvc/stage/__init__.py index 09de5fd6f7..c2c1c1d9c6 100644 --- a/dvc/stage/__init__.py +++ b/dvc/stage/__init__.py @@ -725,9 +725,10 @@ def get_all_files_number(self, filter_info=None): ) def get_used_cache(self, *args, **kwargs): + from dvc.cache import NamedCache - ret = [] + cache = NamedCache() for out in self._filter_outs(kwargs.get("filter_info")): - ret.append(out.get_used_cache(*args, **kwargs)) + cache.update(out.get_used_cache(*args, **kwargs)) - return ret + return cache diff --git a/tests/unit/repo/test_repo.py b/tests/unit/repo/test_repo.py index 447487f5b0..fbd3f11ef5 100644 --- a/tests/unit/repo/test_repo.py +++ b/tests/unit/repo/test_repo.py @@ -37,27 +37,23 @@ def test_used_cache(tmp_dir, dvc, path): from dvc.cache import NamedCache tmp_dir.dvc_gen({"dir": {"subdir": {"file": "file"}, "other": "other"}}) - expected_dir = NamedCache.make( + expected = NamedCache.make( "local", "70922d6bf66eb073053a82f77d58c536.dir", "dir" ) - expected_file = NamedCache.make( - "local", - "8c7dd922ad47494fc02c388e12c00eac", - os.path.join("dir", "subdir", "file"), + expected.add_child_cache( + "70922d6bf66eb073053a82f77d58c536.dir", + NamedCache.make( + "local", + "8c7dd922ad47494fc02c388e12c00eac", + os.path.join("dir", "subdir", "file"), + ), ) with dvc.state: used_cache = dvc.used_cache([path]) - assert isinstance(used_cache, list) - assert len(used_cache) == 1 - assert isinstance(used_cache[0], tuple) - used_dir = used_cache[0][0] - used_file = used_cache[0][1] assert ( - used_dir._items == expected_dir._items - and used_dir.external == expected_dir.external - and used_file._items == expected_file._items - and used_file.external == expected_file.external + used_cache._items == expected._items + and used_cache.external == expected.external ) From 02aaf4234460803097e395efcdbc91b01c966b0d Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Tue, 14 Apr 2020 16:06:51 +0900 Subject: [PATCH 08/12] remote: NamedCache updates --- dvc/data_cloud.py | 47 +++++++++------------- dvc/external_repo.py | 2 +- dvc/remote/base.py | 17 +++----- dvc/remote/local.py | 74 +++++++++++++++++------------------ tests/func/test_data_cloud.py | 6 +-- 5 files changed, 62 insertions(+), 84 deletions(-) diff --git a/dvc/data_cloud.py b/dvc/data_cloud.py index a3976f1452..6fe2683ee2 100644 --- a/dvc/data_cloud.py +++ b/dvc/data_cloud.py @@ -48,12 +48,11 @@ def get_remote(self, remote=None, command=""): def _init_remote(self, remote): return Remote(self.repo, name=remote) - def push(self, caches, jobs=None, remote=None, show_checksums=False): + def push(self, cache, jobs=None, remote=None, show_checksums=False): """Push data items in a cloud-agnostic way. Args: - caches (list): list of (dir_cache, file_cache) tuples containing - named checksums to push to the cloud. + cache (NamedCache): named checksums to push to the cloud. jobs (int): number of jobs that can be running simultaneously. remote (dvc.remote.base.RemoteBASE): optional remote to push to. 
By default remote from core.remote config option is used. @@ -61,18 +60,17 @@ def push(self, caches, jobs=None, remote=None, show_checksums=False): information messages. """ return self.repo.cache.local.push( - caches, + cache, jobs=jobs, remote=self.get_remote(remote, "push"), show_checksums=show_checksums, ) - def pull(self, caches, jobs=None, remote=None, show_checksums=False): + def pull(self, cache, jobs=None, remote=None, show_checksums=False): """Pull data items in a cloud-agnostic way. Args: - caches (list): list of (dir_cache, file_cache) tuples containing - named checksums to pull from the cloud. + cache (NamedCache): named checksums to pull from the cloud. jobs (int): number of jobs that can be running simultaneously. remote (dvc.remote.base.RemoteBASE): optional remote to pull from. By default remote from core.remote config option is used. @@ -81,36 +79,29 @@ def pull(self, caches, jobs=None, remote=None, show_checksums=False): """ remote = self.get_remote(remote, "pull") downloaded_items_num = self.repo.cache.local.pull( - caches, jobs=jobs, remote=remote, show_checksums=show_checksums + cache, jobs=jobs, remote=remote, show_checksums=show_checksums ) if not remote.verify: - self._save_pulled_checksums(caches) + self._save_pulled_checksums(cache) return downloaded_items_num def _save_pulled_checksums(self, cache): - for dir_cache, file_cache in cache: - checksums = set(file_cache["local"].keys()) - if dir_cache is not None: - checksums.update(dir_cache["local"].keys()) - for checksum in checksums: - cache_file = self.repo.cache.local.checksum_to_path_info( - checksum - ) - if self.repo.cache.local.exists(cache_file): - # We can safely save here, as existing corrupted files will - # be removed upon status, while files corrupted during - # download will not be moved from tmp_file - # (see `RemoteBASE.download()`) - self.repo.state.save(cache_file, checksum) - - def status(self, caches, jobs=None, remote=None, show_checksums=False): + for checksum in cache.scheme_keys("local"): + cache_file = self.repo.cache.local.checksum_to_path_info(checksum) + if self.repo.cache.local.exists(cache_file): + # We can safely save here, as existing corrupted files will + # be removed upon status, while files corrupted during + # download will not be moved from tmp_file + # (see `RemoteBASE.download()`) + self.repo.state.save(cache_file, checksum) + + def status(self, cache, jobs=None, remote=None, show_checksums=False): """Check status of data items in a cloud-agnostic way. Args: - caches (list): list of (dir_cache, file_cache) tuples containing - named checksums to check status for. + cache (NamedCache): named checksums to check status for. jobs (int): number of jobs that can be running simultaneously. remote (dvc.remote.base.RemoteBASE): optional remote to compare cache to.
By default remote from core.remote config option @@ -120,5 +111,5 @@ def status(self, caches, jobs=None, remote=None, show_checksums=False): """ remote = self.get_remote(remote, "status") return self.repo.cache.local.status( - caches, jobs=jobs, remote=remote, show_checksums=show_checksums + cache, jobs=jobs, remote=remote, show_checksums=show_checksums ) diff --git a/dvc/external_repo.py b/dvc/external_repo.py index 142fcfc9d2..a598eb6692 100644 --- a/dvc/external_repo.py +++ b/dvc/external_repo.py @@ -104,7 +104,7 @@ def _pull_cached(self, out, path_info, dest): # Only pull unless all needed cache is present if out.changed_cache(filter_info=src): - self.cloud.pull([out.get_used_cache(filter_info=src)]) + self.cloud.pull(out.get_used_cache(filter_info=src)) try: out.checkout(filter_info=src) diff --git a/dvc/remote/base.py b/dvc/remote/base.py index 13885bc71e..0001ee5c63 100644 --- a/dvc/remote/base.py +++ b/dvc/remote/base.py @@ -731,14 +731,11 @@ def all(self, jobs=None, name=None): remote_size, remote_checksums, jobs, name ) - def gc(self, named_caches, jobs=None): - used = self.extract_used_local_checksums(named_caches) + def gc(self, named_cache, jobs=None): + used = self.extract_used_local_checksums(named_cache) if self.scheme != "": - for dir_cache, file_cache in named_caches: - if dir_cache: - used.update(dir_cache[self.scheme]) - used.update(file_cache[self.scheme]) + used.update(named_cache.scheme_keys(self.scheme)) removed = False # checksums must be sorted to ensure we always remove .dir files first @@ -1253,12 +1250,8 @@ def unprotect(path_info): def _get_unpacked_dir_names(self, checksums): return set() - def extract_used_local_checksums(self, named_caches): - used = set() - for dir_cache, file_cache in named_caches: - if dir_cache: - used.update(dir_cache["local"]) - used.update(file_cache["local"]) + def extract_used_local_checksums(self, named_cache): + used = set(named_cache.scheme_keys("local")) unpacked = self._get_unpacked_dir_names(used) return used | unpacked diff --git a/dvc/remote/local.py b/dvc/remote/local.py index 39a8d73293..101cbf4a5d 100644 --- a/dvc/remote/local.py +++ b/dvc/remote/local.py @@ -5,11 +5,10 @@ from concurrent.futures import as_completed, ThreadPoolExecutor from functools import partial -from funcy import concat, first +from funcy import concat from shortuuid import uuid -from dvc.cache import NamedCache from dvc.compat import fspath_py35 from dvc.exceptions import DvcException, DownloadError, UploadError from dvc.path_info import PathInfo @@ -252,7 +251,7 @@ def open(path_info, mode="r", encoding=None): def status( self, - named_caches, + named_cache, remote, jobs=None, show_checksums=False, @@ -260,7 +259,7 @@ def status( ): # Return flattened dict containing all status info dir_status, file_status, _ = self._status( - named_caches, + named_cache, remote, jobs=jobs, show_checksums=show_checksums, @@ -270,7 +269,7 @@ def status( def _status( self, - named_caches, + named_cache, remote, jobs=None, show_checksums=False, @@ -286,18 +285,7 @@ def _status( logger.debug( "Preparing to collect status from {}".format(remote.path_info) ) - merged_dir_cache = NamedCache() - merged_file_cache = NamedCache() - dir_contents = {} - md5s = set() - for dir_cache, file_cache in named_caches: - merged_file_cache.update(file_cache) - md5s.update(file_cache[self.scheme]) - if dir_cache is not None: - merged_dir_cache.update(dir_cache) - dir_checksum = first(dir_cache[self.scheme].keys()) - md5s.add(dir_checksum) - dir_contents[dir_checksum] = 
file_cache[self.scheme].keys() + md5s = set(named_cache.scheme_keys(self.scheme)) logger.debug("Collecting information from local cache...") local_exists = frozenset( @@ -313,12 +301,14 @@ else: logger.debug("Collecting information from remote cache...") remote_exists = set() - dir_md5s = set(dir_contents.keys()) + dir_md5s = set(named_cache.dir_keys(self.scheme)) if dir_md5s: # If .dir checksum exists on the remote, assume directory # contents also exist on the remote for dir_checksum in remote._cache_object_exists(dir_md5s): - file_checksums = dir_contents[dir_checksum] + file_checksums = list( + named_cache.child_keys(self.scheme, dir_checksum) + ) logger.debug( "'{}' exists on remote, " "assuming '{}' files also exist".format( dir_checksum, len(file_checksums) ) @@ -336,26 +326,32 @@ ) ) - def cache_to_dict(cache): - return { - checksum: { - "name": checksum if show_checksums else " ".join(names) - } - for checksum, names in cache[self.scheme].items() - } + def make_names(checksum, names): + return {"name": checksum if show_checksums else " ".join(names)} + + dir_status = {} + file_status = {} + dir_paths = {} + for checksum, item in named_cache[self.scheme].items(): + if item.children: + dir_status[checksum] = make_names(checksum, item.names) + file_status.update( + { + child_checksum: make_names(child_checksum, child.names) + for child_checksum, child in item.children.items() + } + ) + dir_paths[remote.checksum_to_path_info(checksum)] = frozenset( + map(remote.checksum_to_path_info, item.child_keys()) + ) + else: + file_status[checksum] = make_names(checksum, item.names) - dir_status = cache_to_dict(merged_dir_cache) - file_status = cache_to_dict(merged_file_cache) self._fill_statuses(dir_status, local_exists, remote_exists) self._fill_statuses(file_status, local_exists, remote_exists) self._log_missing_caches(dict(dir_status, **file_status)) - dir_paths = {} - for dir_checksum, file_checksums in dir_contents.items(): - dir_paths[remote.checksum_to_path_info(dir_checksum)] = frozenset( - map(remote.checksum_to_path_info, file_checksums) - ) return dir_status, file_status, dir_paths @staticmethod def _fill_statuses(checksum_info_dir, local_exists, remote_exists): @@ -387,7 +387,7 @@ def _get_plans(self, download, remote, status_info, status): def _process( self, - named_caches, + named_cache, remote, jobs=None, show_checksums=False, @@ -419,7 +415,7 @@ jobs = remote.JOBS dir_status, file_status, dir_paths = self._status( - named_caches, + named_cache, remote, jobs=jobs, show_checksums=show_checksums, @@ -488,18 +484,18 @@ return 1 return func(from_info, to_info, name) - def push(self, named_caches, remote, jobs=None, show_checksums=False): + def push(self, named_cache, remote, jobs=None, show_checksums=False): return self._process( - named_caches, + named_cache, remote, jobs=jobs, show_checksums=show_checksums, download=False, ) - def pull(self, named_caches, remote, jobs=None, show_checksums=False): + def pull(self, named_cache, remote, jobs=None, show_checksums=False): return self._process( - named_caches, + named_cache, remote, jobs=jobs, show_checksums=show_checksums, diff --git a/tests/func/test_data_cloud.py b/tests/func/test_data_cloud.py index 2d17f27616..1bf93ab7fb 100644 --- a/tests/func/test_data_cloud.py +++ b/tests/func/test_data_cloud.py @@ -112,7 +112,7 @@ def _test_cloud(self): out = stage.outs[0] cache = out.cache_path md5 = out.checksum - info = [out.get_used_cache()] + info = out.get_used_cache() stages = self.dvc.add(self.DATA_DIR) self.assertEqual(len(stages), 1)
@@ -122,9 +122,7 @@ def _test_cloud(self): cache_dir = out_dir.cache_path name_dir = str(out_dir) md5_dir = out_dir.checksum - info_dir = [ - (NamedCache.make(out_dir.scheme, md5_dir, name_dir), NamedCache()) - ] + info_dir = NamedCache.make(out_dir.scheme, md5_dir, name_dir) with self.cloud.repo.state: # Check status From e914aaecaed60f32a5aabe72241622d69729d81b Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Tue, 14 Apr 2020 17:50:48 +0900 Subject: [PATCH 09/12] Fix tests --- tests/func/test_remote.py | 4 +++- tests/unit/output/test_output.py | 5 +---- tests/unit/remote/test_local.py | 2 +- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/tests/func/test_remote.py b/tests/func/test_remote.py index b594f3d383..d9b9fcf8ef 100644 --- a/tests/func/test_remote.py +++ b/tests/func/test_remote.py @@ -204,7 +204,9 @@ def unreliable_upload(self, from_file, to_info, name=None, **kwargs): with patch.object(RemoteLOCAL, "_download", side_effect=Exception): with pytest.raises(DownloadError) as download_error_info: dvc.pull() - assert download_error_info.value.amount == 4 + # error count should be len(.dir + standalone file checksums) + # since files inside dir are ignored if dir cache entry is missing + assert download_error_info.value.amount == 3 def test_raise_on_too_many_open_files(tmp_dir, dvc, tmp_path_factory, mocker): diff --git a/tests/unit/output/test_output.py b/tests/unit/output/test_output.py index 42e503ec83..fb9325471b 100644 --- a/tests/unit/output/test_output.py +++ b/tests/unit/output/test_output.py @@ -86,8 +86,5 @@ def test_get_used_cache(exists, expected_message, mocker, caplog): ).return_value = exists with caplog.at_level(logging.WARNING, logger="dvc"): - used = output.get_used_cache() - assert isinstance(used, tuple) - assert used[0] is None - assert isinstance(used[1], NamedCache) + assert isinstance(output.get_used_cache(), NamedCache) assert first(caplog.messages) == expected_message diff --git a/tests/unit/remote/test_local.py b/tests/unit/remote/test_local.py index fb3478416d..a5d653e971 100644 --- a/tests/unit/remote/test_local.py +++ b/tests/unit/remote/test_local.py @@ -26,7 +26,7 @@ def test_status_download_optimization(mocker, dvc): other_remote.url = "other_remote" other_remote.cache_exists.return_value = [] - remote.status([(None, infos)], other_remote, download=True) + remote.status(infos, other_remote, download=True) assert other_remote.cache_exists.call_count == 0 From 266d1abb917008d729cafe4be7910b1ef51a6125 Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Tue, 14 Apr 2020 20:20:40 +0900 Subject: [PATCH 10/12] Fix deepsource warnings --- dvc/cache.py | 4 ++-- dvc/remote/local.py | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/dvc/cache.py b/dvc/cache.py index 70ba6a05b2..1769a45633 100644 --- a/dvc/cache.py +++ b/dvc/cache.py @@ -97,8 +97,8 @@ def update(self, item, suffix=""): self.names.update(n + suffix for n in item.names) else: self.names.update(item.names) - for checksum, item in item.children.items(): - self.children[checksum].update(item) + for checksum, child_item in item.children.items(): + self.children[checksum].update(child_item) class NamedCache(object): diff --git a/dvc/remote/local.py b/dvc/remote/local.py index 101cbf4a5d..16b846db52 100644 --- a/dvc/remote/local.py +++ b/dvc/remote/local.py @@ -469,7 +469,8 @@ def _process( return len(dir_plans[0]) + len(file_plans[0]) - def _dir_upload(self, func, futures, from_info, to_info, name): + @staticmethod + def _dir_upload(func, futures, from_info, to_info, 
name): for future in as_completed(futures): if future.result(): # do not upload this .dir file if any file in this From 448876d40e20632af75dd5dc2f0803e3bccb4399 Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Tue, 14 Apr 2020 20:49:51 +0900 Subject: [PATCH 11/12] tests: use pytest mocker fixture --- tests/func/test_gc.py | 19 ++++++++----------- tests/func/test_remote.py | 10 +++++----- 2 files changed, 13 insertions(+), 16 deletions(-) diff --git a/tests/func/test_gc.py b/tests/func/test_gc.py index d383c01084..acbbd78e49 100644 --- a/tests/func/test_gc.py +++ b/tests/func/test_gc.py @@ -1,6 +1,5 @@ import logging import os -from mock import patch import configobj import pytest @@ -325,13 +324,11 @@ def test_gc_cloud_remove_order(tmp_dir, scm, dvc, tmp_path_factory, mocker): dvc.remove(dir2.relpath) dvc.gc(workspace=True) - with patch.object(RemoteLOCAL, "remove", autospec=True) as remove: - dvc.gc(workspace=True, cloud=True) - assert len(remove.mock_calls) == 8 - # dir (and unpacked dir) should be first 4 checksums removed from - # the remote - for args in remove.call_args_list[:4]: - checksum = str(args.args[1]) - assert checksum.endswith(".dir") or checksum.endswith( - ".dir.unpacked" - ) + mocked_remove = mocker.patch.object(RemoteLOCAL, "remove", autospec=True) + dvc.gc(workspace=True, cloud=True) + assert len(mocked_remove.mock_calls) == 8 + # dir (and unpacked dir) should be first 4 checksums removed from + # the remote + for args in mocked_remove.call_args_list[:4]: + checksum = str(args[0][1]) + assert checksum.endswith(".dir") or checksum.endswith(".dir.unpacked") diff --git a/tests/func/test_remote.py b/tests/func/test_remote.py index d9b9fcf8ef..f6f37017ea 100644 --- a/tests/func/test_remote.py +++ b/tests/func/test_remote.py @@ -242,7 +242,7 @@ def test_external_dir_resource_on_no_cache(tmp_dir, dvc, tmp_path_factory): dvc.run(deps=[fspath(external_dir)]) -def test_push_order(tmp_dir, dvc, tmp_path_factory): +def test_push_order(tmp_dir, dvc, tmp_path_factory, mocker): url = fspath(tmp_path_factory.mktemp("upstream")) dvc.config["remote"]["upstream"] = {"url": url} dvc.config["core"]["remote"] = "upstream" @@ -250,7 +250,7 @@ def test_push_order(tmp_dir, dvc, tmp_path_factory): tmp_dir.dvc_gen({"foo": {"bar": "bar content"}}) tmp_dir.dvc_gen({"baz": "baz content"}) - with patch.object(RemoteLOCAL, "_upload", return_value=0) as upload: - dvc.push() - # last uploaded file should be dir checksum - assert upload.call_args.args[0].endswith(".dir") + mocked_upload = mocker.patch.object(RemoteLOCAL, "_upload", return_value=0) + dvc.push() + # last uploaded file should be dir checksum + assert mocked_upload.call_args[0][0].endswith(".dir") From 9bc3d78f0ab4f6a4c6de643ae4e6fc2e76622169 Mon Sep 17 00:00:00 2001 From: Ruslan Kuprieiev Date: Tue, 14 Apr 2020 15:37:15 +0300 Subject: [PATCH 12/12] Update dvc/cache.py Co-Authored-By: Saugat Pachhai --- dvc/cache.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dvc/cache.py b/dvc/cache.py index 1769a45633..bb56d6c08b 100644 --- a/dvc/cache.py +++ b/dvc/cache.py @@ -71,7 +71,7 @@ def __init__(self, repo): azure = _make_remote_property("azure") -class NamedCacheItem(object): +class NamedCacheItem: def __init__(self): self.names = set() self.children = defaultdict(NamedCacheItem)
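Taken together, the final state of the series replaces the per-output tuples from PATCH 01 with a single nested structure: each `NamedCacheItem` carries its own names plus a `children` mapping, so a .dir checksum owns the checksums of the files inside it, and `scheme_keys()`/`dir_keys()`/`child_keys()` recover the flat views that the earlier patches assembled by hand. A small self-contained sketch of that shape, reusing the checksums from `test_used_cache` (standalone illustration, not DVC's actual classes):

    from collections import defaultdict

    class NamedCacheItem:
        def __init__(self):
            self.names = set()
            self.children = defaultdict(NamedCacheItem)

    items = defaultdict(NamedCacheItem)
    # nest the file checksum under its .dir checksum, as add_child_cache() does
    items["70922d6bf66eb073053a82f77d58c536.dir"].names.add("dir")
    items["70922d6bf66eb073053a82f77d58c536.dir"].children[
        "8c7dd922ad47494fc02c388e12c00eac"
    ].names.add("dir/subdir/file")

    # dir_keys(): only checksums that have children, i.e. directory manifests
    assert [key for key, item in items.items() if item.children] == [
        "70922d6bf66eb073053a82f77d58c536.dir"
    ]

    # scheme_keys(): the flat view, each manifest followed by its contents
    flat = [key for key in items] + [
        child for item in items.values() for child in item.children
    ]
    assert flat == [
        "70922d6bf66eb073053a82f77d58c536.dir",
        "8c7dd922ad47494fc02c388e12c00eac",
    ]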