In [29]:
from pathlib import Path
from typing import Dict, List, Set
import hashlib
from fnmatch import fnmatch
import logging
from concurrent.futures import ThreadPoolExecutor
import os
import abc
import shutil
from collections import defaultdict

from tqdm import tqdm

In [30]:
class FileHasher(abc.ABC):
    """Interface for callables that turn a file path into a comparison string.

    Two files are treated as identical by the syncer when their hashers
    return equal strings, so implementations decide what "identical" means.
    """

    @abc.abstractmethod
    def __call__(self, file_path: Path) -> str:
        """Return the fingerprint string for ``file_path``.

        Concrete subclasses must override this method; the base
        implementation only raises ``NotImplementedError``.
        """
        raise NotImplementedError

class ModificationStampHasher(FileHasher):
    """Cheap fingerprint made of a file's relative path and modification time.

    No file content is read; only the path and ``os.path.getmtime`` are used.
    """

    def __init__(self, base_path: Path) -> None:
        """Store the folder that hashed paths are expressed relative to."""
        self.base_path = base_path

    def __call__(self, file_path: Path) -> str:
        """Return ``"<relative path>_<mtime>"`` for ``file_path``.

        ``file_path`` must be located under ``base_path``; otherwise
        ``Path.relative_to`` raises ``ValueError``.
        """
        relative = file_path.relative_to(self.base_path)
        modified_at = os.path.getmtime(file_path)
        return f"{relative}_{modified_at}"

class MD5Hasher(FileHasher):
    """Content-based fingerprint: the MD5 digest of the file's bytes."""

    def __call__(self, file_path: Path) -> str:
        """Return the md5 hash of a file."""
        # Read in 64 KiB chunks so large files are never loaded whole.
        chunk_size = 64 * 1024
        digest = hashlib.md5()

        with open(file_path, "rb") as stream:
            # read() returns b"" at end of file, which stops the iteration.
            for chunk in iter(lambda: stream.read(chunk_size), b""):
                digest.update(chunk)

        return digest.hexdigest()

In [31]:
class Syncer:
    """One-way mirror of the files in ``source_folder`` into ``target_folder``.

    Calling the instance collects the valid files of both folders, computes a
    fingerprint for each, and hands the two ``{relative path: fingerprint}``
    mappings to the selected sync strategy. With the ``"path"`` strategy:

    * target files whose relative path does not exist in source are deleted,
    * files present in both folders are re-copied when fingerprints differ,
    * files only present in source are copied (parent folders are created).

    Only files are handled; directories are never removed.

    Args:
        source_folder: Folder to read from.
        target_folder: Folder that is made to match ``source_folder``.
        excluded_paths: ``fnmatch`` patterns matched against the full path
            string of every file; matching files are ignored in both folders.
        hashing_method: ``"md5"`` (file content, ``MD5Hasher``) or
            ``"metadata"`` (relative path + mtime, ``ModificationStampHasher``).
        sync_method: Sync strategy; only ``"path"`` is available.
        n_threads: Worker threads used for validating and hashing files.

    Raises:
        ValueError: If ``sync_method`` or ``hashing_method`` is unknown.
    """

    def __init__(
        self,
        source_folder: Path,
        target_folder: Path,
        excluded_paths: List[Path] = None,
        hashing_method: str = "md5",
        sync_method: str = "path",
        n_threads: int = 1,
    ) -> None:
        # Path(...) is a no-op for Path arguments and lets callers pass strings.
        self.source_folder = Path(source_folder)
        self.target_folder = Path(target_folder)
        # Copy so later changes to the caller's list do not affect this instance.
        self.excluded_paths = list(excluded_paths) if excluded_paths else []
        self.n_threads = n_threads

        if sync_method == "path":
            self.sync_method = self._sync_by_path
        else:
            raise ValueError(f"Unknown sync method {sync_method}")

        if hashing_method == "md5":
            self.source_file_hash_func: FileHasher = MD5Hasher()
            self.target_file_hash_func: FileHasher = MD5Hasher()
        elif hashing_method == "metadata":
            # Each hasher needs its own base folder to build relative paths.
            self.source_file_hash_func = ModificationStampHasher(self.source_folder)
            self.target_file_hash_func = ModificationStampHasher(self.target_folder)
        else:
            raise ValueError(f"Unknown hashing method {hashing_method}")

    def _validate_file(self, file_path: Path) -> bool:
        """Return True if ``file_path`` is a file that is not excluded.

        Returns False for non-files, for paths matching any pattern in
        ``excluded_paths``, and (after logging a warning) when checking the
        path raises ``OSError``.
        """
        try:
            if not file_path.is_file():
                return False
        except OSError:
            logging.warning(f"Could not read {file_path}")
            return False

        path_text = str(file_path)
        return not any(
            fnmatch(path_text, str(pattern)) for pattern in self.excluded_paths
        )

    def _run_parallel(self, func, items: list) -> list:
        """Apply ``func`` to every item on the thread pool, showing progress.

        Results are returned in the order of ``items``. An exception raised
        by ``func`` propagates when its result is collected.
        """
        with ThreadPoolExecutor(max_workers=self.n_threads) as executor:
            with tqdm(total=len(items)) as progress:
                futures = []
                for item in items:
                    future = executor.submit(func, item)
                    # Advance the bar as each task finishes, in any order.
                    future.add_done_callback(lambda _done: progress.update())
                    futures.append(future)

                return [future.result() for future in futures]

    def _all_files(self, folder_path: Path) -> List[Path]:
        """Return a list of all valid files in a folder (searched recursively)."""
        candidates = list(folder_path.rglob("*"))
        verdicts = self._run_parallel(self._validate_file, candidates)
        return [path for path, is_valid in zip(candidates, verdicts) if is_valid]

    def _path_hash_mapping(
        self, file_paths: List[Path], file_hash_func
    ) -> Dict[Path, str]:
        """Return ``{path: file_hash_func(path)}`` for every path in ``file_paths``."""
        file_hashes = self._run_parallel(file_hash_func, file_paths)
        return dict(zip(file_paths, file_hashes))

    def _relative_path_hash_mapping(self, mapping, base_path) -> Dict[Path, str]:
        """Return ``mapping`` with every key rewritten relative to ``base_path``."""
        return {
            path.relative_to(base_path): file_hash
            for path, file_hash in mapping.items()
        }

    def _copy_to_target(self, rel_path: Path) -> None:
        """Copy ``source_folder / rel_path`` to the same place under the target."""
        destination_dir = (self.target_folder / rel_path).parent
        destination_dir.mkdir(parents=True, exist_ok=True)
        # The destination is given as the parent directory (copy2 keeps the
        # file name). If a directory already occupies the file's place, this
        # fails loudly instead of silently copying into that directory.
        shutil.copy2(self.source_folder / rel_path, destination_dir)

    def _sync_by_path(
        self,
        source_path_hash_mapping: Dict[Path, str],
        target_path_hash_mapping: Dict[Path, str],
    ) -> None:
        """Make the target match the source, comparing files by relative path.

        Both arguments map paths relative to their folder to a fingerprint.
        Counts of unchanged / changed / new / removed files are logged.
        """
        statistics = {"unchanged": 0, "changed": 0, "new": 0, "removed": 0}

        logging.info("Deleting files in target that are not in source...")
        for rel_path in tqdm(target_path_hash_mapping.keys()):
            if rel_path not in source_path_hash_mapping:
                (self.target_folder / rel_path).unlink()
                statistics["removed"] += 1

        logging.info("Copying files from source to target...")
        for rel_path, source_hash in tqdm(source_path_hash_mapping.items()):
            if rel_path not in target_path_hash_mapping:
                self._copy_to_target(rel_path)
                statistics["new"] += 1
            elif source_hash != target_path_hash_mapping[rel_path]:
                # Remove the stale file first so the copy creates a fresh one.
                (self.target_folder / rel_path).unlink()
                self._copy_to_target(rel_path)
                statistics["changed"] += 1
            else:
                statistics["unchanged"] += 1

        logging.info(f"Synced {sum(statistics.values())} files:")
        for key, value in statistics.items():
            logging.info(f"{key}: {value}")

    def _invert_mapping(self, mapping: Dict[Path, str]) -> Dict[str, List[Path]]:
        """Group the paths of ``mapping`` by hash: ``{hash: [path, ...]}``."""
        paths_by_hash = defaultdict(list)
        for path, file_hash in mapping.items():
            paths_by_hash[file_hash].append(path)

        return paths_by_hash

    def _sync_by_hash(
        self,
        source_path_hash_mapping: Dict[Path, str],
        target_path_hash_mapping: Dict[Path, str],
    ) -> None:
        """Sync by file content instead of by path (not implemented yet).

        Will be faster when large subtrees are moved or renamed but the file
        contents are the same. Intended algorithm, to be implemented and
        tested before this strategy is made selectable:

        1. Group both mappings by hash with ``_invert_mapping``.
        2. Delete target files whose hash does not occur in source.
        3. Copy source files whose hash does not occur in target.
        4. For hashes present on both sides, move existing target files to
           the paths they have in source and copy only what is still missing.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError

    def _scan_folder(
        self, folder: Path, file_hash_func, label: str
    ) -> Dict[Path, str]:
        """Return ``{relative path: hash}`` for every valid file in ``folder``.

        ``label`` ("source" or "target") is only used in the log messages.
        """
        logging.info(f"Getting all valid files in {label} folder...")
        file_paths = self._all_files(folder)
        logging.info(f"Hashing all valid files in {label} folder...")
        absolute_mapping = self._path_hash_mapping(file_paths, file_hash_func)
        logging.info(f"Making paths relative to {label} folder...")
        return self._relative_path_hash_mapping(absolute_mapping, folder)

    def __call__(self) -> None:
        """Run the sync from ``source_folder`` to ``target_folder``.

        Raises:
            FileNotFoundError: If ``source_folder`` is not an existing
                directory. Without this check a mistyped source would look
                like an empty folder and every file in the target would be
                deleted.
        """
        logging.info(f"Syncing {self.source_folder} to {self.target_folder}")

        if not self.source_folder.is_dir():
            raise FileNotFoundError(
                f"Source folder {self.source_folder} does not exist "
                "or is not a directory"
            )

        source_path_hash_mapping = self._scan_folder(
            self.source_folder, self.source_file_hash_func, "source"
        )
        target_path_hash_mapping = self._scan_folder(
            self.target_folder, self.target_file_hash_func, "target"
        )

        logging.info("Syncing files...")
        self.sync_method(source_path_hash_mapping, target_path_hash_mapping)

In [32]:
# Show INFO-level records so the progress messages of the sync are visible.
logging.basicConfig(level=logging.INFO)

# Source and target live side by side under <cwd>/../tests/test_data.
test_data_dir = Path.cwd().parent / "tests" / "test_data"
source_path = test_data_dir / "semester_10"
target_path = test_data_dir / "semester_10_copy"

# Compare by relative path, using path + mtime as the (cheap) fingerprint.
syncer = Syncer(
    source_folder=source_path,
    target_folder=target_path,
    excluded_paths=None,
    sync_method="path",
    hashing_method="metadata",
    n_threads=4,
)

syncer()


INFO:root:Syncing d:\Dokumente\Software_Projekte\FolderSync\tests\test_data\semester_10 to d:\Dokumente\Software_Projekte\FolderSync\tests\test_data\semester_10_copy
INFO:root:Getting all valid files in source folder...
100%|██████████| 2165/2165 [00:00<00:00, 3567.50it/s]
INFO:root:Hashing all valid files in source folder...
100%|██████████| 1583/1583 [00:00<00:00, 2155.54it/s]
INFO:root:Making paths relative to source folder...
INFO:root:Getting all valid files in target folder...
100%|██████████| 2118/2118 [00:00<00:00, 4540.57it/s]
INFO:root:Hashing all valid files in target folder...
100%|██████████| 1582/1582 [00:00<00:00, 3638.45it/s]
INFO:root:Making paths relative to target folder...
INFO:root:Syncing files...
INFO:root:Deleting files in target that are not in source...
100%|██████████| 1582/1582 [00:00<00:00, 264316.00it/s]
INFO:root:Copying files from source to target...
100%|██████████| 1583/1583 [00:00<00:00, 83563.02it/s]
INFO:root:Synced 1583 files:
INFO:root:unchanged: 