From 686b5b3e185af11eee0d9771aeb146fd11ab4499 Mon Sep 17 00:00:00 2001
From: Daniele Trifirò
Date: Tue, 1 Feb 2022 12:35:12 +0100
Subject: [PATCH 1/3] fix passing of `jobs` kwarg

The `jobs` kwarg was not being passed down properly, resulting in it
being ignored in some cases (push/pull/gc).
---
 dvc/data/status.py             | 20 +++++++++++++-------
 dvc/data/transfer.py           |  4 +++-
 dvc/objects/db.py              |  6 ++++--
 dvc/repo/gc.py                 |  2 +-
 tests/unit/remote/test_base.py |  4 ++--
 5 files changed, 23 insertions(+), 13 deletions(-)

diff --git a/dvc/data/status.py b/dvc/data/status.py
index 400b6f163e..43509220f7 100644
--- a/dvc/data/status.py
+++ b/dvc/data/status.py
@@ -27,14 +27,14 @@ class CompareStatusResult(NamedTuple):
     deleted: Set["HashInfo"]
 
 
-def _indexed_dir_hashes(odb, index, dir_objs, name, cache_odb):
+def _indexed_dir_hashes(odb, index, dir_objs, name, cache_odb, jobs=None):
     # Validate our index by verifying all indexed .dir hashes
     # still exist on the remote
     dir_hashes = set(dir_objs.keys())
     indexed_dirs = set(index.dir_hashes())
     indexed_dir_exists = set()
     if indexed_dirs:
-        indexed_dir_exists.update(odb.list_hashes_exists(indexed_dirs))
+        indexed_dir_exists.update(odb.list_hashes_exists(indexed_dirs, jobs))
         missing_dirs = indexed_dirs.difference(indexed_dir_exists)
         if missing_dirs:
             logger.debug(
@@ -46,7 +46,7 @@ def _indexed_dir_hashes(odb, index, dir_objs, name, cache_odb):
 
     # Check if non-indexed (new) dir hashes exist on remote
     dir_exists = dir_hashes.intersection(indexed_dir_exists)
-    dir_exists.update(odb.list_hashes_exists(dir_hashes - dir_exists))
+    dir_exists.update(odb.list_hashes_exists(dir_hashes - dir_exists, jobs))
 
     # If .dir hash exists in the ODB, assume directory contents
     # also exists
@@ -76,6 +76,7 @@ def status(
     index: Optional["ObjectDBIndexBase"] = None,
     cache_odb: Optional["ObjectDB"] = None,
     shallow: bool = True,
+    jobs: Optional[int] = None,
     **kwargs,
 ) -> "StatusResult":
     """Return status of whether or not the specified objects exist odb.
@@ -121,7 +122,9 @@ def status(
     if index and hashes:
         if dir_objs:
             exists = hashes.intersection(
-                _indexed_dir_hashes(odb, index, dir_objs, name, cache_odb)
+                _indexed_dir_hashes(
+                    odb, index, dir_objs, name, cache_odb, jobs=jobs
+                )
             )
             hashes.difference_update(exists)
         if hashes:
@@ -129,7 +132,9 @@ def status(
             hashes.difference_update(exists)
 
     if hashes:
-        exists.update(odb.hashes_exist(hashes, name=odb.fs_path, **kwargs))
+        exists.update(
+            odb.hashes_exist(hashes, name=odb.fs_path, jobs=jobs, **kwargs)
+        )
     return StatusResult(
         {hash_infos[hash_] for hash_ in exists},
         {hash_infos[hash_] for hash_ in (hashes - exists)},
@@ -144,6 +149,7 @@ def compare_status(
     check_deleted: bool = True,
     src_index: Optional["ObjectDBIndexBase"] = None,
     dest_index: Optional["ObjectDBIndexBase"] = None,
+    jobs: Optional[int] = None,
     **kwargs,
 ) -> "CompareStatusResult":
     """Compare status for the specified objects between two ODBs.
@@ -157,13 +163,13 @@ def compare_status(
     if "cache_odb" not in kwargs:
         kwargs["cache_odb"] = src
     dest_exists, dest_missing = status(
-        dest, obj_ids, index=dest_index, **kwargs
+        dest, obj_ids, index=dest_index, jobs=jobs, **kwargs
     )
     # for transfer operations we can skip src status check when all objects
     # already exist in dest
     if dest_missing or check_deleted:
         src_exists, src_missing = status(
-            src, obj_ids, index=src_index, **kwargs
+            src, obj_ids, index=src_index, jobs=jobs, **kwargs
         )
     else:
         src_exists = dest_exists
diff --git a/dvc/data/transfer.py b/dvc/data/transfer.py
index dbb6cc6df0..1ca3228550 100644
--- a/dvc/data/transfer.py
+++ b/dvc/data/transfer.py
@@ -151,7 +151,9 @@ def transfer(
     if src == dest:
         return 0
 
-    status = compare_status(src, dest, obj_ids, check_deleted=False, **kwargs)
+    status = compare_status(
+        src, dest, obj_ids, check_deleted=False, jobs=jobs, **kwargs
+    )
 
     if not status.new:
         return 0
diff --git a/dvc/objects/db.py b/dvc/objects/db.py
index 1d5eee565d..a30c2c6fbb 100644
--- a/dvc/objects/db.py
+++ b/dvc/objects/db.py
@@ -350,7 +350,7 @@ def all(self, jobs=None, name=None):
 
         remote_size, remote_hashes = self._estimate_remote_size(name=name)
         return self._list_hashes_traverse(
-            remote_size, remote_hashes, jobs, name
+            remote_size, remote_hashes, jobs=jobs, name=name
         )
 
     def _remove_unpacked_dir(self, hash_):
@@ -457,6 +457,8 @@ def hashes_exist(self, hashes, jobs=None, name=None):
 
         logger.debug(f"Querying '{len(hashes)}' hashes via traverse")
         remote_hashes = set(
-            self._list_hashes_traverse(remote_size, remote_hashes, jobs, name)
+            self._list_hashes_traverse(
+                remote_size, remote_hashes, jobs=jobs, name=name
+            )
         )
         return list(hashes & set(remote_hashes))
diff --git a/dvc/repo/gc.py b/dvc/repo/gc.py
index 6aa98ac220..c3fe871bd5 100644
--- a/dvc/repo/gc.py
+++ b/dvc/repo/gc.py
@@ -82,7 +82,7 @@ def gc(
         return
 
     odb = self.cloud.get_remote_odb(remote, "gc -c")
-    removed = ogc(odb, used_obj_ids)
+    removed = ogc(odb, used_obj_ids, jobs=jobs)
     if removed:
         get_index(odb).clear()
     else:
diff --git a/tests/unit/remote/test_base.py b/tests/unit/remote/test_base.py
index 02b38b9551..0941922c8c 100644
--- a/tests/unit/remote/test_base.py
+++ b/tests/unit/remote/test_base.py
@@ -60,8 +60,8 @@ def test_hashes_exist(object_exists, traverse, dvc):
     traverse.assert_called_with(
         256 * pow(16, odb.fs.TRAVERSE_PREFIX_LEN),
         set(range(256)),
-        None,
-        None,
+        jobs=None,
+        name=None,
     )
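[Annotation — illustrative sketch, not part of the patch series]
The bug class PATCH 1/3 fixes is easy to reproduce: a function accepts
`jobs` explicitly, but an intermediate caller never forwards it, so the
callee silently falls back to its default. A minimal, hypothetical sketch
of the failure mode and the fix (names are illustrative, not DVC's API):

    # Hypothetical sketch -- not DVC code. Shows how an explicit kwarg
    # is silently dropped when it is not threaded through a call chain.
    def hashes_exist(hashes, jobs=None):
        workers = jobs if jobs is not None else 4  # fall back to default
        print(f"checking {len(hashes)} hashes with {workers} workers")
        return set(hashes)

    def status_buggy(hashes, jobs=None, **kwargs):
        # BUG: `jobs` is accepted but never forwarded, so the caller's
        # value is ignored and the default is used instead.
        return hashes_exist(hashes)

    def status_fixed(hashes, jobs=None, **kwargs):
        # FIX (what the patch does): forward `jobs` by name.
        return hashes_exist(hashes, jobs=jobs)

    status_buggy({"abc", "def"}, jobs=16)  # -> "... with 4 workers"
    status_fixed({"abc", "def"}, jobs=16)  # -> "... with 16 workers"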
From 047b9b49b7b9b6d2797c8d163a6e9f66db0a74cc Mon Sep 17 00:00:00 2001
From: Daniele Trifirò
Date: Fri, 11 Feb 2022 18:23:26 +0100
Subject: [PATCH 2/3] test_data_cloud: also check for the correct number of
 jobs

---
 tests/func/test_data_cloud.py | 46 +++++++++++++++++++++++++++++++++--
 1 file changed, 44 insertions(+), 2 deletions(-)

diff --git a/tests/func/test_data_cloud.py b/tests/func/test_data_cloud.py
index 9bd56f4600..eea0e549b4 100644
--- a/tests/func/test_data_cloud.py
+++ b/tests/func/test_data_cloud.py
@@ -7,7 +7,9 @@
 
 import dvc as dvc_module
 from dvc.cli import main
+from dvc.data.db.local import LocalObjectDB
 from dvc.external_repo import clean_repos
+from dvc.objects.db import ObjectDB
 from dvc.stage.exceptions import StageNotFound
 from dvc.testing.test_remote import (  # noqa, pylint: disable=unused-import
     TestRemote,
@@ -15,8 +17,9 @@
 from dvc.utils.fs import remove
 
 
-def test_cloud_cli(tmp_dir, dvc, remote):
-    args = ["-v", "-j", "2"]
+def test_cloud_cli(tmp_dir, dvc, remote, mocker):
+    jobs = 2
+    args = ["-v", "-j", str(jobs)]
 
     (stage,) = tmp_dir.dvc_gen("foo", "foo")
     cache = stage.outs[0].cache_path
@@ -34,18 +37,32 @@ def test_cloud_cli(tmp_dir, dvc, remote):
     cache_dir = stage_dir.outs[0].cache_path
 
     # FIXME check status output
+    hashes_exist = mocker.spy(LocalObjectDB, "hashes_exist")
 
     assert main(["push"] + args) == 0
     assert os.path.exists(cache)
     assert os.path.isfile(cache)
     assert os.path.isfile(cache_dir)
+    assert hashes_exist.called
+    assert all(
+        _kwargs["jobs"] == jobs
+        for (_args, _kwargs) in hashes_exist.call_args_list
+    )
 
     remove(dvc.odb.local.cache_dir)
+    hashes_exist.reset_mock()
 
     assert main(["fetch"] + args) == 0
     assert os.path.exists(cache)
     assert os.path.isfile(cache)
     assert os.path.isfile(cache_dir)
+    assert hashes_exist.called
+    assert all(
+        _kwargs["jobs"] == jobs
+        for (_args, _kwargs) in hashes_exist.call_args_list
+    )
+
+    hashes_exist.reset_mock()
 
     assert main(["pull"] + args) == 0
     assert os.path.exists(cache)
@@ -53,6 +70,11 @@ def test_cloud_cli(tmp_dir, dvc, remote):
     assert os.path.isfile(cache_dir)
     assert os.path.isfile("foo")
     assert os.path.isdir("data_dir")
+    assert hashes_exist.called
+    assert all(
+        _kwargs["jobs"] == jobs
+        for (_args, _kwargs) in hashes_exist.call_args_list
+    )
 
     with open(cache, encoding="utf-8") as fd:
         assert fd.read() == "foo"
@@ -62,18 +84,38 @@ def test_cloud_cli(tmp_dir, dvc, remote):
     if remote.url.startswith("http"):
         return
 
+    hashes_exist.reset_mock()
+
+    _list_hashes_traverse = mocker.spy(ObjectDB, "_list_hashes_traverse")
     # NOTE: check if remote gc works correctly on directories
     assert main(["gc", "-cw", "-f"] + args) == 0
+    assert _list_hashes_traverse.called
+    assert all(
+        _kwargs["jobs"] == jobs
+        for (_args, _kwargs) in _list_hashes_traverse.call_args_list
+    )
 
     shutil.move(dvc.odb.local.cache_dir, dvc.odb.local.cache_dir + ".back")
 
     assert main(["fetch"] + args) == 0
+    assert hashes_exist.called
+    assert all(
+        _kwargs["jobs"] == jobs
+        for (_args, _kwargs) in hashes_exist.call_args_list
+    )
+
+    hashes_exist.reset_mock()
 
     assert main(["pull", "-f"] + args) == 0
     assert os.path.exists(cache)
     assert os.path.isfile(cache)
     assert os.path.isfile(cache_dir)
     assert os.path.isfile("foo")
     assert os.path.isdir("data_dir")
+    assert hashes_exist.called
+    assert all(
+        _kwargs["jobs"] == jobs
+        for (_args, _kwargs) in hashes_exist.call_args_list
+    )
 
 
 def test_data_cloud_error_cli(dvc):
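[Annotation — illustrative sketch, not part of the patch series]
PATCH 2/3 relies on pytest-mock's `mocker.spy`, which wraps a real method
so every call is recorded (with its args/kwargs) while the original
implementation still runs. A minimal, self-contained sketch of the
assertion pattern used above — the `Remote` class here is hypothetical:

    # Hypothetical sketch of the spy pattern from test_cloud_cli.
    # Requires pytest and pytest-mock (which provides the `mocker` fixture).
    class Remote:
        def hashes_exist(self, hashes, jobs=None, name=None):
            return list(hashes)

    def test_jobs_kwarg_is_forwarded(mocker):
        spy = mocker.spy(Remote, "hashes_exist")
        Remote().hashes_exist({"abc"}, jobs=2)
        assert spy.called
        # call_args_list holds one (args, kwargs) pair per recorded call;
        # the patched test iterates it to assert kwargs["jobs"] == jobs.
        assert all(
            kwargs["jobs"] == 2 for _args, kwargs in spy.call_args_list
        )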
From dfcb7d2060c18bf681f54523e3a23569a41bf6ab Mon Sep 17 00:00:00 2001
From: Daniele Trifirò
Date: Tue, 15 Feb 2022 10:42:02 +0100
Subject: [PATCH 3/3] status: cleanup invocation of hashes_exist

---
 dvc/data/status.py | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/dvc/data/status.py b/dvc/data/status.py
index 43509220f7..9ec4d0f3d7 100644
--- a/dvc/data/status.py
+++ b/dvc/data/status.py
@@ -132,9 +132,7 @@ def status(
             hashes.difference_update(exists)
 
     if hashes:
-        exists.update(
-            odb.hashes_exist(hashes, name=odb.fs_path, jobs=jobs, **kwargs)
-        )
+        exists.update(odb.hashes_exist(hashes, name=odb.fs_path, jobs=jobs))
     return StatusResult(
         {hash_infos[hash_] for hash_ in exists},
         {hash_infos[hash_] for hash_ in (hashes - exists)},
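[Annotation — illustrative sketch, not part of the patch series]
Once PATCH 1/3 made `jobs` an explicit parameter of `status`, forwarding
the leftover `**kwargs` into `hashes_exist` no longer served a purpose,
hence the cleanup in PATCH 3/3. The resulting call chain, sketched end to
end with simplified, hypothetical signatures (not DVC's actual API):

    # Hypothetical end-to-end sketch of how `jobs` flows after this series.
    def hashes_exist(hashes, jobs=None, name=None):
        return set(hashes)  # pretend every hash exists on the remote

    def status(name, hashes, jobs=None, **kwargs):
        # `jobs` is forwarded by name; stray **kwargs no longer leak
        # into hashes_exist (the PATCH 3/3 cleanup).
        return hashes_exist(hashes, jobs=jobs, name=name)

    def compare_status(src, dest, hashes, jobs=None, **kwargs):
        return status(dest, hashes, jobs=jobs, **kwargs)

    def transfer(src, dest, hashes, jobs=None, **kwargs):
        return compare_status(src, dest, hashes, jobs=jobs, **kwargs)

    # `dvc push -j 16` now reaches the object DB with jobs=16:
    transfer("local", "remote", {"abc123"}, jobs=16)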