In [1]:
#convert

# App

> Watches the filesystem for file changes and creates transactions based on changes.


## General definitions

All imports are done at the beginning.

In [3]:
#export
import os
import time
import json
from typing import Dict, Any, List
from functools import partial
from entangle.entanglement import Entanglement
from entangle.client import Client
from entangle.server import listen

from watchdog.observers import Observer  
from watchdog.events import FileSystemEventHandler, DirCreatedEvent, DirDeletedEvent, FileCreatedEvent, FileDeletedEvent, DirModifiedEvent, FileModifiedEvent, DirMovedEvent, FileMovedEvent

ModuleNotFoundError: No module named 'OpenSSL'

In [None]:
#export
import hashlib
def compute_md5(fname):
    hash_md5 = hashlib.md5()
    with open(fname, "rb") as f:
        for chunk in iter(lambda: f.read(4096), b""):
            hash_md5.update(chunk)
    return hash_md5.hexdigest()

In [None]:
#export
# Save the transaction database
def save_transactions(database_file: str, database: Dict):
    data = json.dumps(database)
    with open(database_file, "w") as f:
        f.write(data)

In [None]:
#export
# Load the transaction database
def load_transactions(database_file) -> Dict:
    if not os.path.exists(database_file):
        print("No database found, creating a new one.")
        database = {}
        save_transaction_database(database_file, database)

    with open(database_file, "r") as f:
        database = json.loads(f.read())
    print("Database successfully loaded. {} transactions, {} lru_filenames".format(len(database["transactions"]), len(database["lru_filenames"])))
    return database

In [None]:
#export
class FileChangeHandler(FileSystemEventHandler):
    exclude_patterns = []
    mappings = {}
    database_path = "database.json"
    
    def get_sync_name(self, fname):
        for namespace, path in mappings.items():
            if not path.endswith("/"):
                path += "/"
            if fname.startswith(path):
                return fname.replace(path, namespace + ":")

    def on_moved(self, event):
        """
        event.is_directory
            True | False
        event.src_path
            path/to/observed/file
        """
        if self.is_excluded(event):
            return
        
        fname = self.get_sync_name(event.src_path)
        fname_moved = self.get_sync_name(event.dest_path)
        
        transactions = load_transactions(self.database_path)
        transaction = {"timestamp": time.time(), "type": "moved", "new_location": fname_moved}
        transactions[fname] = transaction
        transaction = {"timestamp": time.time(), "type": "moved", "old_location": fname, "md5": compute_md5(event.dest_path)}
        transactions[fname] = transaction
        save_transactions(self.database_path, transactions)
        
        if len(fname) > 128:
            fname = fname[:63] + "..." + fname[-62:]            
        print("\rMoved: {:<128} (len queue: {})".format(fname, len(transactions)), end="")


    def on_created(self, event):
        if event.is_directory:
            return
        if self.is_excluded(event):
            return
        
        fname = self.get_sync_name(event.src_path)
        
        transactions = load_transactions(self.database_path)
        transaction = {"timestamp": time.time(), "type": "created", "md5": compute_md5(event.src_path)}
        transactions[fname] = transaction
        save_transactions(self.database_path, transactions)
        
        if len(fname) > 128:
            fname = fname[:63] + "..." + fname[-62:]            
        print("\rCreated: {:<128} (len queue: {})".format(fname, len(transactions)), end="")

    def on_deleted(self, event):
        if self.is_excluded(event):
            return
        
        fname = self.get_sync_name(event.src_path)
        
        transactions = load_transactions(self.database_path)
        transaction = {"timestamp": time.time(), "type": "deleted"}
        transactions[fname] = transaction
        save_transactions(self.database_path, transactions)
        
        if len(fname) > 128:
            fname = fname[:63] + "..." + fname[-62:]
        print("\rDeleted: {:<128} (len queue: {})".format(fname, len(transactions)), end="")

    def on_modified(self, event):
        if event.is_directory:
            return
        if self.is_excluded(event):
            return
        fname = self.get_sync_name(event.src_path)
        
        transactions = load_transactions(self.database_path)
        transaction = {"timestamp": time.time(), "type": "modified", "md5": compute_md5(event.src_path)}
        transactions[fname] = transaction
        save_transactions(self.database_path, transactions)
        
        if len(fname) > 128:
            fname = fname[:63] + "..." + fname[-62:]
        print("\rModified: {:<128} (len queue: {})".format(fname, len(transactions)), end="")

    def is_excluded(self, event):
        if event.src_path.endswith("Neues Textdokument.txt"):
            return True

        # Filter by exclude pattern.
        for pattern in self.exclude_patterns:
            pattern = pattern.replace("\\", os.sep)
            pattern = pattern.replace("/", os.sep)
            if pattern.endswith(os.sep):
                if pattern in event.src_path:
                    return True
                if event.is_directory and event.src_path.endswith(pattern[:-1]):
                    return True
            else:
                if event.src_path.split(os.sep)[-1].startswith(pattern):
                    return True
                if event.src_path.endswith(pattern):
                    return True

        return False

In [None]:
#export
def initial_scan(handler):
    tracked_files = []
    paths = list(handler.mappings.values())
    
    # check filelist for deletions or modifications
    print("\n\rScanning Tracked Files")
    transactions = load_transactions(handler.database_path)
    for fname, v in transactions:
        namespace, name = fname.split(":")
        disk_name = os.path.join(handler.mappings[namespace], name)
        if len(fname) > 128:
            fname = fname[:63] + "..." + fname[-62:]
        print("\rScanning: {:<128} (len queue: {})".format(fname, len(_transaction_send_queue)), end="")
        
        if os.path.exists(disk_name):
            tracked_files.append(t.local_fname)
            if compute_md5(disk_name) != v["md5"]:
                handler.on_modified(FileModifiedEvent(t.local_fname))
        elif not v["type"] == "deleted":
            handler.on_deleted(FileDeletedEvent(disk_name))
    print("\n\rScanning Completed")

    for path in paths:
        print("n\rScanning: {}".format(path))
        print("(no changes)", end="")
        # check if a file was created that is not yet in filelist
        for f in [os.path.join(root, name) for root, dirs, files in os.walk(path) for name in files]:
            f = f.replace("/", os.sep)
            if f not in tracked_files:
                handler.on_created(FileCreatedEvent(f))
        print("\n\rScanning Completed")

In [None]:
#export
def on_retrieve_file(state, entanglement, data: Dict):
    print("on_retrieve_file: {}".format(data))
    namespace, name = data["fname"].split(":")
    disk_name = os.path.join(state["handler"].mappings[namespace], name)
    transactions = load_transactions(state["handler"].database_path)
    transactions[data["fname"]] = data["transaction"]
    save_transactions(state["handler"].database_path, transactions)
    with open(disk_name, "wb") as f:
        f.write(base64.b64decode(data["data"].encode("utf-8")))

    state["open_tasks"] -= 1

def retrieve_file(state, entanglement, fname):
    print("retrieve_file: {}".format(fname))
    data = {}
    
    namespace, name = data["fname"].split(":")
    disk_name = os.path.join(state["handler"].mappings[namespace], name)
    
    transactions = load_transactions(state["handler"].database_path)
    data["transaction"] = transactions[fname]
    data["fname"] = fname
    
    with open(disk_name, "rb") as f:
        data["data"] = base64.b64encode(f.read()).decode("utf-8")
    entanglement.remote_fun("on_retrieve_file")(transactions)

def on_get_database(state, entanglement, transactions: Dict):
    print("on_get_database: {}".format(transactions))
    transactions_local = load_transactions(state["handler"].database_path)
    
    # TODO compare database against local one
    # And request files that are more up to date by others
    # delete files that were deleted on remote.
    
    state["open_tasks"] -= 1

def get_database(state, entanglement):
    print("get_database")
    transactions = load_transactions(state["handler"].database_path)
    entanglement.remote_fun("on_sync_get_database")(transactions)
    
    
def format_len(size):
    if size > 1e12:
        return "{:.1f} TB".format(size/1e12)
    elif size > 1e9:
        return "{:.1f} GB".format(size/1e9)
    elif size > 1e6:
        return "{:.1f} MB".format(size/1e6)
    elif size > 1e3:
        return "{:.1f} KB".format(size/1e3)
    else:
        return "{:.1f} B".format(size)

def on_entangle(entanglement):
    state = {}
    entanglement.on_sync_retrieve_file = partial(on_retrieve_file, state, entanglement)
    entanglement.on_sync_get_database = partial(on_get_database, state, entanglement)
    entanglement.sync_get_database = partial(get_database, state, entanglement)
    print("Waiting 5 seconds for server to be ready.")
    time.sleep(5)
    print("Connected. Syncing...")
    while True:
        print("Issuing update of local database...")
        state["open_tasks"] = 1
        entanglement.remote_fun("sync_get_database")()
        while state["open_tasks"] > 0:
            time.sleep(1)

        print("Waiting 5 seconds before next sync round.")
        time.sleep(5)

In [None]:
#export
def run_sync():
    # Load user_data
    if "AppData" in os.environ: # Windows
        config_file = os.path.join(os.environ["AppData"], "backup_sync", "config.json")
        syncignore_path = os.path.join(os.environ["AppData"], "backup_sync", ".syncignore")
        database_path = os.path.join(os.environ["AppData"], "backup_sync", "database.json")
    else: # Linux
        config_file = os.path.join("/home", os.environ["USER"], ".backup_sync", "config.json")
        syncignore_path = os.path.join("/home", os.environ["USER"], ".backup_sync", ".syncignore")
        database_path = os.path.join("/home", os.environ["USER"], ".backup_sync", "database.json")
    if not os.path.exists(config_file):
        raise RuntimeError("Config does not exist: {}".format(config_file))
    with open(config_file, "r") as f:
        config = json.loads(f.read())

    # Load exclude patterns
    exclude_patterns=_EXCLUDE_PATTERNS
    if os.path.exists(syncignore_path):
        with open(syncignore_path, "r") as f:
            exclude_patterns = f.readlines()
        exclude_patterns = [pattern.replace("\n", "") for pattern in exclude_patterns]
        exclude_patterns = [pattern for pattern in exclude_patterns if pattern != "" and not pattern.startswith("#")]
        print("Ignore Patterns: {}".format(exclude_patterns))
    observer = Observer()
    handler = FileChangeHandler()
    handler.exclude_patterns = exclude_patterns
    handler.database_path = database_path
    handler.mappings = config["sync_to_local_folder"]
    initial_scan(handler)
    for path in paths.values():
        observer.schedule(handler, path=path, recursive=True)
    observer.start()

    print("Connecting...")
    # 1. Try connecting to all known hosts
    clients = []
    for hosts in config["known_hosts"]:
        clients.append(Client(host=hosts["host"], port=hosts["port"], password=hosts["password"], user=hosts["user"], callback=on_entangle, blocking=False))
    # 2. Start own server
    listen(host=config["host"], port=config["port"], callback=on_entangle, users=config["users"])
    
    observer.stop()

    observer.join()

## Tests

With all implemented it is time to test the implementations.
First check what is in the database.

In [None]:
#export
if __name__ == "__main__":
    run_sync()