Merge pull request #19 from saturncloud/bhperry/rsync-threadpooling
rsync threadpool for batched file uploads
bhperry committed Mar 5, 2024
2 parents 143e126 + 2ca5127 commit 5ce019c
Showing 2 changed files with 92 additions and 9 deletions.
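
For orientation, here is a rough sketch of what the new knobs enable from the Python API once this commit is in place. Only the max_batch_workers, max_file_workers, and delete_missing keyword arguments come from the diff below; the import path, example paths, and worker counts are illustrative assumptions rather than repository code.

from saturnfs.client.saturnfs import SaturnFS

sfs = SaturnFS()
sfs.rsync(
    "data/local-dir",              # illustrative local source tree
    "sfs://example-org/dest-dir",  # illustrative SaturnFS destination
    delete_missing=True,
    max_batch_workers=8,  # up to 8 files copied concurrently by the shared thread pool
    max_file_workers=2,   # up to 2 chunk-transfer threads per file
)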
24 changes: 22 additions & 2 deletions saturnfs/cli/commands.py
@@ -185,7 +185,25 @@ def delete(path: str, recursive: bool):
default=False,
help="Delete paths from the destination that are missing in the source",
)
def rsync(source_path: str, destination_path: str, delete_missing: bool, quiet: bool):
@click.option(
"--max-batch-workers",
type=int,
default=settings.SATURNFS_DEFAULT_MAX_WORKERS,
help="Maximum number of threads to run for batched file uploads",
)
@click.option(
"--max-file-workers",
type=int,
default=1,
help="Maximum number of threads to run per file for parallel chunk upload/download",
)
def rsync(
source_path: str,
destination_path: str,
delete_missing: bool,
quiet: bool,
**kwargs,
):
"""
Recursively sync files between two directory trees
"""
@@ -202,7 +220,9 @@ def rsync(source_path: str, destination_path: str, delete_missing: bool, quiet:
operation = file_op(src_is_local, dst_is_local)
callback = FileOpCallback(operation=operation)

sfs.rsync(source_path, destination_path, delete_missing=delete_missing, callback=callback)
sfs.rsync(
source_path, destination_path, delete_missing=delete_missing, callback=callback, **kwargs
)


@cli.command("ls")
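
The same knobs on the command-line surface can be exercised with click's test runner. This is a hedged sketch: it assumes the function above is registered on the cli group as the rsync command (the decorator sits above the shown hunk), and the paths and values are made up.

from click.testing import CliRunner

from saturnfs.cli.commands import cli

runner = CliRunner()
result = runner.invoke(
    cli,
    [
        "rsync",
        "data/local-dir",              # illustrative source
        "sfs://example-org/dest-dir",  # illustrative destination
        "--max-batch-workers", "8",
        "--max-file-workers", "2",
    ],
)
print(result.exit_code, result.output)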
77 changes: 70 additions & 7 deletions saturnfs/client/saturnfs.py
@@ -3,8 +3,10 @@
import math
import os
import weakref
from concurrent.futures import ThreadPoolExecutor
from copy import copy
from datetime import datetime
from functools import partial
from glob import has_magic
from io import BytesIO, TextIOWrapper
from typing import Any, BinaryIO, Dict, Iterable, List, Optional, Tuple, Union, overload
@@ -58,6 +60,9 @@ class SaturnFS(AbstractFileSystem, metaclass=_CachedTyped): # pylint: disable=i
protocol = "sfs"

def __init__(self, *args, **storage_options):
if self._cached:
# reusing instance, don't change
return
self.object_storage_client = ObjectStorageClient()
self.file_transfer = FileTransferClient()
weakref.finalize(self, self.close)
@@ -776,9 +781,22 @@ def rm_bulk(self, paths: List[str], callback: Callback = DEFAULT_CALLBACK):
self.invalidate_cache(full_path(owner_name, path))
i += settings.OBJECT_STORAGE_MAX_LIST_COUNT

def rsync(self, source: str, destination: str, delete_missing: bool = False, **kwargs):
kwargs["fs"] = SaturnGenericFilesystem(sfs=self)
return rsync(source, destination, delete_missing=delete_missing, **kwargs)
def rsync(
self,
source: str,
destination: str,
delete_missing: bool = False,
max_batch_workers: int = settings.SATURNFS_DEFAULT_MAX_WORKERS,
max_file_workers: int = 1,
**kwargs,
) -> None:
kwargs["fs"] = SaturnGenericFilesystem(
max_batch_workers=max_batch_workers, max_file_workers=max_file_workers
)
rsync(source, destination, delete_missing=delete_missing, **kwargs)
if destination.startswith(settings.SATURNFS_FILE_PREFIX):
self.invalidate_cache(destination)
return None

def list_uploads(
self, path: str, is_copy: Optional[bool] = None
@@ -1231,10 +1249,31 @@ class SaturnGenericFilesystem(GenericFileSystem):
def __init__(
self,
default_method="current",
sfs: Optional[SaturnFS] = None,
max_batch_workers: int = settings.SATURNFS_DEFAULT_MAX_WORKERS,
max_file_workers: int = 1,
**kwargs,
):
self.sfs = sfs or SaturnFS()
"""
Parameters
----------
default_method: str (optional)
Defines how to configure backend FS instances. Options are:
- "default": instantiate like FSClass(), with no
extra arguments; this is the default instance of that FS, and can be
configured via the config system
- "generic": takes instances from the `_generic_fs` dict in this module,
which you must populate before use. Keys are by protocol
- "current": takes the most recently instantiated version of each FS
max_batch_workers: int (optional)
Defines the max size of the thread pool used to copy files asynchronously
max_file_workers: int (optional)
Defines the maximum number of threads used for an individual file copy to transfer
multiple chunks in parallel. Files smaller than 5MiB will always run on a single thread.
The total maximum number of threads used will be max_batch_workers * max_file_workers
"""
self._thread_pool = ThreadPoolExecutor(max_batch_workers, thread_name_prefix="sfs-generic")
self._max_file_workers = max_file_workers
super().__init__(default_method, **kwargs)

async def _cp_file(
@@ -1257,9 +1296,33 @@ async def _cp_file(
if self._is_local(proto1) and self._is_saturnfs(proto2):
if blocksize < settings.S3_MIN_PART_SIZE:
blocksize = settings.S3_MIN_PART_SIZE
return self.sfs.put_file(path1, path2, block_size=blocksize, **kwargs)
# Ensure fresh instance for dedicated client sessions per thread
sfs = SaturnFS(skip_instance_cache=True)
return await self.loop.run_in_executor(
self._thread_pool,
partial(
sfs.put_file,
path1,
path2,
block_size=blocksize,
max_workers=self._max_file_workers,
**kwargs,
),
)
elif self._is_saturnfs(proto1) and self._is_local(proto2):
return self.sfs.get_file(path1, path2, block_size=blocksize, **kwargs)
# Ensure fresh instance for dedicated client sessions per thread
sfs = SaturnFS(skip_instance_cache=True)
return await self.loop.run_in_executor(
self._thread_pool,
partial(
sfs.get_file,
path1,
path2,
block_size=blocksize,
max_workers=self._max_file_workers,
**kwargs,
),
)

return await super()._cp_file(url, url2, blocksize, **kwargs)

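
The core pattern introduced in _cp_file is to offload each blocking per-file transfer onto a shared ThreadPoolExecutor from the async copy loop, so up to max_batch_workers files move at once while each file may additionally use max_file_workers threads for its chunks (an upper bound of max_batch_workers * max_file_workers threads in total). Below is a minimal standalone sketch of that run_in_executor + partial pattern; blocking_put_file and the paths are stand-ins, not saturnfs code.

import asyncio
from concurrent.futures import ThreadPoolExecutor
from functools import partial

pool = ThreadPoolExecutor(max_workers=4, thread_name_prefix="sfs-generic")

def blocking_put_file(src: str, dst: str, max_workers: int = 1) -> str:
    # Stand-in for SaturnFS.put_file: uploads one file, optionally chunked across threads.
    return f"uploaded {src} -> {dst} with {max_workers} chunk worker(s)"

async def copy_batch(pairs):
    loop = asyncio.get_running_loop()
    futures = [
        loop.run_in_executor(pool, partial(blocking_put_file, src, dst, max_workers=2))
        for src, dst in pairs
    ]
    return await asyncio.gather(*futures)

print(asyncio.run(copy_batch([("a.bin", "sfs://dest/a.bin"), ("b.bin", "sfs://dest/b.bin")])))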
