Skip to content

Commit

Permalink
Optimize sfs rsync for many small files rather than few large files
Browse files Browse the repository at this point in the history
  • Loading branch information
bhperry committed Mar 4, 2024
1 parent 143e126 commit 04a4d4b
Showing 1 changed file with 15 additions and 5 deletions.
20 changes: 15 additions & 5 deletions saturnfs/client/saturnfs.py
Original file line number Diff line number Diff line change
@@ -3,8 +3,10 @@
 import math
 import os
 import weakref
+from concurrent.futures import ThreadPoolExecutor
 from copy import copy
 from datetime import datetime
+from functools import partial
 from glob import has_magic
 from io import BytesIO, TextIOWrapper
 from typing import Any, BinaryIO, Dict, Iterable, List, Optional, Tuple, Union, overload
@@ -777,7 +779,7 @@ def rm_bulk(self, paths: List[str], callback: Callback = DEFAULT_CALLBACK):
             i += settings.OBJECT_STORAGE_MAX_LIST_COUNT

     def rsync(self, source: str, destination: str, delete_missing: bool = False, **kwargs):
-        kwargs["fs"] = SaturnGenericFilesystem(sfs=self)
+        kwargs["fs"] = SaturnGenericFilesystem()
         return rsync(source, destination, delete_missing=delete_missing, **kwargs)

     def list_uploads(
@@ -1231,10 +1233,10 @@ class SaturnGenericFilesystem(GenericFileSystem):
     def __init__(
         self,
         default_method="current",
-        sfs: Optional[SaturnFS] = None,
+        max_workers: int = settings.SATURNFS_DEFAULT_MAX_WORKERS,
         **kwargs,
     ):
-        self.sfs = sfs or SaturnFS()
+        self._thread_pool = ThreadPoolExecutor(max_workers, thread_name_prefix="sfs-generic")
         super().__init__(default_method, **kwargs)

     async def _cp_file(
@@ -1257,9 +1259,17 @@ async def _cp_file(
         if self._is_local(proto1) and self._is_saturnfs(proto2):
             if blocksize < settings.S3_MIN_PART_SIZE:
                 blocksize = settings.S3_MIN_PART_SIZE
-            return self.sfs.put_file(path1, path2, block_size=blocksize, **kwargs)
+            sfs = SaturnFS()
+            return await self.loop.run_in_executor(
+                self._thread_pool,
+                partial(sfs.put_file, path1, path2, block_size=blocksize, max_workers=1, **kwargs),
+            )
         elif self._is_saturnfs(proto1) and self._is_local(proto2):
-            return self.sfs.get_file(path1, path2, block_size=blocksize, **kwargs)
+            sfs = SaturnFS()
+            return await self.loop.run_in_executor(
+                self._thread_pool,
+                partial(sfs.get_file, path1, path2, block_size=blocksize, max_workers=1, **kwargs),
+            )

         return await super()._cp_file(url, url2, blocksize, **kwargs)

Expand Down

0 comments on commit 04a4d4b

Please sign in to comment.