Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 34 additions & 50 deletions src/dvc_objects/fs/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -587,25 +587,19 @@ def put(
recursive: bool = False, # pylint: disable=unused-argument
batch_size: int = None,
):
jobs = batch_size or self.jobs
if self.fs.async_impl:
return self.fs.put(
from_info,
to_info,
callback=callback,
batch_size=jobs,
recursive=recursive,
)
from .generic import copy
from .local import localfs

assert not recursive, "not implemented yet"
from_infos = [from_info] if isinstance(from_info, str) else from_info
to_infos = [to_info] if isinstance(to_info, str) else to_info

callback.set_size(len(from_infos))
executor = ThreadPoolExecutor(max_workers=jobs, cancel_on_error=True)
with executor:
put_file = callback.wrap_and_branch(self.put_file)
list(executor.imap_unordered(put_file, from_infos, to_infos))
jobs = batch_size or self.jobs
copy(
localfs,
from_info,
self,
to_info,
callback=callback,
batch_size=jobs,
)

def get(
self,
Expand All @@ -617,47 +611,37 @@ def get(
) -> None:
# Currently, the implementation is non-recursive if the paths are
# provided as a list, and recursive if it's a single path.
from .generic import copy
from .local import localfs

def get_file(rpath, lpath, **kwargs):
localfs.makedirs(localfs.path.parent(lpath), exist_ok=True)
self.fs.get_file(rpath, lpath, **kwargs)

get_file = callback.wrap_and_branch(get_file)

if isinstance(from_info, list) and isinstance(to_info, list):
from_infos: List[AnyFSPath] = from_info
to_infos: List[AnyFSPath] = to_info
from_infos: Union[AnyFSPath, List[AnyFSPath]] = from_info
to_infos: Union[AnyFSPath, List[AnyFSPath]] = to_info
else:
assert isinstance(from_info, str)
assert isinstance(to_info, str)

if not self.isdir(from_info):
callback.set_size(1)
return get_file(from_info, to_info)

from_infos = list(self.find(from_info))
if not from_infos:
return localfs.makedirs(to_info, exist_ok=True)

to_infos = [
localfs.path.join(to_info, *self.path.relparts(info, from_info))
for info in from_infos
]
if self.isdir(from_info):
from_infos = list(self.find(from_info))
if not from_infos:
return localfs.makedirs(to_info, exist_ok=True)

to_infos = [
localfs.path.join(to_info, *self.path.relparts(info, from_info))
for info in from_infos
]
else:
from_infos = from_info
to_infos = to_info

jobs = batch_size or self.jobs
if self.fs.async_impl:
return self.fs.get(
from_infos,
to_infos,
callback=callback,
batch_size=jobs,
)

callback.set_size(len(from_infos))
executor = ThreadPoolExecutor(max_workers=jobs, cancel_on_error=True)
with executor:
list(executor.imap_unordered(get_file, from_infos, to_infos))
copy(
Copy link
Contributor Author

@pmrowla pmrowla Jul 4, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could use `transfer` here instead of `copy` to get reflinking when the operation is purely local->local on a filesystem that supports reflinks, but using `copy` keeps us consistent with the existing get()/put() behavior.

self,
from_infos,
localfs,
to_infos,
callback=callback,
batch_size=jobs,
)

def ukey(self, path: AnyFSPath) -> str:
return self.fs.ukey(path)
Expand Down