Merged
dvc/data/status.py: 11 additions & 7 deletions
@@ -27,14 +27,14 @@ class CompareStatusResult(NamedTuple):
    deleted: Set["HashInfo"]


-def _indexed_dir_hashes(odb, index, dir_objs, name, cache_odb):
+def _indexed_dir_hashes(odb, index, dir_objs, name, cache_odb, jobs=None):
    # Validate our index by verifying all indexed .dir hashes
    # still exist on the remote
    dir_hashes = set(dir_objs.keys())
    indexed_dirs = set(index.dir_hashes())
    indexed_dir_exists = set()
    if indexed_dirs:
-        indexed_dir_exists.update(odb.list_hashes_exists(indexed_dirs))
+        indexed_dir_exists.update(odb.list_hashes_exists(indexed_dirs, jobs))
    missing_dirs = indexed_dirs.difference(indexed_dir_exists)
    if missing_dirs:
        logger.debug(
@@ -46,7 +46,7 @@ def _indexed_dir_hashes(odb, index, dir_objs, name, cache_odb):

    # Check if non-indexed (new) dir hashes exist on remote
    dir_exists = dir_hashes.intersection(indexed_dir_exists)
-    dir_exists.update(odb.list_hashes_exists(dir_hashes - dir_exists))
+    dir_exists.update(odb.list_hashes_exists(dir_hashes - dir_exists, jobs))

    # If .dir hash exists in the ODB, assume directory contents
    # also exists
@@ -76,6 +76,7 @@ def status(
    index: Optional["ObjectDBIndexBase"] = None,
    cache_odb: Optional["ObjectDB"] = None,
    shallow: bool = True,
+    jobs: Optional[int] = None,
    **kwargs,
) -> "StatusResult":
    """Return status of whether or not the specified objects exist odb.
@@ -121,15 +122,17 @@
    if index and hashes:
        if dir_objs:
            exists = hashes.intersection(
-                _indexed_dir_hashes(odb, index, dir_objs, name, cache_odb)
+                _indexed_dir_hashes(
+                    odb, index, dir_objs, name, cache_odb, jobs=jobs
+                )
            )
            hashes.difference_update(exists)
        if hashes:
            exists.update(index.intersection(hashes))
            hashes.difference_update(exists)

    if hashes:
-        exists.update(odb.hashes_exist(hashes, name=odb.fs_path, **kwargs))
+        exists.update(odb.hashes_exist(hashes, name=odb.fs_path, jobs=jobs))
    return StatusResult(
        {hash_infos[hash_] for hash_ in exists},
        {hash_infos[hash_] for hash_ in (hashes - exists)},
@@ -144,6 +147,7 @@ def compare_status(
    check_deleted: bool = True,
    src_index: Optional["ObjectDBIndexBase"] = None,
    dest_index: Optional["ObjectDBIndexBase"] = None,
+    jobs: Optional[int] = None,
    **kwargs,
) -> "CompareStatusResult":
    """Compare status for the specified objects between two ODBs.
@@ -157,13 +161,13 @@
    if "cache_odb" not in kwargs:
        kwargs["cache_odb"] = src
    dest_exists, dest_missing = status(
-        dest, obj_ids, index=dest_index, **kwargs
+        dest, obj_ids, index=dest_index, jobs=jobs, **kwargs
    )
    # for transfer operations we can skip src status check when all objects
    # already exist in dest
    if dest_missing or check_deleted:
        src_exists, src_missing = status(
-            src, obj_ids, index=src_index, **kwargs
+            src, obj_ids, index=src_index, jobs=jobs, **kwargs
        )
    else:
        src_exists = dest_exists
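To make the data flow above concrete: a `jobs` value threaded through `status` and `compare_status` typically ends up bounding a thread pool behind the remote existence check. A minimal sketch of that pattern, assuming a hypothetical `exists_on_remote` callback and a made-up default pool size; this is illustrative only, not DVC's actual implementation:

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable, Optional, Set


def list_hashes_exists(
    hashes: Iterable[str],
    exists_on_remote: Callable[[str], bool],  # hypothetical remote check
    jobs: Optional[int] = None,
) -> Set[str]:
    """Return the subset of `hashes` present on the remote.

    Existence checks are I/O-bound, so up to `jobs` threads can
    query the remote concurrently; jobs=None falls back to a
    default (4 here is an assumption for the sketch).
    """
    hashes = list(hashes)
    with ThreadPoolExecutor(max_workers=jobs or 4) as executor:
        flags = executor.map(exists_on_remote, hashes)
        return {h for h, ok in zip(hashes, flags) if ok}
```

Passing `jobs` down explicitly as a keyword argument, as the diff does, keeps the user's `-j` choice intact all the way to this pool instead of silently dropping it.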
dvc/data/transfer.py: 3 additions & 1 deletion
@@ -151,7 +151,9 @@ def transfer(
    if src == dest:
        return 0

-    status = compare_status(src, dest, obj_ids, check_deleted=False, **kwargs)
+    status = compare_status(
+        src, dest, obj_ids, check_deleted=False, jobs=jobs, **kwargs
+    )
    if not status.new:
        return 0

dvc/objects/db.py: 4 additions & 2 deletions
@@ -350,7 +350,7 @@ def all(self, jobs=None, name=None):

        remote_size, remote_hashes = self._estimate_remote_size(name=name)
        return self._list_hashes_traverse(
-            remote_size, remote_hashes, jobs, name
+            remote_size, remote_hashes, jobs=jobs, name=name
        )

    def _remove_unpacked_dir(self, hash_):
@@ -457,6 +457,8 @@ def hashes_exist(self, hashes, jobs=None, name=None):

        logger.debug(f"Querying '{len(hashes)}' hashes via traverse")
        remote_hashes = set(
-            self._list_hashes_traverse(remote_size, remote_hashes, jobs, name)
+            self._list_hashes_traverse(
+                remote_size, remote_hashes, jobs=jobs, name=name
+            )
        )
        return list(hashes & set(remote_hashes))
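The substantive change in this file is switching `jobs, name` from positional to keyword arguments. A self-contained illustration of why that matters for two adjacent optional parameters; the stub mirrors only the signature from the diff, and the argument values are invented:

```python
def _list_hashes_traverse(remote_size, remote_hashes, jobs=None, name=None):
    """Stub mirroring the signature above; returns what it received."""
    return jobs, name


# Fragile: correctness depends on the parameter order staying (jobs, name).
assert _list_hashes_traverse(4096, set(), 8, "upstream") == (8, "upstream")

# Robust: keyword arguments survive signature reordering and read clearly
# at the call site.
assert _list_hashes_traverse(
    4096, set(), jobs=8, name="upstream"
) == (8, "upstream")
```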
dvc/repo/gc.py: 1 addition & 1 deletion
@@ -82,7 +82,7 @@ def gc(
        return

    odb = self.cloud.get_remote_odb(remote, "gc -c")
-    removed = ogc(odb, used_obj_ids)
+    removed = ogc(odb, used_obj_ids, jobs=jobs)
Contributor Author: Seems like we were ignoring job count in gc too.

@skshetry (Collaborator, Feb 15, 2022): We have an issue for that: #5961.

@daavoo (Contributor, Feb 15, 2022): Should this PR close #5961?

Contributor Author: I'm looking into how easy it would be to add basic multithreading to gc. For now, it only uses multithreading when listing objects from the odb (odb.all()) using _list_hashes_traverse.

I think #5961 should be closed in another PR.

    if removed:
        get_index(odb).clear()
    else:
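As the thread above says, gc currently parallelizes only the listing step. A rough sketch of that shape, assuming hypothetical `odb_all` and `remove_hash` callables rather than DVC's real API:

```python
from typing import Callable, Iterable, Optional, Set


def gc(
    odb_all: Callable[..., Iterable[str]],  # stand-in for odb.all(jobs=...)
    remove_hash: Callable[[str], None],     # stand-in for object deletion
    used_hashes: Set[str],
    jobs: Optional[int] = None,
) -> int:
    """Delete every remote hash not in `used_hashes`."""
    removed = 0
    for hash_ in odb_all(jobs=jobs):  # listing is the multithreaded part
        if hash_ not in used_hashes:
            remove_hash(hash_)  # deletion itself stays sequential (#5961)
            removed += 1
    return removed
```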
tests/func/test_data_cloud.py: 44 additions & 2 deletions
@@ -7,16 +7,19 @@

import dvc as dvc_module
from dvc.cli import main
+from dvc.data.db.local import LocalObjectDB
from dvc.external_repo import clean_repos
+from dvc.objects.db import ObjectDB
from dvc.stage.exceptions import StageNotFound
from dvc.testing.test_remote import (  # noqa, pylint: disable=unused-import
    TestRemote,
)
from dvc.utils.fs import remove


-def test_cloud_cli(tmp_dir, dvc, remote):
-    args = ["-v", "-j", "2"]
+def test_cloud_cli(tmp_dir, dvc, remote, mocker):
Contributor: This test is getting crazier and crazier :) We'll get rid of it when we have lower level testing in dvc-data/objects.

+    jobs = 2
+    args = ["-v", "-j", str(jobs)]

    (stage,) = tmp_dir.dvc_gen("foo", "foo")
    cache = stage.outs[0].cache_path
@@ -34,25 +37,44 @@ def test_cloud_cli(tmp_dir, dvc, remote):
    cache_dir = stage_dir.outs[0].cache_path

    # FIXME check status output
+    hashes_exist = mocker.spy(LocalObjectDB, "hashes_exist")

    assert main(["push"] + args) == 0
    assert os.path.exists(cache)
    assert os.path.isfile(cache)
    assert os.path.isfile(cache_dir)
+    assert hashes_exist.called
+    assert all(
+        _kwargs["jobs"] == jobs
+        for (_args, _kwargs) in hashes_exist.call_args_list
+    )

    remove(dvc.odb.local.cache_dir)
+    hashes_exist.reset_mock()

    assert main(["fetch"] + args) == 0
    assert os.path.exists(cache)
    assert os.path.isfile(cache)
    assert os.path.isfile(cache_dir)
+    assert hashes_exist.called
+    assert all(
+        _kwargs["jobs"] == jobs
+        for (_args, _kwargs) in hashes_exist.call_args_list
+    )

+    hashes_exist.reset_mock()

    assert main(["pull"] + args) == 0
    assert os.path.exists(cache)
    assert os.path.isfile(cache)
    assert os.path.isfile(cache_dir)
    assert os.path.isfile("foo")
    assert os.path.isdir("data_dir")
+    assert hashes_exist.called
+    assert all(
+        _kwargs["jobs"] == jobs
+        for (_args, _kwargs) in hashes_exist.call_args_list
+    )

    with open(cache, encoding="utf-8") as fd:
        assert fd.read() == "foo"
@@ -62,18 +84,38 @@
    if remote.url.startswith("http"):
        return

+    hashes_exist.reset_mock()

+    _list_hashes_traverse = mocker.spy(ObjectDB, "_list_hashes_traverse")
    # NOTE: check if remote gc works correctly on directories
    assert main(["gc", "-cw", "-f"] + args) == 0
+    assert _list_hashes_traverse.called
+    assert all(
+        _kwargs["jobs"] == jobs
+        for (_args, _kwargs) in _list_hashes_traverse.call_args_list
+    )
    shutil.move(dvc.odb.local.cache_dir, dvc.odb.local.cache_dir + ".back")

    assert main(["fetch"] + args) == 0

+    assert hashes_exist.called
+    assert all(
+        _kwargs["jobs"] == jobs
+        for (_args, _kwargs) in hashes_exist.call_args_list
+    )

+    hashes_exist.reset_mock()
    assert main(["pull", "-f"] + args) == 0
    assert os.path.exists(cache)
    assert os.path.isfile(cache)
    assert os.path.isfile(cache_dir)
    assert os.path.isfile("foo")
    assert os.path.isdir("data_dir")
+    assert hashes_exist.called
+    assert all(
+        _kwargs["jobs"] == jobs
+        for (_args, _kwargs) in hashes_exist.call_args_list
+    )


def test_data_cloud_error_cli(dvc):
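The assertions added above all follow one pattern: spy on a real method with pytest-mock and check that every recorded call received the expected `jobs`. Reduced to a self-contained example; the `ODB` class here is a stand-in, not DVC's:

```python
class ODB:
    def hashes_exist(self, hashes, jobs=None, name=None):
        return list(hashes)


def test_jobs_is_propagated(mocker):
    # mocker.spy wraps the real method but records every call it sees
    spy = mocker.spy(ODB, "hashes_exist")
    ODB().hashes_exist({"abc123"}, jobs=2)
    assert spy.called
    assert all(
        _kwargs["jobs"] == 2
        for (_args, _kwargs) in spy.call_args_list
    )
```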
tests/unit/remote/test_base.py: 2 additions & 2 deletions
@@ -60,8 +60,8 @@ def test_hashes_exist(object_exists, traverse, dvc):
    traverse.assert_called_with(
        256 * pow(16, odb.fs.TRAVERSE_PREFIX_LEN),
        set(range(256)),
-        None,
-        None,
+        jobs=None,
+        name=None,
    )

