diff --git a/saturnfs/client/saturnfs.py b/saturnfs/client/saturnfs.py index c95cc07..0c76114 100644 --- a/saturnfs/client/saturnfs.py +++ b/saturnfs/client/saturnfs.py @@ -3,8 +3,10 @@ import math import os import weakref +from concurrent.futures import ThreadPoolExecutor from copy import copy from datetime import datetime +from functools import partial from glob import has_magic from io import BytesIO, TextIOWrapper from typing import Any, BinaryIO, Dict, Iterable, List, Optional, Tuple, Union, overload @@ -777,7 +779,7 @@ def rm_bulk(self, paths: List[str], callback: Callback = DEFAULT_CALLBACK): i += settings.OBJECT_STORAGE_MAX_LIST_COUNT def rsync(self, source: str, destination: str, delete_missing: bool = False, **kwargs): - kwargs["fs"] = SaturnGenericFilesystem(sfs=self) + kwargs["fs"] = SaturnGenericFilesystem() return rsync(source, destination, delete_missing=delete_missing, **kwargs) def list_uploads( @@ -1231,10 +1233,10 @@ class SaturnGenericFilesystem(GenericFileSystem): def __init__( self, default_method="current", - sfs: Optional[SaturnFS] = None, + max_workers: int = settings.SATURNFS_DEFAULT_MAX_WORKERS, **kwargs, ): - self.sfs = sfs or SaturnFS() + self._thread_pool = ThreadPoolExecutor(max_workers, thread_name_prefix="sfs-generic") super().__init__(default_method, **kwargs) async def _cp_file( @@ -1257,9 +1259,17 @@ async def _cp_file( if self._is_local(proto1) and self._is_saturnfs(proto2): if blocksize < settings.S3_MIN_PART_SIZE: blocksize = settings.S3_MIN_PART_SIZE - return self.sfs.put_file(path1, path2, block_size=blocksize, **kwargs) + sfs = SaturnFS() + return await self.loop.run_in_executor( + self._thread_pool, + partial(sfs.put_file, path1, path2, block_size=blocksize, max_workers=1, **kwargs), + ) elif self._is_saturnfs(proto1) and self._is_local(proto2): - return self.sfs.get_file(path1, path2, block_size=blocksize, **kwargs) + sfs = SaturnFS() + return await self.loop.run_in_executor( + self._thread_pool, + partial(sfs.get_file, path1, path2, block_size=blocksize, max_workers=1, **kwargs), + ) return await super()._cp_file(url, url2, blocksize, **kwargs)