diff --git a/src/dvc_objects/fs/base.py b/src/dvc_objects/fs/base.py index 0b6337c1..3e04349c 100644 --- a/src/dvc_objects/fs/base.py +++ b/src/dvc_objects/fs/base.py @@ -587,25 +587,19 @@ def put( recursive: bool = False, # pylint: disable=unused-argument batch_size: int = None, ): - jobs = batch_size or self.jobs - if self.fs.async_impl: - return self.fs.put( - from_info, - to_info, - callback=callback, - batch_size=jobs, - recursive=recursive, - ) + from .generic import copy + from .local import localfs assert not recursive, "not implemented yet" - from_infos = [from_info] if isinstance(from_info, str) else from_info - to_infos = [to_info] if isinstance(to_info, str) else to_info - - callback.set_size(len(from_infos)) - executor = ThreadPoolExecutor(max_workers=jobs, cancel_on_error=True) - with executor: - put_file = callback.wrap_and_branch(self.put_file) - list(executor.imap_unordered(put_file, from_infos, to_infos)) + jobs = batch_size or self.jobs + copy( + localfs, + from_info, + self, + to_info, + callback=callback, + batch_size=jobs, + ) def get( self, @@ -617,47 +611,37 @@ def get( ) -> None: # Currently, the implementation is non-recursive if the paths are # provided as a list, and recursive if it's a single path. + from .generic import copy from .local import localfs - def get_file(rpath, lpath, **kwargs): - localfs.makedirs(localfs.path.parent(lpath), exist_ok=True) - self.fs.get_file(rpath, lpath, **kwargs) - - get_file = callback.wrap_and_branch(get_file) - if isinstance(from_info, list) and isinstance(to_info, list): - from_infos: List[AnyFSPath] = from_info - to_infos: List[AnyFSPath] = to_info + from_infos: Union[AnyFSPath, List[AnyFSPath]] = from_info + to_infos: Union[AnyFSPath, List[AnyFSPath]] = to_info else: assert isinstance(from_info, str) assert isinstance(to_info, str) - - if not self.isdir(from_info): - callback.set_size(1) - return get_file(from_info, to_info) - - from_infos = list(self.find(from_info)) - if not from_infos: - return localfs.makedirs(to_info, exist_ok=True) - - to_infos = [ - localfs.path.join(to_info, *self.path.relparts(info, from_info)) - for info in from_infos - ] + if self.isdir(from_info): + from_infos = list(self.find(from_info)) + if not from_infos: + return localfs.makedirs(to_info, exist_ok=True) + + to_infos = [ + localfs.path.join(to_info, *self.path.relparts(info, from_info)) + for info in from_infos + ] + else: + from_infos = from_info + to_infos = to_info jobs = batch_size or self.jobs - if self.fs.async_impl: - return self.fs.get( - from_infos, - to_infos, - callback=callback, - batch_size=jobs, - ) - - callback.set_size(len(from_infos)) - executor = ThreadPoolExecutor(max_workers=jobs, cancel_on_error=True) - with executor: - list(executor.imap_unordered(get_file, from_infos, to_infos)) + copy( + self, + from_infos, + localfs, + to_infos, + callback=callback, + batch_size=jobs, + ) def ukey(self, path: AnyFSPath) -> str: return self.fs.ukey(path)