Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fsspec: create directories on upload/upload_fobj #6309

Merged
merged 1 commit into from
Jul 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion dvc/fs/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ def getsize(self, path_info):
def remove(self, path_info):
raise RemoteActionNotImplemented("remove", self.scheme)

def makedirs(self, path_info):
def makedirs(self, path_info, **kwargs):
"""Optional: Implement only if the remote needs to create
directories before copying/linking/moving data
"""
Expand Down
47 changes: 34 additions & 13 deletions dvc/fs/fsspec_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,11 @@ def __init__(self, **kwargs):
def fs(self):
raise NotImplementedError

@lru_cache(512)
def _with_bucket(self, path):
if isinstance(path, self.PATH_CLS):
return f"{path.bucket}/{path.path}"
return path
return str(path)

def _strip_bucket(self, entry):
try:
bucket, path = entry.split("/", 1)
except ValueError:
# If there is no path attached, return only
# the bucket (top-level).
bucket, path = entry, None

return path or bucket
return entry

def _strip_buckets(self, entries, detail=False):
for entry in entries:
Expand Down Expand Up @@ -86,6 +76,7 @@ def open(
return self.fs.open(self._with_bucket(path_info), mode=mode)

def copy(self, from_info, to_info):
self.makedirs(to_info.parent)
self.fs.copy(self._with_bucket(from_info), self._with_bucket(to_info))

def exists(self, path_info) -> bool:
Expand Down Expand Up @@ -118,18 +109,25 @@ def info(self, path_info):
info["name"] = self._strip_bucket(info["name"])
return info

def makedirs(self, path_info, **kwargs):
self.fs.makedirs(
self._with_bucket(path_info), exist_ok=kwargs.pop("exist_ok", True)
)

def _upload_fobj(self, fobj, to_info, size=None):
self.makedirs(to_info.parent)
with self.open(to_info, "wb") as fdest:
shutil.copyfileobj(fobj, fdest, length=fdest.blocksize)

def _upload(
self, from_file, to_info, name=None, no_progress_bar=False, **kwargs
):
self.makedirs(to_info.parent)
size = os.path.getsize(from_file)
with open(from_file, "rb") as fobj:
self.upload_fobj(
fobj,
self._with_bucket(to_info),
to_info,
size=size,
desc=name,
no_progress_bar=no_progress_bar,
Expand Down Expand Up @@ -158,6 +156,29 @@ def _download(
class ObjectFSWrapper(FSSpecWrapper):
TRAVERSE_PREFIX_LEN = 3

@lru_cache(512)
def _with_bucket(self, path):
if isinstance(path, self.PATH_CLS):
return f"{path.bucket}/{path.path}"
return path

def _strip_bucket(self, entry):
try:
bucket, path = entry.split("/", 1)
except ValueError:
# If there is no path attached, return only
# the bucket (top-level).
bucket, path = entry, None
return path or bucket

def makedirs(self, path_info, **kwargs):
# For object storages, make this method a no-op. The original
# fs.makedirs() method only checks whether the bucket exists
# and creates it if it doesn't. We don't want to support that
# behavior, and the check costs time, so we simply ignore all
# mkdir()/makedirs() calls.
return None

def _isdir(self, path_info):
# Directories in object storages are interpreted differently
# among different fsspec providers, so this logic is a temporary
Expand Down
2 changes: 1 addition & 1 deletion dvc/fs/hdfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ def remove(self, path_info):
else:
hdfs.delete_file(path_info.path)

def makedirs(self, path_info):
def makedirs(self, path_info, **kwargs):
with self.hdfs(path_info) as hdfs:
# NOTE: fs.create_dir creates parents by default
hdfs.create_dir(path_info.path)
Expand Down
4 changes: 2 additions & 2 deletions dvc/fs/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ def remove(self, path_info):
raise NotImplementedError
remove(path_info)

def makedirs(self, path_info):
makedirs(path_info, exist_ok=True)
def makedirs(self, path_info, **kwargs):
makedirs(path_info, exist_ok=kwargs.pop("exist_ok", True))

def isexec(self, path_info):
mode = self.stat(path_info).st_mode
Expand Down
2 changes: 1 addition & 1 deletion dvc/fs/ssh/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ def remove(self, path_info):
with self.ssh() as ssh:
ssh.remove(path_info.path)

def makedirs(self, path_info):
def makedirs(self, path_info, **kwargs):
with self.ssh() as ssh:
ssh.makedirs(path_info.path)

Expand Down
7 changes: 0 additions & 7 deletions dvc/fs/webdav.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,6 @@ def _upload_fobj(self, fobj, to_info, size: int = None):
fobj, rpath, overwrite=True, size=size
)

def makedirs(self, path_info):
path = self.translate_path_info(path_info)
return self.fs.makedirs(path, exist_ok=True)

@lru_cache(512)
def translate_path_info(self, path):
if isinstance(path, self.PATH_CLS):
Expand All @@ -93,9 +89,6 @@ def translate_path_info(self, path):

_with_bucket = translate_path_info

def _strip_bucket(self, entry):
return entry


class WebDAVSFileSystem(WebDAVFileSystem): # pylint:disable=abstract-method
scheme = Schemes.WEBDAVS
28 changes: 27 additions & 1 deletion tests/func/test_fs.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import io
import os
from operator import itemgetter
from os.path import join
Expand Down Expand Up @@ -324,7 +325,7 @@ def test_fs_ls(dvc, cloud):
pytest.lazy_fixture("gdrive"),
],
)
def test_fs_find_recursive(dvc, cloud):
def test_fs_find(dvc, cloud):
cloud.gen({"data": {"foo": "foo", "bar": {"baz": "baz"}, "quux": "quux"}})
cls, config, path_info = get_cloud_fs(dvc, **cloud.config)
fs = cls(**config)
Expand Down Expand Up @@ -382,3 +383,28 @@ def test_fs_fsspec_path_management(dvc, cloud):
data_details = fs.info(data)
assert data_details["name"].rstrip("/") == data.path
assert data_details["type"] == "directory"


@pytest.mark.needs_internet
@pytest.mark.parametrize(
"cloud",
[
pytest.lazy_fixture("azure"),
pytest.lazy_fixture("gs"),
pytest.lazy_fixture("s3"),
pytest.lazy_fixture("webdav"),
],
)
def test_fs_makedirs_on_upload_and_copy(dvc, cloud):
cls, config, _ = get_cloud_fs(dvc, **cloud.config)
fs = cls(**config)

with io.BytesIO(b"foo") as stream:
fs.upload_fobj(stream, cloud / "dir" / "foo")

assert fs.isdir(cloud / "dir")
assert fs.exists(cloud / "dir" / "foo")

fs.copy(cloud / "dir" / "foo", cloud / "dir2" / "foo")
assert fs.isdir(cloud / "dir2")
assert fs.exists(cloud / "dir2" / "foo")