Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 7 additions & 8 deletions src/dvc_objects/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,13 @@ def _init(self, dname: str) -> None:
self._dirs = set()
with suppress(FileNotFoundError, NotImplementedError):
self._dirs = {
self.fs.path.name(path)
for path in self.fs.ls(self.path, detail=False)
self.fs.name(path) for path in self.fs.ls(self.path, detail=False)
}

if dname in self._dirs:
return

self.makedirs(self.fs.path.join(self.path, dname))
self.makedirs(self.fs.join(self.path, dname))
self._dirs.add(dname)

def exists(self, oid: str) -> bool:
Expand Down Expand Up @@ -198,7 +197,7 @@ def _oid_parts(self, oid: str) -> Tuple[str, str]:
return oid[:2], oid[2:]

def oid_to_path(self, oid) -> str:
    """Return the storage path for ``oid``.

    The result is ``self.path`` joined with the components returned by
    ``self._oid_parts(oid)``.
    """
    # Join through the filesystem object itself rather than through its
    # former ``path`` helper namespace.
    return self.fs.join(self.path, *self._oid_parts(oid))

def _list_prefixes(
self,
Expand All @@ -216,12 +215,12 @@ def _list_prefixes(
yield from self.fs.find(paths, batch_size=jobs, prefix=prefix)

def path_to_oid(self, path) -> str:
if self.fs.path.isabs(path):
self_path = self.fs.path.abspath(self.path)
if self.fs.isabs(path):
self_path = self.fs.abspath(self.path)
else:
self_path = self.path
self_parts = self.fs.path.parts(self_path)
parts = self.fs.path.parts(path)[len(self_parts) :]
self_parts = self.fs.parts(self_path)
parts = self.fs.parts(path)[len(self_parts) :]

if not (len(parts) == 2 and parts[0] and len(parts[0]) == 2):
raise ValueError(f"Bad cache file path '{path}'")
Expand Down
160 changes: 144 additions & 16 deletions src/dvc_objects/fs/base.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import asyncio
import datetime
import logging
import ntpath
import os
import posixpath
import shutil
from functools import partial
from multiprocessing import cpu_count
Expand All @@ -11,15 +13,18 @@
Any,
ClassVar,
Dict,
Iterable,
Iterator,
List,
Literal,
Optional,
Sequence,
Tuple,
Union,
cast,
overload,
)
from urllib.parse import urlsplit, urlunsplit

from fsspec.asyn import get_loop

Expand All @@ -40,8 +45,6 @@

from fsspec.spec import AbstractFileSystem

from .path import Path


logger = logging.getLogger(__name__)

Expand All @@ -68,6 +71,7 @@ def __init__(self, link: str, fs: "FileSystem", path: str) -> None:
class FileSystem:
sep = "/"

flavour = posixpath
protocol = "base"
REQUIRES: ClassVar[Dict[str, str]] = {}
_JOBS = 4 * cpu_count()
Expand Down Expand Up @@ -104,14 +108,138 @@ def config(self) -> Dict[str, Any]:
def root_marker(self) -> str:
return self.fs.root_marker

@cached_property
def path(self) -> "Path":
from .path import Path
def getcwd(self) -> str:
    """Return this filesystem's current working directory.

    This implementation always answers with the empty string.
    """
    cwd = ""
    return cwd

def chdir(self, path: str):
    """Change the working directory to ``path``.

    Raises:
        NotImplementedError: always; this implementation does not support
            changing directories.
    """
    raise NotImplementedError()

@classmethod
def join(cls, *parts: str) -> str:
    """Join ``parts`` into one path using ``cls.flavour.join``."""
    joiner = cls.flavour.join
    return joiner(*parts)

@classmethod
def split(cls, path: str) -> Tuple[str, str]:
    """Split ``path`` into a ``(head, tail)`` pair via ``cls.flavour.split``."""
    head, tail = cls.flavour.split(path)
    return head, tail

@classmethod
def splitext(cls, path: str) -> Tuple[str, str]:
    """Split ``path`` into ``(root, ext)`` via ``cls.flavour.splitext``."""
    root, ext = cls.flavour.splitext(path)
    return root, ext

def normpath(self, path: str) -> str:
    """Return a normalized form of ``path``.

    With the ``ntpath`` flavour the path is passed straight to
    ``ntpath.normpath``.  Otherwise the string is parsed with
    ``urlsplit`` and only its path component is normalized, so a
    scheme, netloc, query or fragment is carried over as parsed.
    """
    flavour = self.flavour
    # Modules are singletons, so identity is the right comparison.
    if flavour is ntpath:
        return flavour.normpath(path)

    scheme, netloc, url_path, query, fragment = urlsplit(path)
    # normpath("") is ".", which urlunsplit() would render after a netloc
    # as "scheme://netloc/.".  Keep an empty path empty in that case so a
    # bare "scheme://netloc" round-trips unchanged.
    if url_path or not netloc:
        url_path = flavour.normpath(url_path)
    return urlunsplit((scheme, netloc, url_path, query, fragment))

@classmethod
def isabs(cls, path: str) -> bool:
    """Report whether ``path`` is absolute according to ``cls.flavour``."""
    is_absolute = cls.flavour.isabs(path)
    return is_absolute

def abspath(self, path: str) -> str:
    """Return a normalized absolute version of ``path``.

    A path that ``self.isabs`` rejects is first joined onto
    ``self.getcwd()``; the result is then passed through
    ``self.normpath``.
    """
    if self.isabs(path):
        absolute = path
    else:
        absolute = self.join(self.getcwd(), path)
    return self.normpath(absolute)

@classmethod
def commonprefix(cls, paths: Sequence[str]) -> str:
    """Return the result of ``cls.flavour.commonprefix`` for ``paths``."""
    prefix = cls.flavour.commonprefix(paths)
    return prefix

@classmethod
def commonpath(cls, paths: Iterable[str]) -> str:
    """Return the longest common sub-path of ``paths``.

    ``paths`` may be any iterable; it is materialized into a list before
    being handed to ``cls.flavour.commonpath``.
    """
    materialized = [*paths]
    return cls.flavour.commonpath(materialized)

@classmethod
def parts(cls, path: str) -> Tuple[str, ...]:
    """Break ``path`` into a tuple of its components.

    Trailing separators are stripped and any drive is split off with
    ``cls.flavour.splitdrive``.  The remainder is peeled apart from the
    right with ``cls.flavour.split``; whatever head is left once no tail
    remains (for example a root separator) becomes the first component.
    A non-empty drive is placed in front of everything else.
    """
    flavour = cls.flavour
    drive, remainder = flavour.splitdrive(path.rstrip(flavour.sep))

    # Components are gathered right-to-left and reversed at the end.
    collected = []
    head, tail = flavour.split(remainder)
    while tail:
        collected.append(tail)
        head, tail = flavour.split(head)
    if head:
        collected.append(head)
    if drive:
        collected.append(drive)

    return tuple(reversed(collected))

return Path(self.sep, getcwd=_getcwd)
@classmethod
def parent(cls, path: str) -> str:
    """Return the directory part of ``path`` via ``cls.flavour.dirname``."""
    containing_dir = cls.flavour.dirname(path)
    return containing_dir

@classmethod
def dirname(cls, path: str) -> str:
    """Return the directory part of ``path``; delegates to ``cls.parent``."""
    parent_dir = cls.parent(path)
    return parent_dir

@classmethod
def parents(cls, path: str) -> Iterator[str]:
    """Yield each ancestor directory of ``path``, nearest first.

    Iteration stops once ``cls.flavour.dirname`` returns its own input,
    i.e. when there is no further parent to climb to.
    """
    dirname = cls.flavour.dirname
    current, ancestor = path, dirname(path)
    while ancestor != current:
        yield ancestor
        current, ancestor = ancestor, dirname(ancestor)

@classmethod
def name(cls, path: str) -> str:
    """Return the final component of ``path`` via ``cls.flavour.basename``."""
    last_component = cls.flavour.basename(path)
    return last_component

@classmethod
def suffix(cls, path: str) -> str:
    """Return everything from the first ``.`` in the name of ``path``.

    The name is obtained from ``cls.name``.  The result includes the
    leading dot and is empty when the name contains no dot.
    """
    basename = cls.name(path)
    dot_index = basename.find(".")
    if dot_index == -1:
        return ""
    return basename[dot_index:]

@classmethod
def with_name(cls, path: str, name: str) -> str:
    """Return ``path`` with its final component replaced by ``name``.

    Built by joining ``cls.parent(path)`` and ``name`` with ``cls.join``.
    """
    directory = cls.parent(path)
    return cls.join(directory, name)

@classmethod
def with_suffix(cls, path: str, suffix: str) -> str:
    """Return ``path`` with its extension replaced by ``suffix``.

    The extension is whatever ``cls.splitext`` splits off; ``suffix`` is
    appended verbatim to the remaining root.
    """
    root, _old_ext = cls.splitext(path)
    return root + suffix

@classmethod
def isin(cls, left: str, right: str) -> bool:
    """Report whether ``left`` lies strictly inside ``right``.

    Equal paths are not considered to be inside each other.  Otherwise
    the answer is whether ``cls.commonpath`` of the two equals ``right``;
    a ``ValueError`` from that call counts as "not inside".
    """
    if left != right:
        try:
            shared = cls.commonpath([left, right])
        except ValueError:
            # commonpath could not relate the two paths at all.
            return False
        return shared == right
    return False

@classmethod
def isin_or_eq(cls, left: str, right: str) -> bool:
    """Report whether ``left`` equals ``right`` or ``cls.isin`` holds."""
    same = left == right
    if same:
        return same
    return cls.isin(left, right)

@classmethod
def overlaps(cls, left: str, right: str) -> bool:
    """Report whether either path equals or contains the other.

    True when ``cls.isin_or_eq(left, right)`` holds, or failing that when
    ``cls.isin(right, left)`` does.
    """
    forward = cls.isin_or_eq(left, right)
    if forward:
        return forward
    return cls.isin(right, left)

def relpath(self, path: str, start: Optional[str] = None) -> str:
    """Return ``path`` expressed relative to ``start``.

    ``start`` defaults to ``"."``.  Both arguments go through
    ``self.abspath`` before ``self.flavour.relpath`` computes the result.
    """
    base = "." if start is None else start
    target = self.abspath(path)
    return self.flavour.relpath(target, start=self.abspath(base))

def relparts(self, path: str, start: Optional[str] = None) -> Tuple[str, ...]:
    """Return the components of ``path`` taken relative to ``start``.

    Equivalent to passing ``self.relpath(path, start=start)`` to
    ``self.parts``.
    """
    relative = self.relpath(path, start=start)
    return self.parts(relative)

@classmethod
def as_posix(cls, path: str) -> str:
    """Return ``path`` with ``cls.flavour.sep`` swapped for ``/``."""
    native_sep = cls.flavour.sep
    # Splitting on the native separator and re-joining with the POSIX one
    # substitutes every occurrence.
    return posixpath.sep.join(path.split(native_sep))

@classmethod
def _strip_protocol(cls, path: str) -> str:
Expand Down Expand Up @@ -299,7 +427,7 @@ def checksum(self, path: AnyFSPath) -> str:
return self.fs.checksum(path)

def copy(self, from_info: AnyFSPath, to_info: AnyFSPath) -> None:
    """Copy ``from_info`` to ``to_info`` on the underlying filesystem.

    The parent directory of ``to_info`` is created with ``self.makedirs``
    before ``self.fs.copy`` is invoked.
    """
    # Create the destination directory once, using the parent() helper
    # defined on the filesystem itself.
    self.makedirs(self.parent(to_info))
    self.fs.copy(from_info, to_info)

def cp_file(self, from_info: AnyFSPath, to_info: AnyFSPath, **kwargs: Any) -> None:
Expand Down Expand Up @@ -515,7 +643,7 @@ def put_file(
else:
assert isinstance(from_file, str)
self.fs.put_file(os.fspath(from_file), to_info, callback=callback, **kwargs)
self.fs.invalidate_cache(self.path.parent(to_info))
self.fs.invalidate_cache(self.parent(to_info))

def get_file(
self,
Expand All @@ -527,7 +655,7 @@ def get_file(
self.fs.get_file(from_info, to_info, callback=callback, **kwargs)

def upload_fobj(self, fobj: IO, to_info: AnyFSPath, **kwargs) -> None:
self.makedirs(self.path.parent(to_info))
self.makedirs(self.parent(to_info))
with self.open(to_info, "wb") as fdest:
shutil.copyfileobj(
fobj,
Expand Down Expand Up @@ -597,7 +725,7 @@ def get(
from .local import localfs

def get_file(rpath, lpath, **kwargs):
localfs.makedirs(localfs.path.parent(lpath), exist_ok=True)
localfs.makedirs(localfs.parent(lpath), exist_ok=True)
self.fs.get_file(rpath, lpath, **kwargs)

get_file = wrap_and_branch_callback(callback, get_file)
Expand All @@ -618,7 +746,7 @@ def get_file(rpath, lpath, **kwargs):
return localfs.makedirs(to_info, exist_ok=True)

to_infos = [
localfs.path.join(to_info, *self.path.relparts(info, from_info))
localfs.join(to_info, *self.relparts(info, from_info))
for info in from_infos
]

Expand Down Expand Up @@ -679,9 +807,9 @@ def find(

def _make_args(paths: List[AnyFSPath]) -> Iterator[Tuple[str, str]]:
for path in paths:
if prefix and not path.endswith(self.path.flavour.sep):
parent = self.path.parent(path)
yield parent, self.path.parts(path)[-1]
if prefix and not path.endswith(self.flavour.sep):
parent = self.parent(path)
yield parent, self.parts(path)[-1]
else:
yield path, ""

Expand Down
10 changes: 5 additions & 5 deletions src/dvc_objects/fs/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -394,16 +394,16 @@ def test_links(
) -> List["AnyFSPath"]:
from .utils import tmp_fname

from_file = from_fs.path.join(from_path, tmp_fname())
to_file = to_fs.path.join(
to_fs.path.parent(to_path),
from_file = from_fs.join(from_path, tmp_fname())
to_file = to_fs.join(
to_fs.parent(to_path),
tmp_fname(),
)

from_fs.makedirs(from_fs.path.parent(from_file))
from_fs.makedirs(from_fs.parent(from_file))
with from_fs.open(from_file, "wb") as fobj:
fobj.write(b"test")
to_fs.makedirs(to_fs.path.parent(to_file))
to_fs.makedirs(to_fs.parent(to_file))

ret = []
try:
Expand Down
18 changes: 10 additions & 8 deletions src/dvc_objects/fs/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ def modified(self, path):
class LocalFileSystem(FileSystem):
sep = os.sep

flavour = os.path
protocol = "local"
PARAM_CHECKSUM = "md5"
PARAM_PATH = "path"
Expand All @@ -189,17 +190,18 @@ class LocalFileSystem(FileSystem):
def fs(self):
return FsspecLocalFileSystem(**self.config)

@cached_property
def path(self):
from .path import LocalFileSystemPath
def getcwd(self):
    """Return the process's current working directory via ``os.getcwd()``."""
    cwd = os.getcwd()
    return cwd

def normpath(self, path: str) -> str:
    """Normalize ``path`` directly with ``self.flavour.normpath``."""
    normalized = self.flavour.normpath(path)
    return normalized

return LocalFileSystemPath(
self.sep, getcwd=os.getcwd, realpath=os.path.realpath
)
def realpath(self, path: str) -> str:
    """Return the result of ``self.flavour.realpath`` for ``path``."""
    resolved = self.flavour.realpath(path)
    return resolved

def upload_fobj(self, fobj, to_info, **kwargs):
self.makedirs(self.path.parent(to_info))
tmp_info = self.path.join(self.path.parent(to_info), tmp_fname(""))
self.makedirs(self.parent(to_info))
tmp_info = self.join(self.parent(to_info), tmp_fname(""))
try:
with open(tmp_info, "wb+") as fdest:
shutil.copyfileobj(fobj, fdest)
Expand Down
Loading