From e42d21d7c25a14ede5290ef851fc495658495560 Mon Sep 17 00:00:00 2001 From: hugo Date: Thu, 14 Mar 2024 16:46:31 +0000 Subject: [PATCH 1/3] add ability to exclude files in rsync command --- saturnfs/client/saturnfs.py | 120 +++++++++++++++++++++++++++++++++++- saturnfs/settings.py | 2 +- 2 files changed, 120 insertions(+), 2 deletions(-) diff --git a/saturnfs/client/saturnfs.py b/saturnfs/client/saturnfs.py index dc75ef5..c5fb2a2 100644 --- a/saturnfs/client/saturnfs.py +++ b/saturnfs/client/saturnfs.py @@ -1,11 +1,13 @@ from __future__ import annotations +import logging import math import os import weakref from concurrent.futures import ThreadPoolExecutor from copy import copy from datetime import datetime +from fnmatch import fnmatch from functools import partial from glob import has_magic from io import BytesIO, TextIOWrapper @@ -48,6 +50,9 @@ DEFAULT_CALLBACK = NoOpCallback() +logger = logging.getLogger(__name__) + + class _CachedTyped(_Cached): # Add typing to the metaclass to get around an issue with pylance # https://github.com/microsoft/pylance-release/issues/4384 @@ -781,9 +786,10 @@ def rm_bulk(self, paths: List[str], callback: Callback = DEFAULT_CALLBACK): self.invalidate_cache(full_path(owner_name, path)) i += settings.OBJECT_STORAGE_MAX_LIST_COUNT + def rsync(self, source: str, destination: str, delete_missing: bool = False, **kwargs): kwargs["fs"] = SaturnGenericFilesystem() - return rsync(source, destination, delete_missing=delete_missing, **kwargs) + return _rsync(source, destination, delete_missing=delete_missing, **kwargs) def list_uploads( self, path: str, is_copy: Optional[bool] = None @@ -1324,3 +1330,115 @@ def _is_saturnfs(self, protocol: str) -> bool: register_implementation(SaturnFS.protocol, SaturnFS) + + +def check_exclude_globs(input_string, exclude_globs) -> bool: + """ + returns True if input_string matches a list of globs that we want to exclude + """ + for glob in exclude_globs: + if fnmatch(input_string, glob): + return True + 
return False + + +def _rsync( + source, + destination, + delete_missing=False, + source_field="size", + dest_field="size", + update_cond="different", + inst_kwargs=None, + fs=None, + exclude_globs=None, + **kwargs, +): + """Sync files between two directory trees + + (experimental) + + Parameters + ---------- + source: str + Root of the directory tree to take files from. This must be a directory, but + do not include any terminating "/" character + destination: str + Root path to copy into. The contents of this location should be + identical to the contents of ``source`` when done. This will be made a + directory, and the terminal "/" should not be included. + delete_missing: bool + If there are paths in the destination that don't exist in the + source and this is True, delete them. Otherwise, leave them alone. + source_field: str | callable + If ``update_cond`` is "different", this is the key in the info + of source files to consider for difference. May be a function of the + info dict. + dest_field: str | callable + If ``update_cond`` is "different", this is the key in the info + of destination files to consider for difference. May be a function of + the info dict. + update_cond: "different"|"always"|"never" + If "always", every file is copied, regardless of whether it exists in + the destination. If "never", files that exist in the destination are + not copied again. If "different" (default), only copy if the info + fields given by ``source_field`` and ``dest_field`` (usually "size") + are different. Other comparisons may be added in the future. + inst_kwargs: dict|None + If ``fs`` is None, use this set of keyword arguments to make a + GenericFileSystem instance + fs: GenericFileSystem|None + Instance to use if explicitly given. The instance defines how to + make downstream file system instances from paths.
+ """ + if exclude_globs is None: + exclude_globs = [] + fs = fs or GenericFileSystem(**(inst_kwargs or {})) + source = fs._strip_protocol(source) + destination = fs._strip_protocol(destination) + allfiles = fs.find(source, withdirs=True, detail=True) + allfiles = {a: v for a, v in allfiles.items() if not check_exclude_globs(v["name"], exclude_globs)} + if not fs.isdir(source): + raise ValueError("Can only rsync on a directory") + otherfiles = fs.find(destination, withdirs=True, detail=True) + dirs = [ + a + for a, v in allfiles.items() + if v["type"] == "directory" and a.replace(source, destination) not in otherfiles + ] + logger.debug(f"{len(dirs)} directories to create") + for dirn in dirs: + # no async + fs.mkdirs(dirn.replace(source, destination), exist_ok=True) + allfiles = {a: v for a, v in allfiles.items() if v["type"] == "file"} + logger.debug(f"{len(allfiles)} files to consider for copy") + to_delete = [ + o + for o, v in otherfiles.items() + if o.replace(destination, source) not in allfiles and v["type"] == "file" + ] + for k, v in allfiles.copy().items(): + otherfile = k.replace(source, destination) + if otherfile in otherfiles: + if update_cond == "always": + allfiles[k] = otherfile + elif update_cond == "different": + inf1 = source_field(v) if callable(source_field) else v[source_field] + v2 = otherfiles[otherfile] + inf2 = dest_field(v2) if callable(dest_field) else v2[dest_field] + if inf1 != inf2: + # details mismatch, make copy + allfiles[k] = otherfile + else: + # details match, don't copy + allfiles.pop(k) + else: + # file not in target yet + allfiles[k] = otherfile + logger.debug(f"{len(allfiles)} files to copy") + if allfiles: + source_files, target_files = zip(*allfiles.items()) + fs.cp(source_files, target_files, **kwargs) + logger.debug(f"{len(to_delete)} files to delete") + if delete_missing: + fs.rm(to_delete) \ No newline at end of file diff --git a/saturnfs/settings.py b/saturnfs/settings.py index 0f1eb0d..8e0aea1 100644 --- 
a/saturnfs/settings.py +++ b/saturnfs/settings.py @@ -12,4 +12,4 @@ SATURNFS_PROTOCOL = "sfs" SATURNFS_FILE_PREFIX = f"{SATURNFS_PROTOCOL}://" -SATURNFS_DEFAULT_MAX_WORKERS = max(1, int(os.getenv("SATURNFS_DEFAULT_MAX_WORKERS", "10"))) +SATURNFS_DEFAULT_MAX_WORKERS = max(1, int(os.getenv("SATURNFS_DEFAULT_MAX_WORKERS", "20"))) From 03a10f4148cc69666a1b3804b1ae34ad09a75412 Mon Sep 17 00:00:00 2001 From: hugo Date: Thu, 14 Mar 2024 16:54:43 +0000 Subject: [PATCH 2/3] linting --- saturnfs/client/saturnfs.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/saturnfs/client/saturnfs.py b/saturnfs/client/saturnfs.py index cb9fc0c..3ee82cd 100644 --- a/saturnfs/client/saturnfs.py +++ b/saturnfs/client/saturnfs.py @@ -1419,7 +1419,9 @@ def _rsync( source = fs._strip_protocol(source) destination = fs._strip_protocol(destination) allfiles = fs.find(source, withdirs=True, detail=True) - allfiles = {a: v for a, v in allfiles.items() if not check_exclude_globs(v["name"], exclude_globs)} + allfiles = { + a: v for a, v in allfiles.items() if not check_exclude_globs(v["name"], exclude_globs) + } if not fs.isdir(source): raise ValueError("Can only rsync on a directory") otherfiles = fs.find(destination, withdirs=True, detail=True) @@ -1463,4 +1465,4 @@ def _rsync( fs.cp(source_files, target_files, **kwargs) logger.debug(f"{len(to_delete)} files to delete") if delete_missing: - fs.rm(to_delete) \ No newline at end of file + fs.rm(to_delete) From 49c37b09c9d1d42a60059ac63e9fc44d02e8ff34 Mon Sep 17 00:00:00 2001 From: hugo Date: Thu, 14 Mar 2024 16:56:59 +0000 Subject: [PATCH 3/3] linting --- .pylintrc | 2 ++ saturnfs/client/saturnfs.py | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/.pylintrc b/.pylintrc index bb9c3df..c5df30e 100644 --- a/.pylintrc +++ b/.pylintrc @@ -59,3 +59,5 @@ disable= useless-return, wrong-import-order, wrong-import-position, + consider-using-min-builtin, + consider-using-max-builtin, \ No newline at end of file diff 
--git a/saturnfs/client/saturnfs.py b/saturnfs/client/saturnfs.py index 3ee82cd..2ac2d0b 100644 --- a/saturnfs/client/saturnfs.py +++ b/saturnfs/client/saturnfs.py @@ -17,7 +17,7 @@ from fsspec.caching import BaseCache from fsspec.callbacks import Callback, NoOpCallback from fsspec.core import split_protocol -from fsspec.generic import GenericFileSystem, rsync +from fsspec.generic import GenericFileSystem from fsspec.implementations.local import LocalFileSystem, make_path_posix from fsspec.registry import register_implementation from fsspec.spec import AbstractBufferedFile, AbstractFileSystem, _Cached