Skip to content

Commit

Permalink
fsspec: create directories on upload/upload_fobj
Browse files Browse the repository at this point in the history
  • Loading branch information
isidentical committed Jul 13, 2021
1 parent 58f0911 commit 9d7ba06
Show file tree
Hide file tree
Showing 7 changed files with 66 additions and 26 deletions.
2 changes: 1 addition & 1 deletion dvc/fs/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ def getsize(self, path_info):
def remove(self, path_info):
raise RemoteActionNotImplemented("remove", self.scheme)

def makedirs(self, path_info):
def makedirs(self, path_info, **kwargs):
"""Optional: Implement only if the remote needs to create
directories before copying/linking/moving data
"""
Expand Down
47 changes: 34 additions & 13 deletions dvc/fs/fsspec_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,11 @@ def __init__(self, **kwargs):
def fs(self):
raise NotImplementedError

@lru_cache(512)
def _with_bucket(self, path):
if isinstance(path, self.PATH_CLS):
return f"{path.bucket}/{path.path}"
return path
return str(path)

def _strip_bucket(self, entry):
try:
bucket, path = entry.split("/", 1)
except ValueError:
# If there is no path attached, only returns
# the bucket (top-level).
bucket, path = entry, None

return path or bucket
return entry

def _strip_buckets(self, entries, detail=False):
for entry in entries:
Expand Down Expand Up @@ -86,6 +76,7 @@ def open(
return self.fs.open(self._with_bucket(path_info), mode=mode)

def copy(self, from_info, to_info):
self.makedirs(to_info.parent)
self.fs.copy(self._with_bucket(from_info), self._with_bucket(to_info))

def exists(self, path_info) -> bool:
Expand Down Expand Up @@ -118,18 +109,25 @@ def info(self, path_info):
info["name"] = self._strip_bucket(info["name"])
return info

def makedirs(self, path_info, **kwargs):
self.fs.makedirs(
self._with_bucket(path_info), exist_ok=kwargs.pop("exist_ok", True)
)

def _upload_fobj(self, fobj, to_info, size=None):
self.makedirs(to_info.parent)
with self.open(to_info, "wb") as fdest:
shutil.copyfileobj(fobj, fdest, length=fdest.blocksize)

def _upload(
self, from_file, to_info, name=None, no_progress_bar=False, **kwargs
):
self.makedirs(to_info.parent)
size = os.path.getsize(from_file)
with open(from_file, "rb") as fobj:
self.upload_fobj(
fobj,
self._with_bucket(to_info),
to_info,
size=size,
desc=name,
no_progress_bar=no_progress_bar,
Expand Down Expand Up @@ -158,6 +156,29 @@ def _download(
class ObjectFSWrapper(FSSpecWrapper):
TRAVERSE_PREFIX_LEN = 3

@lru_cache(512)
def _with_bucket(self, path):
if isinstance(path, self.PATH_CLS):
return f"{path.bucket}/{path.path}"
return path

def _strip_bucket(self, entry):
try:
bucket, path = entry.split("/", 1)
except ValueError:
# If there is no path attached, only returns
# the bucket (top-level).
bucket, path = entry, None
return path or bucket

def makedirs(self, path_info, **kwargs):
# For object storages make this method a no-op. The original
# fs.makedirs() method will only check if the bucket exists
# and create if it doesn't though we don't want to support
# that behavior, and the check will cost some time so we'll
# simply ignore all mkdir()/makedirs() calls.
return None

def _isdir(self, path_info):
# Directory in object storages are interpreted differently
# among different fsspec providers, so this logic is a temporary
Expand Down
2 changes: 1 addition & 1 deletion dvc/fs/hdfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ def remove(self, path_info):
else:
hdfs.delete_file(path_info.path)

def makedirs(self, path_info):
def makedirs(self, path_info, **kwargs):
with self.hdfs(path_info) as hdfs:
# NOTE: fs.create_dir creates parents by default
hdfs.create_dir(path_info.path)
Expand Down
4 changes: 2 additions & 2 deletions dvc/fs/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ def remove(self, path_info):
raise NotImplementedError
remove(path_info)

def makedirs(self, path_info):
makedirs(path_info, exist_ok=True)
def makedirs(self, path_info, **kwargs):
makedirs(path_info, exist_ok=kwargs.pop("exist_ok", True))

def isexec(self, path_info):
mode = self.stat(path_info).st_mode
Expand Down
2 changes: 1 addition & 1 deletion dvc/fs/ssh/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ def remove(self, path_info):
with self.ssh() as ssh:
ssh.remove(path_info.path)

def makedirs(self, path_info):
def makedirs(self, path_info, **kwargs):
with self.ssh() as ssh:
ssh.makedirs(path_info.path)

Expand Down
7 changes: 0 additions & 7 deletions dvc/fs/webdav.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,6 @@ def _upload_fobj(self, fobj, to_info, size: int = None):
fobj, rpath, overwrite=True, size=size
)

def makedirs(self, path_info):
path = self.translate_path_info(path_info)
return self.fs.makedirs(path, exist_ok=True)

@lru_cache(512)
def translate_path_info(self, path):
if isinstance(path, self.PATH_CLS):
Expand All @@ -93,9 +89,6 @@ def translate_path_info(self, path):

_with_bucket = translate_path_info

def _strip_bucket(self, entry):
return entry


class WebDAVSFileSystem(WebDAVFileSystem): # pylint:disable=abstract-method
scheme = Schemes.WEBDAVS
28 changes: 27 additions & 1 deletion tests/func/test_fs.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import io
import os
from operator import itemgetter
from os.path import join
Expand Down Expand Up @@ -324,7 +325,7 @@ def test_fs_ls(dvc, cloud):
pytest.lazy_fixture("gdrive"),
],
)
def test_fs_find_recursive(dvc, cloud):
def test_fs_find(dvc, cloud):
cloud.gen({"data": {"foo": "foo", "bar": {"baz": "baz"}, "quux": "quux"}})
cls, config, path_info = get_cloud_fs(dvc, **cloud.config)
fs = cls(**config)
Expand Down Expand Up @@ -382,3 +383,28 @@ def test_fs_fsspec_path_management(dvc, cloud):
data_details = fs.info(data)
assert data_details["name"].rstrip("/") == data.path
assert data_details["type"] == "directory"


@pytest.mark.needs_internet
@pytest.mark.parametrize(
"cloud",
[
pytest.lazy_fixture("azure"),
pytest.lazy_fixture("gs"),
pytest.lazy_fixture("s3"),
pytest.lazy_fixture("webdav"),
],
)
def test_fs_makedirs_on_upload_and_copy(dvc, cloud):
cls, config, _ = get_cloud_fs(dvc, **cloud.config)
fs = cls(**config)

with io.BytesIO(b"foo") as stream:
fs.upload_fobj(stream, cloud / "dir" / "foo")

assert fs.isdir(cloud / "dir")
assert fs.exists(cloud / "dir" / "foo")

fs.copy(cloud / "dir" / "foo", cloud / "dir2" / "foo")
assert fs.isdir(cloud / "dir2")
assert fs.exists(cloud / "dir2" / "foo")

0 comments on commit 9d7ba06

Please sign in to comment.