diff --git a/dvc/cache.py b/dvc/cache.py
index 0a5b355765..bb56d6c08b 100644
--- a/dvc/cache.py
+++ b/dvc/cache.py
@@ -71,9 +71,39 @@ def __init__(self, repo):
     azure = _make_remote_property("azure")
 
 
+class NamedCacheItem:
+    def __init__(self):
+        self.names = set()
+        self.children = defaultdict(NamedCacheItem)
+
+    def __eq__(self, other):
+        return self.names == other.names and self.children == other.children
+
+    def child_keys(self):
+        for key, child in self.children.items():
+            yield key
+            yield from child.child_keys()
+
+    def child_names(self):
+        for key, child in self.children.items():
+            yield key, child.names
+            yield from child.child_names()
+
+    def add(self, checksum, item):
+        self.children[checksum].update(item)
+
+    def update(self, item, suffix=""):
+        if suffix:
+            self.names.update(n + suffix for n in item.names)
+        else:
+            self.names.update(item.names)
+        for checksum, child_item in item.children.items():
+            self.children[checksum].update(child_item)
+
+
 class NamedCache(object):
     def __init__(self):
-        self._items = defaultdict(lambda: defaultdict(set))
+        self._items = defaultdict(lambda: defaultdict(NamedCacheItem))
         self.external = defaultdict(set)
 
     @classmethod
@@ -86,7 +116,18 @@ def __getitem__(self, key):
         return self._items[key]
 
     def add(self, scheme, checksum, name):
-        self._items[scheme][checksum].add(name)
+        """Add a mapped name for the specified checksum."""
+        self._items[scheme][checksum].names.add(name)
+
+    def add_child_cache(self, checksum, cache, suffix=""):
+        """Add/update child cache for the specified checksum."""
+        for scheme, src in cache._items.items():
+            dst = self._items[scheme][checksum].children
+            for child_checksum, item in src.items():
+                dst[child_checksum].update(item, suffix=suffix)
+
+        for repo_pair, files in cache.external.items():
+            self.external[repo_pair].update(files)
 
     def add_external(self, url, rev, path):
         self.external[url, rev].add(path)
@@ -94,11 +135,32 @@ def add_external(self, url, rev, path):
     def update(self, cache, suffix=""):
         for scheme, src in cache._items.items():
             dst = self._items[scheme]
-            for checksum, names in src.items():
-                if suffix:
-                    dst[checksum].update(n + suffix for n in names)
-                else:
-                    dst[checksum].update(names)
+            for checksum, item in src.items():
+                dst[checksum].update(item, suffix=suffix)
 
         for repo_pair, files in cache.external.items():
             self.external[repo_pair].update(files)
+
+    def scheme_keys(self, scheme):
+        """Iterate over a flat list of all keys for the specified scheme,
+        including children.
+        """
+        for key, item in self._items[scheme].items():
+            yield key
+            yield from item.child_keys()
+
+    def scheme_names(self, scheme):
+        """Iterate over a flat list of (checksum, names) pairs for the
+        specified scheme, including children.
+        """
+        for key, item in self._items[scheme].items():
+            yield key, item.names
+            yield from item.child_names()
+
+    def dir_keys(self, scheme):
+        return (
+            key for key, item in self._items[scheme].items() if item.children
+        )
+
+    def child_keys(self, scheme, checksum):
+        return self._items[scheme][checksum].child_keys()
diff --git a/dvc/data_cloud.py b/dvc/data_cloud.py
index ea375b65ce..6fe2683ee2 100644
--- a/dvc/data_cloud.py
+++ b/dvc/data_cloud.py
@@ -88,12 +88,13 @@ def pull(self, cache, jobs=None, remote=None, show_checksums=False):
         return downloaded_items_num
 
     def _save_pulled_checksums(self, cache):
-        for checksum in cache["local"].keys():
+        for checksum in cache.scheme_keys("local"):
             cache_file = self.repo.cache.local.checksum_to_path_info(checksum)
             if self.repo.cache.local.exists(cache_file):
-                # We can safely save here, as existing corrupted files will be
-                # removed upon status, while files corrupted during download
-                # will not be moved from tmp_file (see `RemoteBASE.download()`)
+                # We can safely save here, as existing corrupted files will
+                # be removed upon status, while files corrupted during
+                # download will not be moved from tmp_file
+                # (see `RemoteBASE.download()`)
                 self.repo.state.save(cache_file, checksum)
 
     def status(self, cache, jobs=None, remote=None, show_checksums=False):
diff --git a/dvc/output/base.py b/dvc/output/base.py
index 879dd9a713..2948c117f5 100644
--- a/dvc/output/base.py
+++ b/dvc/output/base.py
@@ -436,7 +436,9 @@ def get_used_cache(self, **kwargs):
         if not self.is_dir_checksum:
             return ret
 
-        ret.update(self._collect_used_dir_cache(**kwargs))
+        ret.add_child_cache(
+            self.checksum, self._collect_used_dir_cache(**kwargs),
+        )
 
         return ret
 
diff --git a/dvc/remote/base.py b/dvc/remote/base.py
index c349a7a12b..0001ee5c63 100644
--- a/dvc/remote/base.py
+++ b/dvc/remote/base.py
@@ -732,14 +732,18 @@ def all(self, jobs=None, name=None):
         )
 
     def gc(self, named_cache, jobs=None):
-        logger.debug("named_cache: {} jobs: {}".format(named_cache, jobs))
        used = self.extract_used_local_checksums(named_cache)
 
         if self.scheme != "":
-            used.update(named_cache[self.scheme])
+            used.update(named_cache.scheme_keys(self.scheme))
 
         removed = False
-        for checksum in self.all(jobs, str(self.path_info)):
+        # checksums must be sorted to ensure we always remove .dir files first
+        for checksum in sorted(
+            self.all(jobs, str(self.path_info)),
+            key=self.is_dir_checksum,
+            reverse=True,
+        ):
             if checksum in used:
                 continue
             path_info = self.checksum_to_path_info(checksum)
@@ -1247,7 +1251,7 @@ def _get_unpacked_dir_names(self, checksums):
         return set()
 
     def extract_used_local_checksums(self, named_cache):
-        used = set(named_cache["local"])
+        used = set(named_cache.scheme_keys("local"))
         unpacked = self._get_unpacked_dir_names(used)
         return used | unpacked
 
diff --git a/dvc/remote/local.py b/dvc/remote/local.py
index 7643c3556b..16b846db52 100644
--- a/dvc/remote/local.py
+++ b/dvc/remote/local.py
@@ -2,9 +2,11 @@
 import logging
 import os
 import stat
-from concurrent.futures import ThreadPoolExecutor
+from concurrent.futures import as_completed, ThreadPoolExecutor
 from functools import partial
 
+from funcy import concat
+
 from shortuuid import uuid
 
 from dvc.compat import fspath_py35
@@ -255,37 +257,102 @@ def status(
         self,
         named_cache,
         remote,
         jobs=None,
         show_checksums=False,
         download=False,
     ):
+        # Return a flattened dict containing all status info
+        dir_status, file_status, _ = self._status(
+            named_cache,
+            remote,
+            jobs=jobs,
+            show_checksums=show_checksums,
+            download=download,
+        )
+        return dict(dir_status, **file_status)
+
+    def _status(
+        self,
+        named_cache,
+        remote,
+        jobs=None,
+        show_checksums=False,
+        download=False,
+    ):
+        """Return a tuple of (dir_status_info, file_status_info, dir_mapping).
+
+        dir_status_info contains status for .dir files, file_status_info
+        contains status for all other files, and dir_mapping is a dict of
+        {dir_path_info: set(file_path_info...)} which can be used to map
+        a .dir file to its file contents.
+        """
         logger.debug(
             "Preparing to collect status from {}".format(remote.path_info)
         )
-        md5s = list(named_cache[self.scheme])
+        md5s = set(named_cache.scheme_keys(self.scheme))
 
         logger.debug("Collecting information from local cache...")
-        local_exists = self.cache_exists(md5s, jobs=jobs, name=self.cache_dir)
+        local_exists = frozenset(
+            self.cache_exists(md5s, jobs=jobs, name=self.cache_dir)
+        )
 
         # This is a performance optimization. We can safely assume that,
         # if the resources that we want to fetch are already cached,
         # there's no need to check the remote storage for the existence of
         # those files.
-        if download and sorted(local_exists) == sorted(md5s):
+        if download and local_exists == md5s:
             remote_exists = local_exists
         else:
             logger.debug("Collecting information from remote cache...")
-            remote_exists = list(
-                remote.cache_exists(
-                    md5s, jobs=jobs, name=str(remote.path_info)
+            remote_exists = set()
+            dir_md5s = set(named_cache.dir_keys(self.scheme))
+            if dir_md5s:
+                # If a .dir checksum exists on the remote, assume the
+                # directory contents also exist on the remote
+                for dir_checksum in remote._cache_object_exists(dir_md5s):
+                    file_checksums = list(
+                        named_cache.child_keys(self.scheme, dir_checksum)
+                    )
+                    logger.debug(
+                        "'{}' exists on remote, "
+                        "assuming {} files also exist".format(
+                            dir_checksum, len(file_checksums)
+                        )
+                    )
+                    md5s.remove(dir_checksum)
+                    remote_exists.add(dir_checksum)
+                    md5s.difference_update(file_checksums)
+                    remote_exists.update(file_checksums)
+            if md5s:
+                remote_exists.update(
+                    remote.cache_exists(
+                        md5s, jobs=jobs, name=str(remote.path_info)
+                    )
                 )
-            )
 
-        ret = {
-            checksum: {"name": checksum if show_checksums else " ".join(names)}
-            for checksum, names in named_cache[self.scheme].items()
-        }
-        self._fill_statuses(ret, local_exists, remote_exists)
+        def make_names(checksum, names):
+            return {"name": checksum if show_checksums else " ".join(names)}
+
+        dir_status = {}
+        file_status = {}
+        dir_paths = {}
+        for checksum, item in named_cache[self.scheme].items():
+            if item.children:
+                dir_status[checksum] = make_names(checksum, item.names)
+                file_status.update(
+                    {
+                        child_checksum: make_names(child_checksum, child.names)
+                        for child_checksum, child in item.children.items()
+                    }
+                )
+                dir_paths[remote.checksum_to_path_info(checksum)] = frozenset(
+                    map(remote.checksum_to_path_info, item.child_keys())
+                )
+            else:
+                file_status[checksum] = make_names(checksum, item.names)
+
+        self._fill_statuses(dir_status, local_exists, remote_exists)
+        self._fill_statuses(file_status, local_exists, remote_exists)
 
-        self._log_missing_caches(ret)
+        self._log_missing_caches(dict(dir_status, **file_status))
 
-        return ret
+        return dir_status, file_status, dir_paths
 
     @staticmethod
     def _fill_statuses(checksum_info_dir, local_exists, remote_exists):
@@ -347,7 +414,7 @@ def _process(
         if jobs is None:
             jobs = remote.JOBS
 
-        status_info = self.status(
+        dir_status, file_status, dir_paths = self._status(
            named_cache,
            remote,
            jobs=jobs,
@@ -355,23 +422,68 @@ def _process(
             show_checksums=show_checksums,
             download=download,
         )
 
-        plans = self._get_plans(download, remote, status_info, status)
+        dir_plans = self._get_plans(download, remote, dir_status, status)
+        file_plans = self._get_plans(download, remote, file_status, status)
 
-        if len(plans[0]) == 0:
+        if len(dir_plans[0]) + len(file_plans[0]) == 0:
             return 0
 
-        if jobs > 1:
-            with ThreadPoolExecutor(max_workers=jobs) as executor:
-                fails = sum(executor.map(func, *plans))
-        else:
-            fails = sum(map(func, *plans))
+        with ThreadPoolExecutor(max_workers=jobs) as executor:
+            if download:
+                fails = sum(executor.map(func, *dir_plans))
+                fails += sum(executor.map(func, *file_plans))
+            else:
+                # for uploads, push files first, and any .dir files last
+
+                file_futures = {}
+                for from_info, to_info, name in zip(*file_plans):
+                    file_futures[to_info] = executor.submit(
+                        func, from_info, to_info, name
+                    )
+                dir_futures = {}
+                for from_info, to_info, name in zip(*dir_plans):
+                    wait_futures = {
+                        future
+                        for file_path, future in file_futures.items()
+                        if file_path in dir_paths[to_info]
+                    }
+                    dir_futures[to_info] = executor.submit(
+                        self._dir_upload,
+                        func,
+                        wait_futures,
+                        from_info,
+                        to_info,
+                        name,
+                    )
+                fails = sum(
+                    future.result()
+                    for future in concat(
+                        file_futures.values(), dir_futures.values()
+                    )
+                )
 
         if fails:
             if download:
                 raise DownloadError(fails)
             raise UploadError(fails)
 
-        return len(plans[0])
+        return len(dir_plans[0]) + len(file_plans[0])
+
+    @staticmethod
+    def _dir_upload(func, futures, from_info, to_info, name):
+        for future in as_completed(futures):
+            if future.result():
+                # do not upload this .dir file if any file in this
+                # directory failed to upload
+                logger.debug(
+                    "failed to upload full contents of '{}', "
+                    "aborting .dir file upload".format(name)
+                )
+                logger.error(
+                    "failed to upload '{}' to '{}'".format(from_info, to_info)
+                )
+                return 1
+        return func(from_info, to_info, name)
 
     def push(self, named_cache, remote, jobs=None, show_checksums=False):
         return self._process(
diff --git a/dvc/repo/__init__.py b/dvc/repo/__init__.py
index 8d50c54ded..79414c4e59 100644
--- a/dvc/repo/__init__.py
+++ b/dvc/repo/__init__.py
@@ -253,8 +253,9 @@ def used_cache(
         `all_branches`/`all_tags`/`all_commits` to expand the scope.
 
         Returns:
-            A dictionary with Schemes (representing output's location) as keys,
-            and a list with the outputs' `dumpd` as values.
+            A dictionary with Schemes (representing the output's location)
+            mapped to items containing the output's `dumpd` names and its
+            children (if the output is a directory).
""" from dvc.cache import NamedCache diff --git a/dvc/repo/fetch.py b/dvc/repo/fetch.py index f77491bb79..888f552da0 100644 --- a/dvc/repo/fetch.py +++ b/dvc/repo/fetch.py @@ -6,7 +6,6 @@ from dvc.scm.base import CloneError from dvc.path_info import PathInfo - logger = logging.getLogger(__name__) diff --git a/tests/func/test_gc.py b/tests/func/test_gc.py index acb3bb94e3..acbbd78e49 100644 --- a/tests/func/test_gc.py +++ b/tests/func/test_gc.py @@ -304,3 +304,31 @@ def test_gc_cloud_positive(tmp_dir, dvc, tmp_path_factory): for flag in ["-cw", "-ca", "-cT", "-caT", "-cwT"]: assert main(["gc", "-vf", flag]) == 0 + + +def test_gc_cloud_remove_order(tmp_dir, scm, dvc, tmp_path_factory, mocker): + storage = fspath(tmp_path_factory.mktemp("test_remote_base")) + dvc.config["remote"]["local_remote"] = {"url": storage} + dvc.config["core"]["remote"] = "local_remote" + + (standalone, dir1, dir2) = tmp_dir.dvc_gen( + { + "file1": "standalone", + "dir1": {"file2": "file2"}, + "dir2": {"file3": "file3", "file4": "file4"}, + } + ) + dvc.push() + dvc.remove(standalone.relpath) + dvc.remove(dir1.relpath) + dvc.remove(dir2.relpath) + dvc.gc(workspace=True) + + mocked_remove = mocker.patch.object(RemoteLOCAL, "remove", autospec=True) + dvc.gc(workspace=True, cloud=True) + assert len(mocked_remove.mock_calls) == 8 + # dir (and unpacked dir) should be first 4 checksums removed from + # the remote + for args in mocked_remove.call_args_list[:4]: + checksum = str(args[0][1]) + assert checksum.endswith(".dir") or checksum.endswith(".dir.unpacked") diff --git a/tests/func/test_remote.py b/tests/func/test_remote.py index f1ebd35068..f6f37017ea 100644 --- a/tests/func/test_remote.py +++ b/tests/func/test_remote.py @@ -177,23 +177,25 @@ def test_partial_push_n_pull(tmp_dir, dvc, tmp_path_factory): foo = tmp_dir.dvc_gen({"foo": "foo content"})[0].outs[0] bar = tmp_dir.dvc_gen({"bar": "bar content"})[0].outs[0] + baz = tmp_dir.dvc_gen({"baz": {"foo": "baz content"}})[0].outs[0] # Faulty upload version, failing on foo original = RemoteLOCAL._upload def unreliable_upload(self, from_file, to_info, name=None, **kwargs): - if name == "foo": + if "foo" in name: raise Exception("stop foo") return original(self, from_file, to_info, name, **kwargs) with patch.object(RemoteLOCAL, "_upload", unreliable_upload): with pytest.raises(UploadError) as upload_error_info: dvc.push() - assert upload_error_info.value.amount == 1 + assert upload_error_info.value.amount == 3 remote = dvc.cloud.get_remote("upstream") assert not remote.exists(remote.checksum_to_path_info(foo.checksum)) assert remote.exists(remote.checksum_to_path_info(bar.checksum)) + assert not remote.exists(remote.checksum_to_path_info(baz.checksum)) # Push everything and delete local cache dvc.push() @@ -202,7 +204,9 @@ def unreliable_upload(self, from_file, to_info, name=None, **kwargs): with patch.object(RemoteLOCAL, "_download", side_effect=Exception): with pytest.raises(DownloadError) as download_error_info: dvc.pull() - assert download_error_info.value.amount == 2 + # error count should be len(.dir + standalone file checksums) + # since files inside dir are ignored if dir cache entry is missing + assert download_error_info.value.amount == 3 def test_raise_on_too_many_open_files(tmp_dir, dvc, tmp_path_factory, mocker): @@ -236,3 +240,17 @@ def test_external_dir_resource_on_no_cache(tmp_dir, dvc, tmp_path_factory): dvc.cache.local = None with pytest.raises(RemoteCacheRequiredError): dvc.run(deps=[fspath(external_dir)]) + + +def test_push_order(tmp_dir, dvc, 
tmp_path_factory, mocker): + url = fspath(tmp_path_factory.mktemp("upstream")) + dvc.config["remote"]["upstream"] = {"url": url} + dvc.config["core"]["remote"] = "upstream" + + tmp_dir.dvc_gen({"foo": {"bar": "bar content"}}) + tmp_dir.dvc_gen({"baz": "baz content"}) + + mocked_upload = mocker.patch.object(RemoteLOCAL, "_upload", return_value=0) + dvc.push() + # last uploaded file should be dir checksum + assert mocked_upload.call_args[0][0].endswith(".dir") diff --git a/tests/unit/repo/test_repo.py b/tests/unit/repo/test_repo.py index 985c113ef1..fbd3f11ef5 100644 --- a/tests/unit/repo/test_repo.py +++ b/tests/unit/repo/test_repo.py @@ -40,10 +40,13 @@ def test_used_cache(tmp_dir, dvc, path): expected = NamedCache.make( "local", "70922d6bf66eb073053a82f77d58c536.dir", "dir" ) - expected.add( - "local", - "8c7dd922ad47494fc02c388e12c00eac", - os.path.join("dir", "subdir", "file"), + expected.add_child_cache( + "70922d6bf66eb073053a82f77d58c536.dir", + NamedCache.make( + "local", + "8c7dd922ad47494fc02c388e12c00eac", + os.path.join("dir", "subdir", "file"), + ), ) with dvc.state:
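
A few illustrative sketches follow; none of them are part of the patch.

First, the reworked NamedCache: NamedCacheItem turns the old flat
{scheme: {checksum: set(names)}} mapping into a one-level trie, so a .dir
checksum can carry the checksums of the files it references. A minimal
usage sketch, assuming this patch is applied (the checksums "aaaa.dir",
"bbbb" and "cccc" are made up for illustration):

import os

from dvc.cache import NamedCache

# entry for a directory output "data", with its file contents attached
# as children of the .dir checksum (mirrors OutputBase.get_used_cache)
cache = NamedCache.make("local", "aaaa.dir", "data")
contents = NamedCache.make("local", "bbbb", os.path.join("data", "file1"))
contents.add("local", "cccc", os.path.join("data", "file2"))
cache.add_child_cache("aaaa.dir", contents)

# scheme_keys() flattens the trie into the .dir checksum plus all of its
# children, which is what data_cloud and gc now iterate over
assert set(cache.scheme_keys("local")) == {"aaaa.dir", "bbbb", "cccc"}

# dir_keys() yields only checksums that have children; child_keys() yields
# the children of one directory entry; _status() uses both
assert list(cache.dir_keys("local")) == ["aaaa.dir"]
assert set(cache.child_keys("local", "aaaa.dir")) == {"bbbb", "cccc"}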
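
Second, the gc ordering in dvc/remote/base.py. The dir-awareness here rests
on one invariant: if a .dir file exists on the remote, every file it
references exists there too (that is what lets _status() skip the per-file
existence checks). Removing .dir files before their contents preserves that
invariant even when gc is interrupted partway. A sketch of the sort trick,
with a simplified stand-in for is_dir_checksum and made-up checksums:

def is_dir_checksum(checksum):
    # simplified stand-in for RemoteBASE.is_dir_checksum
    return checksum.endswith(".dir")

checksums = ["bbbb", "aaaa.dir", "cccc"]
# a False/True key with reverse=True floats .dir checksums to the front;
# sorted() is stable, so the relative order of the rest is preserved
assert sorted(checksums, key=is_dir_checksum, reverse=True) == [
    "aaaa.dir",
    "bbbb",
    "cccc",
]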
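
Last, the upload scheduling in _process()/_dir_upload(): file uploads are
submitted first, and each .dir file is guarded by a task that waits on the
futures of the files it references, so a .dir file only lands on the remote
once its full contents have (the push-side half of the invariant above). A
condensed, runnable model of that scheduling; the dummy upload(), the paths
and the worker count are all illustrative:

from concurrent.futures import ThreadPoolExecutor, as_completed

def upload(path):
    # dummy stand-in for the transfer func: 0 == success, 1 == failure,
    # matching how _process() counts fails
    return 0

def dir_upload(futures, dir_path):
    # as in _dir_upload: push the .dir file only if every file it
    # references was uploaded successfully, otherwise report a failure
    for future in as_completed(futures):
        if future.result():
            return 1
    return upload(dir_path)

file_paths = ["cache/bb/bbbb", "cache/cc/cccc"]
with ThreadPoolExecutor(max_workers=4) as executor:
    file_futures = {p: executor.submit(upload, p) for p in file_paths}
    # file tasks are queued ahead of the .dir task, so its dependencies
    # are already running or done by the time a worker picks it up
    dir_future = executor.submit(
        dir_upload, set(file_futures.values()), "cache/aa/aaaa.dir"
    )
    fails = dir_future.result()
    fails += sum(f.result() for f in file_futures.values())

assert fails == 0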