From fd89472790d127648d4654f0a72c0365335b68a2 Mon Sep 17 00:00:00 2001 From: Ruslan Kuprieiev Date: Mon, 25 Jan 2021 03:16:21 +0200 Subject: [PATCH] RepoTree: get rid of implicit fetching --- dvc/cache/base.py | 11 ++--- dvc/dependency/repo.py | 4 +- dvc/external_repo.py | 3 -- dvc/repo/__init__.py | 6 +-- dvc/repo/diff.py | 2 +- dvc/repo/fetch.py | 13 ++++- dvc/repo/ls.py | 4 +- dvc/tree/dvc.py | 71 +++++++++------------------ dvc/tree/repo.py | 20 ++------ dvc/tree/webdav.py | 20 ++++++++ tests/func/test_import.py | 4 +- tests/func/test_tree.py | 46 ----------------- tests/unit/test_external_repo.py | 2 - tests/unit/tree/test_dvc.py | 34 +++++-------- tests/unit/tree/test_repo.py | 12 ++--- tests/unit/tree/test_repo_metadata.py | 2 +- 16 files changed, 88 insertions(+), 166 deletions(-) diff --git a/dvc/cache/base.py b/dvc/cache/base.py index a32999ff15..8a6b089549 100644 --- a/dvc/cache/base.py +++ b/dvc/cache/base.py @@ -246,12 +246,7 @@ def _save_file(self, path_info, tree, hash_info, save_link=True, **kwargs): else: if self.changed_cache(hash_info): with tree.open(path_info, mode="rb") as fobj: - # if tree has fetch enabled, DVC out will be fetched on - # open and we do not need to read/copy any data - if not ( - tree.isdvc(path_info, strict=False) and tree.fetch - ): - self.tree.upload_fobj(fobj, cache_info) + self.tree.upload_fobj(fobj, cache_info) callback = kwargs.get("download_callback") if callback: callback(1) @@ -444,7 +439,9 @@ def save(self, path_info, tree, hash_info, save_link=True, **kwargs): ) if not hash_info: - hash_info = tree.get_hash(path_info, **kwargs) + kw = kwargs.copy() + kw.pop("download_callback", None) + hash_info = tree.get_hash(path_info, **kw) if not hash_info: raise FileNotFoundError( errno.ENOENT, os.strerror(errno.ENOENT), path_info diff --git a/dvc/dependency/repo.py b/dvc/dependency/repo.py index 87b011ebbb..7cdd6310ff 100644 --- a/dvc/dependency/repo.py +++ b/dvc/dependency/repo.py @@ -47,9 +47,7 @@ def _make_repo(self, *, 
locked=True, **kwargs): return external_repo(d["url"], rev=rev, **kwargs) def _get_hash(self, locked=True): - # we want stream but not fetch, so DVC out directories are - # walked, but dir contents is not fetched - with self._make_repo(locked=locked, fetch=False, stream=True) as repo: + with self._make_repo(locked=locked) as repo: path_info = PathInfo(repo.root_dir) / self.def_path return repo.repo_tree.get_hash(path_info, follow_subrepos=False) diff --git a/dvc/external_repo.py b/dvc/external_repo.py index ba470140ce..7b12df4cd5 100644 --- a/dvc/external_repo.py +++ b/dvc/external_repo.py @@ -66,9 +66,6 @@ def make_repo(path, **_kwargs): **kwargs, ) - if "fetch" not in repo_kwargs: - repo_kwargs["fetch"] = True - repo = Repo(**repo_kwargs) try: diff --git a/dvc/repo/__init__.py b/dvc/repo/__init__.py index bcb76de28f..25c80fa3c5 100644 --- a/dvc/repo/__init__.py +++ b/dvc/repo/__init__.py @@ -130,8 +130,6 @@ def __init__( config=None, url=None, repo_factory=None, - fetch=None, - stream=None, ): from dvc.cache import Cache from dvc.config import Config @@ -148,8 +146,6 @@ def __init__( self.url = url self._tree_conf = { - "stream": stream, - "fetch": fetch, "repo_factory": repo_factory, } @@ -458,7 +454,7 @@ def open_by_relpath(self, path, remote=None, mode="r", encoding=None): """Opens a specified resource as a file descriptor""" from dvc.tree.repo import RepoTree - tree = RepoTree(self, stream=True, subrepos=True) + tree = RepoTree(self, subrepos=True) path = PathInfo(self.root_dir) / path try: with self.state: diff --git a/dvc/repo/diff.py b/dvc/repo/diff.py index 55351c0ce0..cd734cacc6 100644 --- a/dvc/repo/diff.py +++ b/dvc/repo/diff.py @@ -22,7 +22,7 @@ def diff(self, a_rev="HEAD", b_rev=None, targets=None): from dvc.tree.repo import RepoTree - repo_tree = RepoTree(self, stream=True) + repo_tree = RepoTree(self) b_rev = b_rev if b_rev else "workspace" results = {} diff --git a/dvc/repo/fetch.py b/dvc/repo/fetch.py index 12af778857..345a182ecf 100644 --- 
a/dvc/repo/fetch.py +++ b/dvc/repo/fetch.py @@ -1,7 +1,8 @@ import logging +import os from dvc.config import NoRemoteError -from dvc.exceptions import DownloadError +from dvc.exceptions import DownloadError, NoOutputOrStageError from . import locked @@ -98,6 +99,16 @@ def cb(result): root = PathInfo(repo.root_dir) for path in files: path_info = root / path + try: + used = repo.used_cache( + [os.fspath(path_info)], + force=True, + jobs=jobs, + recursive=True, + ) + cb(repo.cloud.pull(used, jobs)) + except (NoOutputOrStageError, NoRemoteError): + pass self.cache.local.save( path_info, repo.repo_tree, diff --git a/dvc/repo/ls.py b/dvc/repo/ls.py index 0512a582ec..720b586614 100644 --- a/dvc/repo/ls.py +++ b/dvc/repo/ls.py @@ -30,9 +30,7 @@ def ls( """ from dvc.external_repo import external_repo - # use our own RepoTree instance instead of repo.repo_tree since we want to - # fetch directory listings, but don't want to fetch file contents. - with external_repo(url, rev, fetch=False, stream=True) as repo: + with external_repo(url, rev) as repo: path_info = PathInfo(repo.root_dir) if path: path_info /= path diff --git a/dvc/tree/dvc.py b/dvc/tree/dvc.py index 14a094dbc5..2bade0304e 100644 --- a/dvc/tree/dvc.py +++ b/dvc/tree/dvc.py @@ -7,7 +7,7 @@ from dvc.utils import relpath from ._metadata import Metadata -from .base import BaseTree, RemoteActionNotImplemented +from .base import BaseTree if typing.TYPE_CHECKING: from dvc.output.base import BaseOutput @@ -21,22 +21,13 @@ class DvcTree(BaseTree): # pylint:disable=abstract-method Args: repo: DVC repo. - fetch: if True, uncached DVC outs will be fetched on `open()`. - stream: if True, uncached DVC outs will be streamed directly from - remote on `open()`. - - `stream` takes precedence over `fetch`. If `stream` is enabled and - a remote does not support streaming, uncached DVC outs will be fetched - as a fallback. 
""" scheme = "local" PARAM_CHECKSUM = "md5" - def __init__(self, repo, fetch=False, stream=False): + def __init__(self, repo): super().__init__(repo, {"url": repo.root_dir}) - self.fetch = fetch - self.stream = stream def _find_outs(self, path, *args, **kwargs): outs = self.repo.find_outs_by_path(path, *args, **kwargs) @@ -54,9 +45,6 @@ def _get_granular_hash( self, path_info: PathInfo, out: "BaseOutput", remote=None ): assert isinstance(path_info, PathInfo) - if not self.fetch and not self.stream: - raise FileNotFoundError - # NOTE: use string paths here for performance reasons key = tuple(relpath(path_info, out.path_info).split(os.sep)) out.get_dir_cache(remote=remote) @@ -80,24 +68,20 @@ def open( # type: ignore out = outs[0] if out.changed_cache(filter_info=path): - if not self.fetch and not self.stream: - raise FileNotFoundError + from dvc.config import NoRemoteError - remote_obj = self.repo.cloud.get_remote(remote) - if self.stream: - if out.is_dir_checksum: - checksum = self._get_granular_hash(path, out).value - else: - checksum = out.hash_info.value - try: - remote_info = remote_obj.tree.hash_to_path_info(checksum) - return remote_obj.tree.open( - remote_info, mode=mode, encoding=encoding - ) - except RemoteActionNotImplemented: - pass - cache_info = out.get_used_cache(filter_info=path, remote=remote) - self.repo.cloud.pull(cache_info, remote=remote) + try: + remote_obj = self.repo.cloud.get_remote(remote) + except NoRemoteError: + raise FileNotFoundError + if out.is_dir_checksum: + checksum = self._get_granular_hash(path, out).value + else: + checksum = out.hash_info.value + remote_info = remote_obj.tree.hash_to_path_info(checksum) + return remote_obj.tree.open( + remote_info, mode=mode, encoding=encoding + ) if out.is_dir_checksum: checksum = self._get_granular_hash(path, out).value @@ -143,24 +127,17 @@ def isfile(self, path): # pylint: disable=arguments-differ except FileNotFoundError: return False - def _fetch_dir( - self, out, filter_info=None, 
download_callback=None, **kwargs - ): + def _fetch_dir(self, out, **kwargs): # pull dir cache if needed out.get_dir_cache(**kwargs) - # pull dir contents if needed - if self.fetch and out.changed_cache(filter_info=filter_info): - used_cache = out.get_used_cache(filter_info=filter_info) - downloaded = self.repo.cloud.pull(used_cache, **kwargs) - if download_callback: - download_callback(downloaded) - - def _add_dir(self, top, trie, out, **kwargs): - if not self.fetch and not self.stream: - return + dir_cache = out.dir_cache + hash_info = out.cache.save_dir_info(dir_cache) + if hash_info != out.hash_info: + raise FileNotFoundError - self._fetch_dir(out, filter_info=top, **kwargs) + def _add_dir(self, trie, out, **kwargs): + self._fetch_dir(out, **kwargs) base = out.path_info.parts for key in out.dir_cache.trie.iterkeys(): # noqa: B301 @@ -172,7 +149,7 @@ def _walk(self, root, trie, topdown=True, **kwargs): out = trie.get(root.parts) if out and out.is_dir_checksum: - self._add_dir(root, trie, out, **kwargs) + self._add_dir(trie, out, **kwargs) root_len = len(root.parts) for key, out in trie.iteritems(prefix=root.parts): # noqa: B301 @@ -215,7 +192,7 @@ def walk(self, top, topdown=True, onerror=None, **kwargs): trie[out.path_info.parts] = out if out.is_dir_checksum and root.isin_or_eq(out.path_info): - self._add_dir(top, trie, out, **kwargs) + self._add_dir(trie, out, **kwargs) yield from self._walk(root, trie, topdown=topdown, **kwargs) diff --git a/dvc/tree/repo.py b/dvc/tree/repo.py index ff4c3093f7..5e2b4a1e1a 100644 --- a/dvc/tree/repo.py +++ b/dvc/tree/repo.py @@ -36,7 +36,7 @@ class RepoTree(BaseTree): # pylint:disable=abstract-method PARAM_CHECKSUM = "md5" def __init__( - self, repo, subrepos=False, repo_factory: RepoFactory = None, **kwargs + self, repo, subrepos=False, repo_factory: RepoFactory = None, ): super().__init__(repo, {"url": repo.root_dir}) @@ -61,10 +61,8 @@ def __init__( self._dvctrees = {} """Keep a dvctree instance of each repo.""" - 
self._dvctree_configs = kwargs - if hasattr(repo, "dvc_dir"): - self._dvctrees[repo.root_dir] = DvcTree(repo, **kwargs) + self._dvctrees[repo.root_dir] = DvcTree(repo) def _get_repo(self, path) -> Optional["Repo"]: """Returns repo that the path falls in, using prefix. @@ -99,12 +97,8 @@ def _update(self, dirs, starting_repo): scm=self.repo.scm, rev=self.repo.get_rev(), repo_factory=self.repo_factory, - fetch=self.fetch, - stream=self.stream, - ) - self._dvctrees[repo.root_dir] = DvcTree( - repo, **self._dvctree_configs ) + self._dvctrees[repo.root_dir] = DvcTree(repo) self._subrepos_trie[d] = repo def _is_dvc_repo(self, dir_path): @@ -131,14 +125,6 @@ def _get_tree_pair(self, path) -> Tuple[BaseTree, Optional[DvcTree]]: dvc_tree = self._dvctrees.get(repo.root_dir) return repo.tree, dvc_tree - @property - def fetch(self): - return self._dvctree_configs.get("fetch") - - @property - def stream(self): - return self._dvctree_configs.get("stream") - def open( self, path, mode="r", encoding="utf-8", **kwargs ): # pylint: disable=arguments-differ diff --git a/dvc/tree/webdav.py b/dvc/tree/webdav.py index 9828d52686..7becd5bc9b 100644 --- a/dvc/tree/webdav.py +++ b/dvc/tree/webdav.py @@ -1,3 +1,4 @@ +import io import logging import os import threading @@ -122,6 +123,25 @@ def _client(self): return client + def open(self, path_info, mode="r", encoding=None, **kwargs): + from webdav3.exceptions import RemoteResourceNotFound + + assert mode in {"r", "rt", "rb"} + + fobj = io.BytesIO() + + try: + self._client.download_from(buff=fobj, remote_path=path_info.path) + except RemoteResourceNotFound as exc: + raise FileNotFoundError from exc + + fobj.seek(0) + + if "mode" == "rb": + return fobj + + return io.TextIOWrapper(fobj, encoding=encoding) + # Checks whether file/directory exists at remote def exists(self, path_info, use_dvcignore=True): # Use webdav check to test for file existence diff --git a/tests/func/test_import.py b/tests/func/test_import.py index 044c8ad280..c10bc27506 
100644 --- a/tests/func/test_import.py +++ b/tests/func/test_import.py @@ -9,7 +9,7 @@ from dvc.cache import Cache from dvc.config import NoRemoteError from dvc.dvcfile import Dvcfile -from dvc.exceptions import CollectCacheError, DownloadError +from dvc.exceptions import DownloadError from dvc.stage.exceptions import StagePathNotFoundError from dvc.system import System from dvc.utils.fs import makedirs, remove @@ -289,7 +289,7 @@ def test_push_wildcard_from_bare_git_repo( dvc_repo = make_tmp_dir("dvc-repo", scm=True, dvc=True) with dvc_repo.chdir(): dvc_repo.dvc.imp(os.fspath(tmp_dir), "dirextra") - with pytest.raises(CollectCacheError): + with pytest.raises(FileNotFoundError): dvc_repo.dvc.imp(os.fspath(tmp_dir), "dir123") diff --git a/tests/func/test_tree.py b/tests/func/test_tree.py index 3df94ee9ef..63942bcf77 100644 --- a/tests/func/test_tree.py +++ b/tests/func/test_tree.py @@ -9,8 +9,6 @@ from dvc.scm import SCM from dvc.tree import get_cloud_tree from dvc.tree.local import LocalTree -from dvc.tree.repo import RepoTree -from dvc.utils.fs import remove from tests.basic_env import TestDir, TestGit, TestGitSubmodule @@ -189,50 +187,6 @@ def test_branch(self): ) -def test_repotree_walk_fetch(tmp_dir, dvc, scm, local_remote): - out = tmp_dir.dvc_gen({"dir": {"foo": "foo"}}, commit="init")[0].outs[0] - dvc.push() - remove(dvc.cache.local.cache_dir) - remove(tmp_dir / "dir") - - tree = RepoTree(dvc, fetch=True) - for _, _, _ in tree.walk("dir"): - pass - - assert os.path.exists(out.cache_path) - for _, hi in out.dir_cache.items(): - assert hi.name == out.tree.PARAM_CHECKSUM - assert os.path.exists(dvc.cache.local.tree.hash_to_path_info(hi.value)) - - -def test_repotree_cache_save(tmp_dir, dvc, scm, erepo_dir, local_cloud): - with erepo_dir.chdir(): - erepo_dir.gen({"dir": {"subdir": {"foo": "foo"}, "bar": "bar"}}) - erepo_dir.dvc_add("dir/subdir", commit="subdir") - erepo_dir.scm_add("dir", commit="dir") - erepo_dir.add_remote(config=local_cloud.config) - 
erepo_dir.dvc.push() - - # test only cares that either fetch or stream are set so that DVC dirs are - # walked. - # - # for this test, all file objects are being opened() and copied from tree - # into dvc.cache, not fetched or streamed from a remote - tree = RepoTree(erepo_dir.dvc, stream=True) - expected = [ - tree.get_file_hash(PathInfo(erepo_dir / path)).value - for path in ("dir/bar", "dir/subdir/foo") - ] - - cache = dvc.cache.local - path_info = PathInfo(erepo_dir / "dir") - hash_info = cache.tree.get_hash(path_info) - cache.save(path_info, tree, hash_info) - - for hash_ in expected: - assert os.path.exists(cache.tree.hash_to_path_info(hash_)) - - def test_cleantree_subrepo(tmp_dir, dvc, scm, monkeypatch): tmp_dir.gen({"subdir": {}}) subrepo_dir = tmp_dir / "subdir" diff --git a/tests/unit/test_external_repo.py b/tests/unit/test_external_repo.py index c237671f2f..601946d4c1 100644 --- a/tests/unit/test_external_repo.py +++ b/tests/unit/test_external_repo.py @@ -39,8 +39,6 @@ def test_hook_is_called(tmp_dir, erepo_dir, mocker): path, scm=repo.scm, rev=repo.get_rev(), - fetch=True, - stream=None, repo_factory=repo.repo_tree.repo_factory, ) for path in paths diff --git a/tests/unit/tree/test_dvc.py b/tests/unit/tree/test_dvc.py index 811988240d..fc9d8ead40 100644 --- a/tests/unit/tree/test_dvc.py +++ b/tests/unit/tree/test_dvc.py @@ -133,25 +133,7 @@ def test_walk(tmp_dir, dvc): assert len(actual) == len(expected) -@pytest.mark.parametrize( - "fetch,expected", - [ - (False, []), - ( - True, - [ - PathInfo("dir") / "subdir1", - PathInfo("dir") / "subdir2", - PathInfo("dir") / "subdir1" / "foo1", - PathInfo("dir") / "subdir1" / "bar1", - PathInfo("dir") / "subdir2" / "foo2", - PathInfo("dir") / "foo", - PathInfo("dir") / "bar", - ], - ), - ], -) -def test_walk_dir(tmp_dir, dvc, fetch, expected): +def test_walk_dir(tmp_dir, dvc): tmp_dir.gen( { "dir": { @@ -164,9 +146,17 @@ def test_walk_dir(tmp_dir, dvc, fetch, expected): ) dvc.add("dir") - tree = DvcTree(dvc, 
fetch=fetch) + tree = DvcTree(dvc) - expected = [str(tmp_dir / path) for path in expected] + expected = [ + str(tmp_dir / "dir" / "subdir1"), + str(tmp_dir / "dir" / "subdir2"), + str(tmp_dir / "dir" / "subdir1" / "foo1"), + str(tmp_dir / "dir" / "subdir1" / "bar1"), + str(tmp_dir / "dir" / "subdir2" / "foo2"), + str(tmp_dir / "dir" / "foo"), + str(tmp_dir / "dir" / "bar"), + ] actual = [] for root, dirs, files in tree.walk("dir"): @@ -231,7 +221,7 @@ def test_get_hash_granular(tmp_dir, dvc): tmp_dir.dvc_gen( {"dir": {"foo": "foo", "bar": "bar", "subdir": {"data": "data"}}} ) - tree = DvcTree(dvc, fetch=True) + tree = DvcTree(dvc) subdir = PathInfo(tmp_dir) / "dir" / "subdir" assert tree.get_hash(subdir) == HashInfo( "md5", "af314506f1622d107e0ed3f14ec1a3b5.dir", diff --git a/tests/unit/tree/test_repo.py b/tests/unit/tree/test_repo.py index b10ee9651a..ced69d6a26 100644 --- a/tests/unit/tree/test_repo.py +++ b/tests/unit/tree/test_repo.py @@ -94,7 +94,7 @@ def test_exists_isdir_isfile_dirty(tmp_dir, dvc): {"datafile": "data", "datadir": {"foo": "foo", "bar": "bar"}} ) - tree = RepoTree(dvc, stream=True) + tree = RepoTree(dvc) shutil.rmtree(tmp_dir / "datadir") (tmp_dir / "datafile").unlink() @@ -324,7 +324,7 @@ def test_subrepos(tmp_dir, scm, dvc): ) dvc.tree._reset() - tree = RepoTree(dvc, subrepos=True, fetch=True) + tree = RepoTree(dvc, subrepos=True) def assert_tree_belongs_to_repo(ret_val): method = tree._get_repo @@ -406,7 +406,7 @@ def test_subrepo_walk(tmp_dir, scm, dvc, dvcfiles, extra_expected): # using tree that does not have dvcignore dvc.tree._reset() - tree = RepoTree(dvc, subrepos=True, fetch=True) + tree = RepoTree(dvc, subrepos=True) expected = [ PathInfo("dir") / "repo", PathInfo("dir") / "repo.txt", @@ -449,7 +449,7 @@ def test_repo_tree_no_subrepos(tmp_dir, dvc, scm): # using tree that does not have dvcignore dvc.tree._reset() - tree = RepoTree(dvc, subrepos=False, fetch=True) + tree = RepoTree(dvc, subrepos=False) expected = [ tmp_dir / 
".dvcignore", tmp_dir / ".gitignore", @@ -516,7 +516,7 @@ def test_get_hash_cached_granular(tmp_dir, dvc, mocker): tmp_dir.dvc_gen( {"dir": {"foo": "foo", "bar": "bar", "subdir": {"data": "data"}}} ) - tree = RepoTree(dvc, fetch=True) + tree = RepoTree(dvc) dvc_tree_spy = mocker.spy(tree._dvctrees[dvc.root_dir], "get_file_hash") subdir = PathInfo(tmp_dir) / "dir" / "subdir" assert tree.get_hash(subdir) == HashInfo( @@ -612,7 +612,7 @@ def dvc_structure(suffix): expected[str(tmp_dir / "subrepo1")].add("subrepo3") actual = {} - tree = RepoTree(dvc, subrepos=traverse_subrepos, fetch=True) + tree = RepoTree(dvc, subrepos=traverse_subrepos) for root, dirs, files in tree.walk(str(tmp_dir)): actual[root] = set(dirs + files) assert expected == actual diff --git a/tests/unit/tree/test_repo_metadata.py b/tests/unit/tree/test_repo_metadata.py index 0e98a4d116..903e2a360f 100644 --- a/tests/unit/tree/test_repo_metadata.py +++ b/tests/unit/tree/test_repo_metadata.py @@ -37,7 +37,7 @@ def repo_tree(tmp_dir, dvc, scm): tmp_dir.scm_gen(fs_structure, commit="repo init") tmp_dir.dvc_gen(dvc_structure, commit="use dvc") - yield RepoTree(dvc, fetch=True, subrepos=True) + yield RepoTree(dvc, subrepos=True) def test_metadata_not_existing(repo_tree):