Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

objects: migrate remote push/pull to objects.transfer #6308

Merged
merged 22 commits into from
Jul 20, 2021
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 3 additions & 3 deletions dvc/checkout.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@
)
from dvc.ignore import DvcIgnoreFilter
from dvc.objects import check, load
from dvc.objects.errors import ObjectFormatError
from dvc.objects.stage import stage
from dvc.remote.slow_link_detection import ( # type: ignore[attr-defined]
from dvc.objects.db.slow_link_detection import ( # type: ignore[attr-defined]
slow_link_guard,
)
from dvc.objects.errors import ObjectFormatError
from dvc.objects.stage import stage
from dvc.types import Optional

logger = logging.getLogger(__name__)
Expand Down
73 changes: 44 additions & 29 deletions dvc/data_cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
import logging
from typing import TYPE_CHECKING, Iterable, Optional

from dvc.objects.db import get_index

if TYPE_CHECKING:
from dvc.objects.db.base import ObjectDB
from dvc.objects.file import HashFile
from dvc.remote.base import Remote

logger = logging.getLogger(__name__)

Expand All @@ -24,18 +26,18 @@ class DataCloud:
def __init__(self, repo):
self.repo = repo

def get_remote(
def get_remote_odb(
self,
name: Optional[str] = None,
command: str = "<command>",
) -> "Remote":
) -> "ObjectDB":
from dvc.config import NoRemoteError

if not name:
name = self.repo.config["core"].get("remote")

if name:
return self._init_remote(name)
return self._init_odb(name)

if bool(self.repo.config["remote"]):
error_msg = (
Expand All @@ -52,69 +54,78 @@ def get_remote(

raise NoRemoteError(error_msg)

def _init_remote(self, name):
from dvc.remote import get_remote
def _init_odb(self, name):
from dvc.fs import get_cloud_fs
from dvc.objects.db import get_odb

return get_remote(self.repo, name=name)
cls, config, path_info = get_cloud_fs(self.repo, name=name)
config["tmp_dir"] = self.repo.tmp_dir
return get_odb(cls(**config), path_info, **config)

def push(
self,
objs: Iterable["HashFile"],
jobs: Optional[int] = None,
remote: Optional[str] = None,
show_checksums: bool = False,
odb: Optional["ObjectDB"] = None,
):
"""Push data items in a cloud-agnostic way.

Args:
objs: objects to push to the cloud.
jobs: number of jobs that can be running simultaneously.
remote: optional remote to push to.
remote: optional name of remote to push to.
By default remote from core.remote config option is used.
show_checksums: show checksums instead of file names in
information messages.
odb: optional ODB to push to. Overrides remote.
"""
remote_obj = self.get_remote(remote, "push")
from dvc.objects.transfer import transfer

return remote_obj.push(
if not odb:
odb = self.get_remote_odb(remote, "push")
return transfer(
self.repo.odb.local,
odb,
objs,
jobs=jobs,
show_checksums=show_checksums,
dest_index=get_index(odb),
cache_odb=self.repo.odb.local,
)

def pull(
self,
objs: Iterable["HashFile"],
jobs: Optional[int] = None,
remote: Optional[str] = None,
show_checksums: bool = False,
odb: Optional["ObjectDB"] = None,
):
"""Pull data items in a cloud-agnostic way.

Args:
objs: objects to pull from the cloud.
jobs: number of jobs that can be running simultaneously.
remote: optional remote to pull from.
remote: optional name of remote to pull from.
By default remote from core.remote config option is used.
show_checksums: show checksums instead of file names in
information messages.
odb: optional ODB to pull from. Overrides remote.
"""
remote_obj = self.get_remote(remote, "pull")
from dvc.objects.transfer import transfer

return remote_obj.pull(
if not odb:
odb = self.get_remote_odb(remote, "pull")
return transfer(
odb,
self.repo.odb.local,
objs,
jobs=jobs,
show_checksums=show_checksums,
src_index=get_index(odb),
cache_odb=self.repo.odb.local,
)

def status(
self,
objs: Iterable["HashFile"],
jobs: Optional[int] = None,
remote: Optional[str] = None,
show_checksums: bool = False,
odb: Optional["ObjectDB"] = None,
log_missing: bool = True,
):
"""Check status of data items in a cloud-agnostic way.
Expand All @@ -125,20 +136,24 @@ def status(
remote: optional remote to compare
cache to. By default remote from core.remote config option
is used.
show_checksums: show checksums instead of file names in
information messages.
odb: optional ODB to check status from. Overrides remote.
log_missing: log warning messages if a file exists
neither in cache nor in cloud.
"""
remote_obj = self.get_remote(remote, "status")
return remote_obj.status(
from dvc.objects.status import compare_status

if not odb:
odb = self.get_remote_odb(remote, "status")
return compare_status(
self.repo.odb.local,
odb,
objs,
jobs=jobs,
show_checksums=show_checksums,
log_missing=log_missing,
dest_index=get_index(odb),
cache_odb=self.repo.odb.local,
)

def get_url_for(self, remote, checksum):
remote_obj = self.get_remote(remote)
return str(remote_obj.odb.hash_to_path_info(checksum))
remote_odb = self.get_remote_odb(remote)
return str(remote_odb.hash_to_path_info(checksum))
11 changes: 2 additions & 9 deletions dvc/dependency/repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,9 @@ def dumpd(self):

def download(self, to, jobs=None):
from dvc.checkout import checkout
from dvc.fs.memory import MemoryFileSystem
from dvc.objects import save
from dvc.repo.fetch import fetch_from_odb

for odb, objs in self.get_used_objs().items():
if isinstance(odb.fs, MemoryFileSystem):
for obj in objs:
save(self.repo.odb.local, obj, jobs=jobs)
else:
fetch_from_odb(self.repo, odb, objs, jobs=jobs)
self.repo.cloud.pull(objs, jobs=jobs, odb=odb)

obj = self.get_obj()
checkout(
Expand Down Expand Up @@ -129,7 +122,7 @@ def _get_used_and_obj(
recursive=True,
).items():
if odb is None:
odb = repo.cloud.get_remote().odb
odb = repo.cloud.get_remote_odb()
odb.read_only = True
self._check_circular_import(objs)
used_objs[odb].update(objs)
Expand Down
15 changes: 9 additions & 6 deletions dvc/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,18 +228,21 @@ def __init__(self, hook_name):
)


class DownloadError(DvcException):
class FileTransferError(DvcException):
_METHOD = "transfer"

def __init__(self, amount):
self.amount = amount

super().__init__(f"{amount} files failed to download")
super().__init__(f"{amount} files failed to {self._METHOD}")


class UploadError(DvcException):
def __init__(self, amount):
self.amount = amount
class DownloadError(FileTransferError):
_METHOD = "download"


super().__init__(f"{amount} files failed to upload")
class UploadError(FileTransferError):
_METHOD = "upload"


class CheckoutError(DvcException):
Expand Down
6 changes: 3 additions & 3 deletions dvc/fs/dvc.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,15 +80,15 @@ def open( # type: ignore
from dvc.config import NoRemoteError

try:
remote_obj = self.repo.cloud.get_remote(remote)
remote_odb = self.repo.cloud.get_remote_odb(remote)
except NoRemoteError as exc:
raise FileNotFoundError from exc
if out.is_dir_checksum:
checksum = self._get_granular_hash(path, out).value
else:
checksum = out.hash_info.value
remote_info = remote_obj.odb.hash_to_path_info(checksum)
return remote_obj.fs.open(
remote_info = remote_odb.hash_to_path_info(checksum)
return remote_odb.fs.open(
remote_info, mode=mode, encoding=encoding
)

Expand Down
18 changes: 18 additions & 0 deletions dvc/objects/db/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
from typing import TYPE_CHECKING

from dvc.scheme import Schemes

if TYPE_CHECKING:
from .index import ObjectDBIndexBase


def get_odb(fs, path_info, **config):
from .base import ObjectDB
Expand Down Expand Up @@ -34,6 +39,19 @@ def _get_odb(repo, settings):
return get_odb(cls(**config), path_info, state=repo.state, **config)


def get_index(odb) -> "ObjectDBIndexBase":
import hashlib

from .index import ObjectDBIndex, ObjectDBIndexNoop

cls = ObjectDBIndex if odb.tmp_dir else ObjectDBIndexNoop
return cls(
odb.tmp_dir,
hashlib.sha256(odb.path_info.url.encode("utf-8")).hexdigest(),
odb.fs.CHECKSUM_DIR_SUFFIX,
)


class ODBManager:
CACHE_DIR = "cache"
CLOUD_SCHEMES = [
Expand Down
15 changes: 12 additions & 3 deletions dvc/objects/db/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,16 @@ def add(
fs: "BaseFileSystem",
hash_info: "HashInfo",
move: bool = True,
verify: Optional[bool] = None,
**kwargs,
):
if self.read_only:
raise ObjectDBPermissionError("Cannot add to read-only ODB")

if verify is None:
verify = self.verify
try:
self.check(hash_info, check_hash=self.verify)
self.check(hash_info, check_hash=verify)
return
except (ObjectFormatError, FileNotFoundError):
pass
Comment on lines +89 to 96
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Btw, discussed with @isidentical that for some filesystems like (hdfs and future ssh) upload_fobj down below will no longer be atomic, so we might need to use a temporary path here and then just rename into place. (there is an option to wrap fs calls to make them atomic but that is error prone and ugly).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems to me like this might still need to be handled at the fs level — uploading to a temp path and renaming at the ODB level won't work for all of our filesystems (HTTP doesn't support move/rename)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pmrowla That atomicity is not something that fs should care about when uploading/downloading, this is an odb-level behavior.

HTTP doesn't support move/rename

Are operations already atomic there? Or it just doesn't support rename at all anywhere?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the HTTP case, it's atomic since the full POST/PUT request wouldn't be completed so the server should drop whatever was partially uploaded. And yeah, we don't support rename/move/copy at all since there's no HTTP method for that operation (unless you're using an extension built on top of HTTP like webdav)

It seems to me that both _upload and _upload_fobj should work the same way, and should both guarantee atomicity at the fs level - like how in localfs we do the explicit upload to tempfile and rename for both _upload and _upload_fobj

Copy link
Member

@efiop efiop Jul 19, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pmrowla Thanks for clarifying!

_upload_fobj is temporary until fsspec migration is complete and we can use put/get[_file] directly.

fs atomicity is unlikely to be guaranteed by all filesystems and might actually be undesirable in some use cases outside dvc (e.g. you might want to upload as much of a file as you can, or you might not care about atomicity so you might not want to waste an API call for rename), so it seems like it could be more robust if we do that in our odb layer (or fs wrapper after all?) for now.

Clearly, it seems like it would be useful to have the knowledge about whether or not particular fs operations are atomic so that we could waste the fewest API calls possible, so maybe our fsspec_wrapper is indeed a pretty good place for it for now. Similar to how, IIRC, in C libraries you have atomic_* functions, we could have something like put_file and atomic_put_file or atomic=True or something. Maybe this could be useful for fsspec in general as well, not quite sure right now πŸ€”

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't look like this PR is changing the old behaviour, so probably not worth blocking it because of it, but we'll def need to keep this in mind for the followups.

Expand Down Expand Up @@ -120,8 +124,13 @@ def add(
else:
raise

self.protect(cache_info)
self.state.save(cache_info, self.fs, hash_info)
try:
if verify:
self.check(hash_info, check_hash=True)
self.protect(cache_info)
self.state.save(cache_info, self.fs, hash_info)
except (ObjectFormatError, FileNotFoundError):
pass

callback = kwargs.get("download_callback")
if callback:
Expand Down