Skip to content

Commit

Permalink
Add callback on rsync CLI command
Browse files Browse the repository at this point in the history
  • Loading branch information
bhperry committed Feb 26, 2024
1 parent 70f7468 commit 497a3ad
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 5 deletions.
17 changes: 15 additions & 2 deletions saturnfs/cli/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,19 +177,32 @@ def delete(path: str, recursive: bool):
@cli.command("rsync")
@click.argument("source_path", type=str)
@click.argument("destination_path", type=str)
@click.option("--quiet", "-q", is_flag=True, default=False, help="Do not print file operations")
@click.option(
"-d",
"--delete-missing",
is_flag=True,
default=False,
help="Delete paths from the destination that are missing in the source",
)
def rsync(source_path: str, destination_path: str, delete_missing: bool):
def rsync(source_path: str, destination_path: str, delete_missing: bool, quiet: bool):
"""
Recursively sync files between two directory trees
"""
sfs = SaturnFS()
sfs.rsync(source_path, destination_path, delete_missing=delete_missing)

src_is_local = not source_path.startswith(settings.SATURNFS_FILE_PREFIX)
dst_is_local = not destination_path.startswith(settings.SATURNFS_FILE_PREFIX)
if src_is_local and dst_is_local:
raise SaturnError(PathErrors.AT_LEAST_ONE_REMOTE_PATH)

if quiet:
callback = NoOpCallback()
else:
operation = file_op(src_is_local, dst_is_local)
callback = FileOpCallback(operation=operation)

sfs.rsync(source_path, destination_path, delete_missing=delete_missing, callback=callback)


@cli.command("ls")
Expand Down
12 changes: 9 additions & 3 deletions saturnfs/client/saturnfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from fsspec.spec import AbstractBufferedFile, AbstractFileSystem, _Cached
from fsspec.utils import other_paths
from saturnfs import settings
from saturnfs.cli.callback import FileOpCallback
from saturnfs.client.file_transfer import (
DownloadPart,
FileTransferClient,
Expand Down Expand Up @@ -1248,18 +1249,23 @@ async def _cp_file(
# by put/get instead of opening as a buffered file
proto1, path1 = split_protocol(url)
proto2, path2 = split_protocol(url2)
if isinstance(callback, FileOpCallback) and not callback.inner:
callback.branch(path1, path2, kwargs)
else:
kwargs["callback"] = callback

if self._is_local(proto1) and self._is_saturnfs(proto2):
if blocksize < settings.S3_MIN_PART_SIZE:
blocksize = settings.S3_MIN_PART_SIZE
return self.sfs.put_file(
path1, path2, callback=callback, block_size=blocksize, **kwargs
path1, path2, block_size=blocksize, **kwargs
)
elif self._is_saturnfs(proto1) and self._is_local(proto2):
return self.sfs.get_file(
path1, path2, callback=callback, block_size=blocksize, **kwargs
path1, path2, block_size=blocksize, **kwargs
)

return await super()._cp_file(url, url2, blocksize, callback, **kwargs)
return await super()._cp_file(url, url2, blocksize, **kwargs)

def _is_local(self, protocol: str) -> bool:
if isinstance(LocalFileSystem.protocol, tuple):
Expand Down

0 comments on commit 497a3ad

Please sign in to comment.