Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
209 changes: 149 additions & 60 deletions scmrepo/fs.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import errno
import os
from itertools import chain
import posixpath
from typing import (
TYPE_CHECKING,
Any,
Expand All @@ -20,6 +20,124 @@
from scmrepo.git.objects import GitTrie


class Path:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Temporary copy from dvc/fs/path.py until we get it into fsspec. Useful for handling chdirs.

def __init__(self, sep, getcwd=None, realpath=None):
    """Configure the helper for a POSIX-flavoured path namespace.

    Args:
        sep: Path separator; must equal ``posixpath.sep``.
        getcwd: Optional zero-argument callable returning the current
            directory. Falls back to a callable returning ``""``.
        realpath: Optional callable used as ``self.realpath``. Falls back
            to ``self.abspath``.
    """

    def _empty_cwd():
        return ""

    self.getcwd = getcwd if getcwd else _empty_cwd
    self.realpath = realpath if realpath else self.abspath

    # Only the POSIX flavour is wired up here.
    assert sep == posixpath.sep
    self.flavour = posixpath

def chdir(self, path):
    """Make ``self.getcwd()`` return ``path`` from now on."""
    self.getcwd = lambda: path

def join(self, *parts):
    """Join ``parts`` into one path using ``self.flavour.join``."""
    joiner = self.flavour.join
    return joiner(*parts)

def split(self, path):
    """Return the ``(head, tail)`` pair from ``self.flavour.split(path)``."""
    head, tail = self.flavour.split(path)
    return head, tail

def normpath(self, path):
    """Return ``path`` normalized by ``self.flavour.normpath``."""
    normalize = self.flavour.normpath
    return normalize(path)

def isabs(self, path):
    """Return whether ``self.flavour.isabs`` considers ``path`` absolute."""
    check = self.flavour.isabs
    return check(path)

def abspath(self, path):
    """Return ``path`` normalized, anchored at ``self.getcwd()`` if relative.

    Absolute paths are only normalized; relative ones are first joined
    onto the value returned by ``self.getcwd()``.
    """
    if self.isabs(path):
        return self.normpath(path)
    anchored = self.join(self.getcwd(), path)
    return self.normpath(anchored)

def commonprefix(self, path):
    """Return ``self.flavour.commonprefix(path)``.

    Despite its singular name, ``path`` is handed straight to
    ``commonprefix``, which expects a sequence of paths.
    """
    candidates = path
    return self.flavour.commonprefix(candidates)

def parts(self, path):
    """Split ``path`` into a tuple of its components.

    Trailing separators are ignored. A leading root (``"/"``) is kept as
    its own first component, and a non-empty drive reported by
    ``self.flavour.splitdrive`` is placed in front of everything else.

    Examples (POSIX flavour)::

        "/a/b/" -> ("/", "a", "b")
        "a/b"   -> ("a", "b")
        "/"     -> ("/",)
        ""      -> ()
    """
    flavour = self.flavour

    # Strip trailing separators, but keep a single one when the path is
    # made up of separators only; otherwise the root "/" would collapse to
    # "" and be reported as having no components at all.
    trimmed = path.rstrip(flavour.sep) or path[:1]
    drive, rest = flavour.splitdrive(trimmed)

    components = []
    while True:
        rest, tail = flavour.split(rest)
        if not tail:
            break
        components.append(tail)

    # Once no tail is left, a non-empty head is the root of the path.
    if rest:
        components.append(rest)

    components.reverse()
    if drive:
        components.insert(0, drive)
    return tuple(components)

def parent(self, path):
    """Return the directory part of ``path`` via ``self.flavour.dirname``."""
    directory = self.flavour.dirname(path)
    return directory

def dirname(self, path):
    """Alias for ``self.parent(path)``."""
    result = self.parent(path)
    return result

def parents(self, path):
    """Return the ancestors of ``path``, nearest first.

    Each ancestor is built by joining a proper, non-empty prefix of
    ``self.parts(path)``; the result is a tuple of path strings.
    """
    components = self.parts(path)
    ancestors = []
    for length in reversed(range(1, len(components))):
        ancestors.append(self.join(*components[:length]))
    return tuple(ancestors)

def name(self, path):
    """Return the final component of ``path``.

    Returns ``""`` when ``self.parts(path)`` is empty (for instance for an
    empty path) instead of failing with ``IndexError``, mirroring how
    ``pathlib.PurePath.name`` behaves for paths without a final component.
    """
    components = self.parts(path)
    if not components:
        return ""
    return components[-1]

def suffix(self, path):
    """Return the final component's text from its first ``"."`` onward.

    Everything starting at the first dot is returned, so ``"b.tar.gz"``
    gives ``".tar.gz"``. A name without any dot gives ``""``.
    """
    basename = self.name(path)
    dot_at = basename.find(".")
    if dot_at < 0:
        return ""
    return basename[dot_at:]

def with_name(self, path, name):
    """Return ``path`` with its final component replaced by ``name``.

    Raises ``IndexError`` when ``self.parts(path)`` is empty.
    """
    components = list(self.parts(path))
    components.pop()  # discard the old final component
    components.append(name)
    return self.join(*components)

def with_suffix(self, path, suffix):
    """Return ``path`` with the final component's suffix set to ``suffix``.

    The final component is cut at its first ``"."`` (so ``"b.tar.gz"``
    keeps only ``"b"``) and ``suffix`` is appended verbatim. Raises
    ``IndexError`` when ``self.parts(path)`` is empty.
    """
    components = list(self.parts(path))
    last = components.pop()
    stem = last.split(".", 1)[0]
    components.append(stem + suffix)
    return self.join(*components)

def isin(self, left, right):
    """Return True when ``left`` lies strictly inside ``right``.

    Both paths are compared component-wise via ``self.parts``: ``left``
    must have more components than ``right`` and start with all of them.
    """
    inner = self.parts(left)
    outer = self.parts(right)
    depth = len(outer)
    if len(inner) <= depth:
        return False
    return inner[:depth] == outer

def isin_or_eq(self, left, right):
    """Return a truthy value when ``left`` equals ``right`` or is inside it.

    Equality is checked on the raw arguments first; only when they differ
    is ``self.isin(left, right)`` consulted.
    """
    same = left == right
    if same:
        return same
    return self.isin(left, right)

def overlaps(self, left, right):
    """Return a truthy value when either path equals or contains the other."""
    # pylint: disable=arguments-out-of-order
    forward = self.isin_or_eq(left, right)
    if forward:
        return forward
    # Swapped arguments are intentional: check containment the other way.
    return self.isin(right, left)

def relpath(self, path, start=None):
    """Return ``path`` relative to ``start`` via ``self.flavour.relpath``.

    When ``start`` is ``None`` the value of ``self.getcwd()`` is used as
    the starting directory.
    """
    base = self.getcwd() if start is None else start
    return self.flavour.relpath(path, start=base)

def relparts(self, path, base):
    """Return the components of ``path`` expressed relative to ``base``."""
    relative = self.relpath(path, base)
    return self.parts(relative)

def as_posix(self, path):
    """Return ``path`` with ``self.flavour.sep`` swapped for ``posixpath.sep``."""
    native_sep = self.flavour.sep
    return path.replace(native_sep, posixpath.sep)


def bytesio_len(obj: "BytesIO") -> Optional[int]:
try:
offset = obj.tell()
Expand All @@ -32,8 +150,8 @@ def bytesio_len(obj: "BytesIO") -> Optional[int]:

class GitFileSystem(AbstractFileSystem):
# pylint: disable=abstract-method
sep = os.sep
cachable = False
root_marker = "/"

def __init__(
self,
Expand All @@ -54,22 +172,22 @@ def __init__(
resolved = resolver(scm, rev or "HEAD")
tree_obj = scm.pygit2.get_tree_obj(rev=resolved)
trie = GitTrie(tree_obj, resolved)
path = scm.root_dir
else:
assert path

self.trie = trie
self.root_dir = path
self.rev = self.trie.rev

def _get_key(self, path: str) -> Tuple[str, ...]:
from scmrepo.utils import relpath
def _getcwd():
return self.root_marker

self.path = Path(self.sep, getcwd=_getcwd)

if os.path.isabs(path):
path = relpath(path, self.root_dir)
relparts = path.split(os.sep)
if relparts == ["."]:
def _get_key(self, path: str) -> Tuple[str, ...]:
path = self.path.abspath(path)
if path == self.root_marker:
return ()
relparts = path.split(self.sep)
if relparts and relparts[0] in (".", ""):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Much better to normalize the path beforehand, but our fs.path is not yet available in fsspec, so we'll get to it later.

relparts = relparts[1:]
return tuple(relparts)

def _open(
Expand Down Expand Up @@ -102,7 +220,7 @@ def info(self, path: str, **kwargs: Any) -> Dict[str, Any]:
try:
return {
**self.trie.info(key),
"name": os.path.join(self.root_dir, self.sep.join(key)),
"name": path,
}
except KeyError:
raise FileNotFoundError(
Expand All @@ -116,51 +234,22 @@ def exists(self, path: str, **kwargs: Any) -> bool:
def checksum(self, path: str) -> str:
    """Return the ``"sha"`` entry of ``self.info(path)``."""
    details = self.info(path)
    return details["sha"]

def walk(  # pylint: disable=arguments-differ
    self,
    top: str,
    topdown: bool = True,
    onerror: Callable[[OSError], None] = None,
    maxdepth: int = None,
    detail: bool = False,
    **kwargs: Any,
):
    """Generate ``(root, dirs, files)`` triples for the tree under ``top``.

    Modelled on `os.walk`, except that symlinks are not supported.

    When ``top`` is not a directory nothing is yielded; if ``onerror`` is
    given it is called with a ``NotADirectoryError`` (``top`` exists) or a
    ``FileNotFoundError`` (it does not). With ``detail=True`` the ``dirs``
    and ``files`` entries are dicts mapping each name to ``self.info`` of
    the joined path instead of plain name lists.
    """
    assert maxdepth is None  # not supported yet.

    if not self.isdir(top):
        if onerror:
            if self.exists(top):
                code, exc_type = errno.ENOTDIR, NotADirectoryError
            else:
                code, exc_type = errno.ENOENT, FileNotFoundError
            onerror(exc_type(code, os.strerror(code), top))
        return []

    key = self._get_key(top)
    for prefix, dirs, files in self.trie.walk(key, topdown=topdown):
        # Entries are reported relative to the repository root directory.
        if prefix:
            root = os.path.join(self.root_dir, os.sep.join(prefix))
        else:
            root = self.root_dir

        if not detail:
            yield root, dirs, files
            continue

        dir_infos = {d: self.info(os.path.join(root, d)) for d in dirs}
        file_infos = {f: self.info(os.path.join(root, f)) for f in files}
        yield root, dir_infos, file_infos

def ls(self, path, detail=True, **kwargs):
for _, dirs, files in self.walk(path, detail=detail, **kwargs):
if detail:
return list(chain(dirs.values(), files.values()))
return dirs + files
info = self.info(path)
if info["type"] != "directory":
return [info] if detail else [path]

key = self._get_key(path)
try:
names = self.trie.ls(key)
except KeyError as exc:
raise FileNotFoundError from exc

paths = [
posixpath.join(path, name) if path else name for name in names
]

if not detail:
return paths

return [self.info(_path) for _path in paths]
5 changes: 4 additions & 1 deletion scmrepo/git/backend/pygit2.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,10 @@ def mode(self):

@property
def size(self) -> int:
return len(self.obj.read_raw())
try:
return len(self.obj.read_raw())
except KeyError:
return 0

@property
def sha(self) -> str:
Expand Down
28 changes: 19 additions & 9 deletions scmrepo/git/objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,20 +101,30 @@ def isfile(self, key: tuple) -> bool:

return obj.isfile

def walk(self, top: tuple, topdown: Optional[bool] = True):
dirs = []
nondirs = []
def ls(self, key: tuple):
ret = []

def node_factory(_, path, children, obj):
if path == top:
def node_factory(_, _key, children, obj):
if key == _key:
assert obj.isdir
list(filter(None, children))
elif obj.isdir:
dirs.append(obj.name)
else:
nondirs.append(obj.name)
ret.append(_key[-1])

self.trie.traverse(node_factory, prefix=key)

return ret

self.trie.traverse(node_factory, prefix=top)
def walk(self, top: tuple, topdown: Optional[bool] = True):
dirs = []
nondirs = []

for name in self.ls(top):
info = self.info(top + (name,))
if info["type"] == "directory":
dirs.append(name)
else:
nondirs.append(name)

if topdown:
yield top, dirs, nondirs
Expand Down
Loading