Skip to content

Commit

Permalink
Configurable num batch and file workers
Browse files Browse the repository at this point in the history
  • Loading branch information
bhperry committed Mar 4, 2024
1 parent 04a4d4b commit 3becf36
Showing 1 changed file with 20 additions and 4 deletions.
24 changes: 20 additions & 4 deletions saturnfs/client/saturnfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -1233,10 +1233,12 @@ class SaturnGenericFilesystem(GenericFileSystem):
def __init__(
    self,
    default_method="current",
    max_batch_workers: int = settings.SATURNFS_DEFAULT_MAX_WORKERS,
    max_workers_per_file: int = 1,
    **kwargs,
):
    """Initialize the generic filesystem wrapper with configurable concurrency.

    Args:
        default_method: Passed through to GenericFileSystem; selects which
            registered filesystem instance to use by default.
        max_batch_workers: Number of threads in the shared pool used to run
            per-file copy operations concurrently (batch-level parallelism).
        max_workers_per_file: Number of workers each individual get_file/
            put_file call may use for chunked transfer of a single file.
        **kwargs: Forwarded unchanged to GenericFileSystem.__init__.
    """
    # One shared pool drives batch-level parallelism; per-file parallelism is
    # handled separately via _max_workers_per_file inside each transfer call.
    self._thread_pool = ThreadPoolExecutor(
        max_batch_workers, thread_name_prefix="sfs-generic"
    )
    self._max_workers_per_file = max_workers_per_file
    super().__init__(default_method, **kwargs)

async def _cp_file(
Expand All @@ -1262,13 +1264,27 @@ async def _cp_file(
sfs = SaturnFS()
return await self.loop.run_in_executor(
self._thread_pool,
partial(sfs.put_file, path1, path2, block_size=blocksize, max_workers=1, **kwargs),
partial(
sfs.put_file,
path1,
path2,
block_size=blocksize,
max_workers=self._max_workers_per_file,
**kwargs,
),
)
elif self._is_saturnfs(proto1) and self._is_local(proto2):
sfs = SaturnFS()
return await self.loop.run_in_executor(
self._thread_pool,
partial(sfs.get_file, path1, path2, block_size=blocksize, max_workers=1, **kwargs),
partial(
sfs.get_file,
path1,
path2,
block_size=blocksize,
max_workers=self._max_workers_per_file,
**kwargs,
),
)

return await super()._cp_file(url, url2, blocksize, **kwargs)
Expand Down

0 comments on commit 3becf36

Please sign in to comment.