diff --git a/scmrepo/fs.py b/scmrepo/fs.py index 09d0ceb7..0533630f 100644 --- a/scmrepo/fs.py +++ b/scmrepo/fs.py @@ -1,6 +1,6 @@ import errno import os -from itertools import chain +import posixpath from typing import ( TYPE_CHECKING, Any, @@ -20,6 +20,124 @@ from scmrepo.git.objects import GitTrie +class Path: + def __init__(self, sep, getcwd=None, realpath=None): + def _getcwd(): + return "" + + self.getcwd = getcwd or _getcwd + self.realpath = realpath or self.abspath + + assert sep == posixpath.sep + self.flavour = posixpath + + def chdir(self, path): + def _getcwd(): + return path + + self.getcwd = _getcwd + + def join(self, *parts): + return self.flavour.join(*parts) + + def split(self, path): + return self.flavour.split(path) + + def normpath(self, path): + return self.flavour.normpath(path) + + def isabs(self, path): + return self.flavour.isabs(path) + + def abspath(self, path): + if not self.isabs(path): + path = self.join(self.getcwd(), path) + return self.normpath(path) + + def commonprefix(self, path): + return self.flavour.commonprefix(path) + + def parts(self, path): + drive, path = self.flavour.splitdrive(path.rstrip(self.flavour.sep)) + + ret = [] + while True: + path, part = self.flavour.split(path) + + if part: + ret.append(part) + continue + + if path: + ret.append(path) + + break + + ret.reverse() + + if drive: + ret = [drive] + ret + + return tuple(ret) + + def parent(self, path): + return self.flavour.dirname(path) + + def dirname(self, path): + return self.parent(path) + + def parents(self, path): + parts = self.parts(path) + return tuple( + self.join(*parts[:length]) + for length in range(len(parts) - 1, 0, -1) + ) + + def name(self, path): + return self.parts(path)[-1] + + def suffix(self, path): + name = self.name(path) + _, dot, suffix = name.partition(".") + return dot + suffix + + def with_name(self, path, name): + parts = list(self.parts(path)) + parts[-1] = name + return self.join(*parts) + + def with_suffix(self, path, suffix): + parts = list(self.parts(path)) + real_path, _, _ = parts[-1].partition(".") + parts[-1] = real_path + suffix + return self.join(*parts) + + def isin(self, left, right): + left_parts = self.parts(left) + right_parts = self.parts(right) + left_len = len(left_parts) + right_len = len(right_parts) + return left_len > right_len and left_parts[:right_len] == right_parts + + def isin_or_eq(self, left, right): + return left == right or self.isin(left, right) + + def overlaps(self, left, right): + # pylint: disable=arguments-out-of-order + return self.isin_or_eq(left, right) or self.isin(right, left) + + def relpath(self, path, start=None): + if start is None: + start = self.getcwd() + return self.flavour.relpath(path, start=start) + + def relparts(self, path, base): + return self.parts(self.relpath(path, base)) + + def as_posix(self, path): + return path.replace(self.flavour.sep, posixpath.sep) + + def bytesio_len(obj: "BytesIO") -> Optional[int]: try: offset = obj.tell() @@ -32,8 +150,8 @@ def bytesio_len(obj: "BytesIO") -> Optional[int]: class GitFileSystem(AbstractFileSystem): # pylint: disable=abstract-method - sep = os.sep cachable = False + root_marker = "/" def __init__( self, @@ -54,22 +172,22 @@ def __init__( resolved = resolver(scm, rev or "HEAD") tree_obj = scm.pygit2.get_tree_obj(rev=resolved) trie = GitTrie(tree_obj, resolved) - path = scm.root_dir - else: - assert path self.trie = trie - self.root_dir = path self.rev = self.trie.rev - def _get_key(self, path: str) -> Tuple[str, ...]: - from scmrepo.utils import relpath + def _getcwd(): + return self.root_marker + + self.path = Path(self.sep, getcwd=_getcwd) - if os.path.isabs(path): - path = relpath(path, self.root_dir) - relparts = path.split(os.sep) - if relparts == ["."]: + def _get_key(self, path: str) -> Tuple[str, ...]: + path = self.path.abspath(path) + if path == self.root_marker: return () + relparts = path.split(self.sep) + if relparts and relparts[0] in (".", ""): + relparts = relparts[1:] return tuple(relparts) def _open( @@ -102,7 +220,7 @@ def info(self, path: str, **kwargs: Any) -> Dict[str, Any]: try: return { **self.trie.info(key), - "name": os.path.join(self.root_dir, self.sep.join(key)), + "name": path, } except KeyError: raise FileNotFoundError( @@ -116,51 +234,22 @@ def exists(self, path: str, **kwargs: Any) -> bool: def checksum(self, path: str) -> str: return self.info(path)["sha"] - def walk( # pylint: disable=arguments-differ - self, - top: str, - topdown: bool = True, - onerror: Callable[[OSError], None] = None, - maxdepth: int = None, - detail: bool = False, - **kwargs: Any, - ): - """Directory tree generator. - - See `os.walk` for the docs. Differences: - - no support for symlinks - """ - assert maxdepth is None # not supported yet. - if not self.isdir(top): - if onerror: - if self.exists(top): - exc: OSError = NotADirectoryError( - errno.ENOTDIR, os.strerror(errno.ENOTDIR), top - ) - else: - exc = FileNotFoundError( - errno.ENOENT, os.strerror(errno.ENOENT), top - ) - onerror(exc) - return [] - - key = self._get_key(top) - for prefix, dirs, files in self.trie.walk(key, topdown=topdown): - root = self.root_dir - - if prefix: - root = os.path.join(root, os.sep.join(prefix)) - if detail: - yield ( - root, - {d: self.info(os.path.join(root, d)) for d in dirs}, - {f: self.info(os.path.join(root, f)) for f in files}, - ) - else: - yield root, dirs, files - def ls(self, path, detail=True, **kwargs): - for _, dirs, files in self.walk(path, detail=detail, **kwargs): - if detail: - return list(chain(dirs.values(), files.values())) - return dirs + files + info = self.info(path) + if info["type"] != "directory": + return [info] if detail else [path] + + key = self._get_key(path) + try: + names = self.trie.ls(key) + except KeyError as exc: + raise FileNotFoundError from exc + + paths = [ + posixpath.join(path, name) if path else name for name in names + ] + + if not detail: + return paths + + return [self.info(_path) for _path in paths] diff --git a/scmrepo/git/backend/pygit2.py b/scmrepo/git/backend/pygit2.py index d483efc8..defbaa86 100644 --- a/scmrepo/git/backend/pygit2.py +++ b/scmrepo/git/backend/pygit2.py @@ -64,7 +64,10 @@ def mode(self): @property def size(self) -> int: - return len(self.obj.read_raw()) + try: + return len(self.obj.read_raw()) + except KeyError: + return 0 @property def sha(self) -> str: diff --git a/scmrepo/git/objects.py b/scmrepo/git/objects.py index 0988e0f2..2da033fe 100644 --- a/scmrepo/git/objects.py +++ b/scmrepo/git/objects.py @@ -101,20 +101,30 @@ def isfile(self, key: tuple) -> bool: return obj.isfile - def walk(self, top: tuple, topdown: Optional[bool] = True): - dirs = [] - nondirs = [] + def ls(self, key: tuple): + ret = [] - def node_factory(_, path, children, obj): - if path == top: + def node_factory(_, _key, children, obj): + if key == _key: assert obj.isdir list(filter(None, children)) - elif obj.isdir: - dirs.append(obj.name) else: - nondirs.append(obj.name) + ret.append(_key[-1]) + + self.trie.traverse(node_factory, prefix=key) + + return ret - self.trie.traverse(node_factory, prefix=top) + def walk(self, top: tuple, topdown: Optional[bool] = True): + dirs = [] + nondirs = [] + + for name in self.ls(top): + info = self.info(top + (name,)) + if info["type"] == "directory": + dirs.append(name) + else: + nondirs.append(name) if topdown: yield top, dirs, nondirs diff --git a/tests/test_fs.py b/tests/test_fs.py index d07c27f1..bd509663 100644 --- a/tests/test_fs.py +++ b/tests/test_fs.py @@ -1,5 +1,3 @@ -import os - import pytest from pytest_test_utils import TmpDir @@ -31,18 +29,22 @@ def test_exists(tmp_dir: TmpDir, scm: Git): fs = scm.get_fs("master") + assert fs.exists("/") + assert fs.exists(".") assert not fs.exists("foo") assert not fs.exists("тест") assert not fs.exists("data") - assert not fs.exists(os.path.join("data", "lorem")) + assert not fs.exists("data/lorem") scm.add_commit(files, message="add") fs = scm.get_fs("master") + assert fs.exists("/") + assert fs.exists(".") assert fs.exists("foo") assert fs.exists("тест") assert fs.exists("data") - assert fs.exists(os.path.join("data", "lorem")) + assert fs.exists("data/lorem") assert not fs.exists("non-existing-file") @@ -52,6 +54,8 @@ def test_isdir(tmp_dir: TmpDir, scm: Git): fs = scm.get_fs("master") + assert fs.isdir("/") + assert fs.isdir(".") assert fs.isdir("data") assert not fs.isdir("foo") assert not fs.isdir("non-existing-file") @@ -62,6 +66,8 @@ def test_isfile(tmp_dir: TmpDir, scm: Git): scm.add_commit(["foo", "data"], message="add") fs = scm.get_fs("master") + assert not fs.isfile("/") + assert not fs.isfile(".") assert fs.isfile("foo") assert not fs.isfile("data") assert not fs.isfile("not-existing-file") @@ -75,7 +81,7 @@ def test_walk(tmp_dir: TmpDir, scm: Git): "data": {"lorem": "ipsum", "subdir": {"sub": "sub"}}, } ) - scm.add_commit(os.path.join("data", "subdir"), message="add") + scm.add_commit("data/subdir", message="add") fs = scm.get_fs("master") def convert_to_sets(walk_results): @@ -84,24 +90,22 @@ def convert_to_sets(walk_results): for root, dirs, nondirs in walk_results ] - assert convert_to_sets(fs.walk(".")) == convert_to_sets( + assert convert_to_sets(fs.walk("/")) == convert_to_sets( [ - (scm.root_dir, ["data"], []), - (os.path.join(scm.root_dir, "data"), ["subdir"], []), + ("/", ["data"], []), + ("/data", ["subdir"], []), ( - os.path.join(scm.root_dir, "data", "subdir"), + "/data/subdir", [], ["sub"], ), ] ) - assert convert_to_sets( - fs.walk(os.path.join("data", "subdir")) - ) == convert_to_sets( + assert convert_to_sets(fs.walk("data/subdir")) == convert_to_sets( [ ( - os.path.join(scm.root_dir, "data", "subdir"), + "data/subdir", [], ["sub"], ) @@ -120,25 +124,25 @@ def test_ls(tmp_dir: TmpDir, scm: Git): scm.add_commit(files, message="add") fs = scm.get_fs("master") - assert fs.ls(".", detail=False) == ["data", "foo", "тест"] - assert fs.ls(".") == [ + assert fs.ls("/", detail=False) == ["/data", "/foo", "/тест"] + assert fs.ls("/") == [ { "mode": 16384, - "name": str(tmp_dir / "data"), + "name": "/data", "sha": "f5d6ac1955c85410b71bb6e35e4c57c54e2ad524", "size": 66, "type": "directory", }, { "mode": 33188, - "name": str(tmp_dir / "foo"), + "name": "/foo", "sha": "19102815663d23f8b75a47e7a01965dcdc96468c", "size": 3, "type": "file", }, { "mode": 33188, - "name": str(tmp_dir / "тест"), + "name": "/тест", "sha": "eeeba1738f4c12844163b89112070c6e57eb764e", "size": 16, "type": "file", diff --git a/tests/test_git.py b/tests/test_git.py index f5bc93e2..04f0ce5c 100644 --- a/tests/test_git.py +++ b/tests/test_git.py @@ -126,7 +126,7 @@ def test_walk_with_submodules( files = [] dirs = [] fs = scm.get_fs("HEAD") - for _, dnames, fnames in fs.walk("."): + for _, dnames, fnames in fs.walk(""): dirs.extend(dnames) files.extend(fnames) @@ -135,30 +135,6 @@ def test_walk_with_submodules( assert set(files) == {".gitmodules", "submodule"} -def test_walk_onerror(tmp_dir: TmpDir, scm: Git): - def onerror(exc): - raise exc - - tmp_dir.gen("foo", "foo") - scm.add_commit("foo", message="init") - - fs = scm.get_fs("HEAD") - - # path does not exist - for _ in fs.walk("dir"): - pass - with pytest.raises(OSError): - for _ in fs.walk("dir", onerror=onerror): - pass - - # path is not a directory - for _ in fs.walk("foo"): - pass - with pytest.raises(OSError): - for _ in fs.walk("foo", onerror=onerror): - pass - - @pytest.mark.skip_git_backend("pygit2") def test_is_tracked(tmp_dir: TmpDir, scm: Git, git: Git): tmp_dir.gen(