Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

odb: use regular upload() when the source filesystem is local #6365

Merged
merged 2 commits into from
Jul 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
63 changes: 36 additions & 27 deletions dvc/objects/db/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from copy import copy
from typing import TYPE_CHECKING, Optional

from dvc.fs.local import LocalFileSystem
from dvc.objects.errors import ObjectDBPermissionError, ObjectFormatError
from dvc.objects.file import HashFile
from dvc.progress import Tqdm
Expand Down Expand Up @@ -75,6 +76,40 @@ def get(self, hash_info: "HashInfo"):
hash_info,
)

def _add_file(self, from_fs, from_info, to_info, move):
self.makedirs(to_info.parent)
use_move = isinstance(from_fs, type(self.fs)) and move
try:
if use_move:
self.fs.move(from_info, to_info)
elif isinstance(from_fs, LocalFileSystem):
if not isinstance(from_info, from_fs.PATH_CLS):
from_info = from_fs.PATH_CLS(from_info)
self.fs.upload(from_info, to_info)
isidentical marked this conversation as resolved.
Show resolved Hide resolved
elif isinstance(self.fs, LocalFileSystem):
from_fs.download_file(from_info, to_info)
Comment on lines +88 to +90
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Btw, should both of these use _file methods?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you mean by that?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@isidentical I see that we use fs.upload but fs.download_file. Wondering if they both should be upload/download or upload_file/download_file (likely the latter if we take fsspec's get/put_file future in the context, but maybe I'm missing something important here)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't have upload_file(), so the upload() is put_file. And download_file() is the get_file(). The reason that I didn't use download() is that, it has an extra check against whether the source path is a directory, which is an waste of API call for this case where we know we are adding a single entity (file).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense, thank you! I guess we'll switch to get_file/put_file when reducing fsspec_wrapper in the future.

else:
with from_fs.open(from_info, mode="rb") as fobj:
self.fs.upload_fobj(fobj, to_info)
except OSError as exc:
# If the target file already exists, we are going to simply
# ignore the exception (#4992).
#
# On Windows, it is not always guaranteed that you'll get
# FileExistsError (it might be PermissionError or a bare OSError)
# but all of those exceptions raised from the original
# FileExistsError so we have a separate check for that.
if isinstance(exc, FileExistsError) or (
os.name == "nt"
and exc.__context__
and isinstance(exc.__context__, FileExistsError)
):
logger.debug("'%s' file already exists, skipping", to_info)
if use_move:
from_fs.remove(from_info)
else:
raise

def add(
self,
path_info: "AnyPath",
Expand All @@ -97,33 +132,7 @@ def add(

cache_info = self.hash_to_path_info(hash_info.value)
# using our makedirs to create dirs with proper permissions
self.makedirs(cache_info.parent)
use_move = isinstance(fs, type(self.fs)) and move
try:
if use_move:
self.fs.move(path_info, cache_info)
else:
with fs.open(path_info, mode="rb") as fobj:
self.fs.upload_fobj(fobj, cache_info)
except OSError as exc:
# If the target file already exists, we are going to simply
# ignore the exception (#4992).
#
# On Windows, it is not always guaranteed that you'll get
# FileExistsError (it might be PermissionError or a bare OSError)
# but all of those exceptions raised from the original
# FileExistsError so we have a separate check for that.
if isinstance(exc, FileExistsError) or (
os.name == "nt"
and exc.__context__
and isinstance(exc.__context__, FileExistsError)
):
logger.debug("'%s' file already exists, skipping", path_info)
if use_move:
fs.remove(path_info)
else:
raise

self._add_file(fs, path_info, cache_info, move)
try:
if verify:
self.check(hash_info, check_hash=True)
Expand Down
6 changes: 2 additions & 4 deletions tests/func/objects/db/test_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,7 @@ def test_clear_on_download_err(tmp_dir, dvc, index, mocker):

assert list(index.hashes())

mocker.patch(
"dvc.fs.local.LocalFileSystem.upload_fobj", side_effect=Exception
)
mocker.patch("dvc.fs.local.LocalFileSystem.upload", side_effect=Exception)
with pytest.raises(DownloadError):
dvc.pull()
assert not list(index.hashes())
Expand All @@ -82,7 +80,7 @@ def unreliable_upload(self, from_file, to_info, name=None, **kwargs):
raise Exception("stop baz")
return original(self, from_file, to_info, name, **kwargs)

mocker.patch.object(LocalFileSystem, "upload_fobj", unreliable_upload)
mocker.patch.object(LocalFileSystem, "upload", unreliable_upload)
with pytest.raises(UploadError):
dvc.push()
assert not list(index.hashes())
2 changes: 1 addition & 1 deletion tests/func/test_external_repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def test_cache_reused(erepo_dir, mocker, local_cloud):
erepo_dir.dvc_gen("file", "text", commit="add file")
erepo_dir.dvc.push()

download_spy = mocker.spy(LocalFileSystem, "upload_fobj")
download_spy = mocker.spy(LocalFileSystem, "upload")

# Use URL to prevent any fishy optimizations
url = f"file://{erepo_dir}"
Expand Down
2 changes: 1 addition & 1 deletion tests/func/test_import.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ def test_download_error_pulling_imported_stage(tmp_dir, dvc, erepo_dir):
remove(dst_cache)

with patch(
"dvc.fs.local.LocalFileSystem.upload_fobj", side_effect=Exception
"dvc.fs.local.LocalFileSystem.upload", side_effect=Exception
), pytest.raises(DownloadError):
dvc.pull(["foo_imported.dvc"])

Expand Down
10 changes: 5 additions & 5 deletions tests/func/test_remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ def test_partial_push_n_pull(tmp_dir, dvc, tmp_path_factory, local_remote):
baz = tmp_dir.dvc_gen({"baz": {"foo": "foo content"}})[0].outs[0]

# Faulty upload version, failing on foo
original = LocalFileSystem.upload_fobj
original = LocalFileSystem.upload
odb = dvc.cloud.get_remote_odb("upstream")

def unreliable_upload(self, fobj, to_info, **kwargs):
Expand All @@ -189,7 +189,7 @@ def unreliable_upload(self, fobj, to_info, **kwargs):
raise Exception("stop foo")
return original(self, fobj, to_info, **kwargs)

with patch.object(LocalFileSystem, "upload_fobj", unreliable_upload):
with patch.object(LocalFileSystem, "upload", unreliable_upload):
with pytest.raises(UploadError) as upload_error_info:
dvc.push()
assert upload_error_info.value.amount == 2
Expand All @@ -203,7 +203,7 @@ def unreliable_upload(self, fobj, to_info, **kwargs):
remove(dvc.odb.local.cache_dir)

baz.collect_used_dir_cache()
with patch.object(LocalFileSystem, "upload_fobj", side_effect=Exception):
with patch.object(LocalFileSystem, "upload", side_effect=Exception):
with pytest.raises(DownloadError) as download_error_info:
dvc.pull()
# error count should be len(.dir + standalone file checksums)
Expand All @@ -218,7 +218,7 @@ def test_raise_on_too_many_open_files(

mocker.patch.object(
LocalFileSystem,
"upload_fobj",
"upload",
side_effect=OSError(errno.EMFILE, "Too many open files"),
)

Expand Down Expand Up @@ -269,7 +269,7 @@ def test_push_order(tmp_dir, dvc, tmp_path_factory, mocker, local_remote):
tmp_dir.dvc_gen({"baz": "baz content"})

mocked_upload = mocker.patch.object(
LocalFileSystem, "upload_fobj", return_value=0
LocalFileSystem, "upload", return_value=0
)
dvc.push()

Expand Down
11 changes: 7 additions & 4 deletions tests/func/test_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,6 @@ def test_update_import_url_to_remote(tmp_dir, dvc, workspace, local_remote):
@pytest.mark.parametrize(
"workspace",
[
pytest.lazy_fixture("local_cloud"),
pytest.lazy_fixture("s3"),
pytest.param(
pytest.lazy_fixture("gs"), marks=pytest.mark.needs_internet
Expand Down Expand Up @@ -378,11 +377,15 @@ def test_update_import_url_to_remote_directory(
}
)

upload_file_mock = mocker.spy(type(dvc.odb.local.fs), "upload_fobj")
download_file_mock = mocker.spy(
type(dvc.cloud.get_remote_odb("cache").fs), "download_file"
)
stage = dvc.update(stage.path, to_remote=True)

# 2 new hashes (foo2, baz2) + 1 .dir hash
assert upload_file_mock.mock.call_count == 3
# 2 new hashes (foo2, baz2)
assert download_file_mock.mock.call_count == 2
# The new .dir hash will be transferred through MemFs, so
# we will not account the download_file() for it.

dvc.pull("data")
assert (tmp_dir / "data").read_text() == {
Expand Down