diff --git a/snap/hooks/configure b/snap/hooks/configure index f5f407e..ed03702 100755 --- a/snap/hooks/configure +++ b/snap/hooks/configure @@ -19,7 +19,4 @@ fi snapctl restart immich-distribution.postgres snapctl restart immich-distribution.typesense snapctl restart immich-distribution.haproxy - -snapctl restart immich-distribution.sync-removed-fs -snapctl restart immich-distribution.sync-created-fs -snapctl restart immich-distribution.sync-database-watch +snapctl restart immich-distribution.sync-service diff --git a/snap/snapcraft.yaml b/snap/snapcraft.yaml index 5c0bc28..5a05394 100644 --- a/snap/snapcraft.yaml +++ b/snap/snapcraft.yaml @@ -132,26 +132,8 @@ apps: plugs: - network - sync-removed-fs: - command: bin/sync-removed-watch - daemon: simple - restart-delay: 10s - after: - - immich-server - plugs: - - network - - sync-created-fs: - command: bin/sync-created-watch - daemon: simple - restart-delay: 10s - after: - - immich-server - plugs: - - network - - sync-database-watch: - command: bin/sync-database-watch + sync-service: + command: bin/sync-service.sh daemon: simple restart-delay: 10s after: @@ -373,6 +355,13 @@ parts: - jq - xz-utils + sync: + plugin: nil + override-build: + ../../python/install/usr/local/bin/python3 ../../python/install/usr/local/bin/pip3 install watchfiles requests psycopg2-binary --target $SNAPCRAFT_PART_INSTALL/opt/pipsyncenv + after: + - python + patches: source: patches plugin: dump diff --git a/src/bin/load-env b/src/bin/load-env index 04d025f..1388ea5 100755 --- a/src/bin/load-env +++ b/src/bin/load-env @@ -10,6 +10,8 @@ snapctl() { export LD_LIBRARY_PATH="$SNAP/usr/local/pgsql/lib:$SNAP/usr/lib/x86_64-linux-gnu/pulseaudio:$SNAP/usr/lib/x86_64-linux-gnu/libfswatch:$LD_LIBRARY_PATH" +export PGDATA="$SNAP_COMMON/pgsql/data" + export LC_ALL="C" export LC_CTYPE="C" @@ -61,33 +63,14 @@ immich_server_ready() { curl -s "$IMMICH_SERVER_URL/server-info/ping" -o /dev/null } -block_immich_server_ready() { - while ! immich_server_ready; do - sleep 2 - done -} - immich_api() { curl -s "$IMMICH_SERVER_URL/$2" -H 'Accept: application/json' -H "x-api-key: $1" } -immich_api_delete() { - curl -s "$IMMICH_SERVER_URL/$2" -X DELETE -H "Accept: application/json" \ - -H "Content-Type: application/json" -H "x-api-key: $1" \ - -d "$3" -} - user_id() { immich_api "$1" user/me | jq -r .id } -immich_delete_asset() { - local key="$1" - local asset_id="$2" - - immich_api_delete "$key" "asset" '{"ids": ["'$asset_id'"]}' -} - root_check() { if [ x$UID != x0 ]; then echo "You need to run this command as root (with sudo)" @@ -104,3 +87,15 @@ banner() { lego_staging_string() { [ x$ACME_STAGING == xtrue ] && echo "--server=https://acme-staging-v02.api.letsencrypt.org/directory" } + +sync_enabled() { + snapctl get sync-enabled | grep -q "true" +} + +query_db() { + if [ -z $SNAP ]; then + cat - | sudo immich-distribution.psql -d immich $@ + else + cat - | $SNAP/bin/psql -d immich $@ + fi +} diff --git a/src/bin/sync b/src/bin/sync deleted file mode 100755 index bcb5057..0000000 --- a/src/bin/sync +++ /dev/null @@ -1,108 +0,0 @@ -#!/bin/bash - -if [ -z $SNAP ]; then - . src/bin/load-env -else - . $SNAP/bin/load-env -fi - -export PGDATA="$SNAP_COMMON/pgsql/data" - -query_db() { - if [ -z $SNAP ]; then - cat - | sudo immich-distribution.psql -d immich $@ - else - cat - | $SNAP/bin/psql -d immich $@ - fi -} - -last_removed_file_by_user_from_immich_db() { - echo " - SELECT assets_filesync_lookup.asset_path, assets_delete_audits.asset_id - FROM assets_delete_audits - INNER JOIN assets_filesync_lookup - ON assets_delete_audits.checksum = assets_filesync_lookup.checksum - WHERE assets_filesync_lookup.user_id = '$1' - AND assets_delete_audits.file_removed = 'false' - ORDER BY changed_on desc - LIMIT 1 - " | query_db -At -} - -mark_file_as_removed_in_immich_db() { - local asset="$1" - - echo " - UPDATE assets_delete_audits - SET file_removed = 'true' - WHERE asset_id = '$asset' - " | query_db -} - -save_hash_to_db() { - local user="$1" - local path="$2" - local hash="$(echo $3 | tr -cd '[:digit:]a-f')" - - echo " - INSERT INTO - assets_filesync_lookup(user_id, asset_path, checksum) - VALUES('$user','$path',E'\\\\x$hash') - ON CONFLICT (user_id, asset_path) DO - UPDATE SET checksum = E'\\\\x$hash' - WHERE assets_filesync_lookup.asset_path = '$path' - AND assets_filesync_lookup.user_id = '$user'; - " | query_db -} - -get_asset_by_path_from_db() { - local user="$1" - local path="$2" - - echo " - SELECT assets.id FROM assets - INNER JOIN assets_filesync_lookup - ON assets.checksum = assets_filesync_lookup.checksum - WHERE assets_filesync_lookup.asset_path = '$path' - AND assets_filesync_lookup.user_id = '$user' - " | query_db -At -} - -extract_user_id_from_path() { - echo "$1" | sed 's,.*/sync/,,' | sed 's,/.*,,' -} - -extract_relative_path() { - echo "$1" | sed 's,.*/sync/[^/]*/,,' -} - -upload() { - local key="$1" - shift - $SNAP/bin/immich-cli upload --key "$key" --yes "$@" -} - -get_keys() { - snapctl get sync -} - -sync_enabled() { - snapctl get sync-enabled | grep -q "true" -} - -block_sync_enabled() { - while ! sync_enabled; do - sleep 3600 - done - - echo "Starting service..." -} - -ignore_path() { - local name="$(basename "$1")" - - [[ "${name:0:1}" == "." ]] && return 0 - [ -d "$1" ] && return 0 - - return 1 -} diff --git a/src/bin/sync-created-watch b/src/bin/sync-created-watch deleted file mode 100755 index 68396b2..0000000 --- a/src/bin/sync-created-watch +++ /dev/null @@ -1,51 +0,0 @@ -#!/bin/bash - -if [ -z $SNAP ]; then - . src/bin/sync -else - . $SNAP/bin/sync -fi - -block_sync_enabled -block_immich_server_ready - -watch_created() { - cat - | while read -r changed_path; do - if ! ignore_path "$changed_path"; then - - init_file_size="$(stat --printf="%s" "$changed_path")" - while true; do - sleep 1 - current_file_size="$(stat --printf="%s" "$changed_path")" - if [ "$init_file_size" -eq "$current_file_size" ]; then - break - else - init_file_size="$current_file_size" - fi - done - - file_hash="$(sha1sum "$changed_path")" - file_hash="${file_hash% *}" - user_id="$(extract_user_id_from_path "$changed_path")" - path="$(extract_relative_path "$changed_path")" - - save_hash_to_db "$user_id" "$path" "$file_hash" - upload "$KEY" "$changed_path" - fi - done -} - -for KEY in $(get_keys); do - USER_ID="$(user_id $KEY)" - - if [ ! -z "$USER_ID" ]; then - USER_PATH="$SNAP_COMMON/sync/$USER_ID" - mkdir -p "$USER_PATH" - - { - echo "Watch for created events in $USER_PATH" - fswatch --recursive --event Created --event Updated "$USER_PATH" | watch_created - } & - fi -done -wait diff --git a/src/bin/sync-database-watch b/src/bin/sync-database-watch deleted file mode 100755 index 93936d4..0000000 --- a/src/bin/sync-database-watch +++ /dev/null @@ -1,53 +0,0 @@ -#!/bin/bash - -if [ -z $SNAP ]; then - . src/bin/sync -else - . $SNAP/bin/sync -fi - -block_sync_enabled -block_immich_server_ready - -if [ ! -z $SNAP ]; then - while ! $SNAP/usr/local/pgsql/bin/pg_isready 2>&1 > /dev/null; do - echo "Postgres not ready yet, waiting ..." - sleep 2 - done - - cat $SNAP/etc/modify-db.sql | query_db -fi - -for KEY in $(get_keys); do - { - USER_ID="$(user_id $KEY)" - - if [ ! -z "$USER_ID" ]; then - USER_PATH="$SNAP_COMMON/sync/$USER_ID" - mkdir -p "$USER_PATH" - - echo "Watching Immich database for deleted images ..." - while sleep 10; do - record="$(last_removed_file_by_user_from_immich_db "$USER_ID")" - - if [ -z "$record" ]; then - continue - fi - - file_path="$(echo "$record" | cut -d '|' -f 1)" - asset_id="$(echo "$record" | cut -d '|' -f 2)" - - if [ -e "$USER_PATH/$file_path" ]; then - echo "Deleting $USER_PATH/$file_path" - if rm "$USER_PATH/$file_path"; then - mark_file_as_removed_in_immich_db "$asset_id" - fi - else - echo "File $USER_PATH/$file_path already deleted. Set asset $asset_id as removed in database." - mark_file_as_removed_in_immich_db "$asset_id" - fi - done - fi - } & -done -wait diff --git a/src/bin/sync-import-watch b/src/bin/sync-import-watch index e58b19e..4a0f1c5 100755 --- a/src/bin/sync-import-watch +++ b/src/bin/sync-import-watch @@ -1,24 +1,17 @@ #!/bin/bash -if [ -z $SNAP ]; then - . src/bin/sync -else - . $SNAP/bin/sync -fi - -if [ -z $SNAP ]; then - echo "Abort: Only run this inside the snap" - exit 0 -fi +. $SNAP/bin/load-env if ! sync_enabled; then echo "Folder sync disabled" exit 0 fi -block_immich_server_ready +while ! immich_server_ready; do + sleep 2 +done -for KEY in $(get_keys); do +for KEY in $(snapctl get sync); do USER_ID="$(user_id $KEY)" if [ ! -z "$USER_ID" ]; then @@ -26,6 +19,6 @@ for KEY in $(get_keys); do mkdir -p "$USER_PATH" echo "Full import of $USER_PATH for user $USER_ID" - upload "$KEY" --recursive "$USER_PATH" + $SNAP/bin/immich-cli upload --recursive --key "$KEY" --yes "$USER_PATH" fi done diff --git a/src/bin/sync-removed-watch b/src/bin/sync-removed-watch deleted file mode 100755 index a0c7b77..0000000 --- a/src/bin/sync-removed-watch +++ /dev/null @@ -1,53 +0,0 @@ -#!/bin/bash - -if [ -z $SNAP ]; then - . src/bin/sync -else - . $SNAP/bin/sync -fi - -block_sync_enabled -block_immich_server_ready - - -watch_removed() { - cat - | while read -r removed_path; do - if ! ignore_path "$removed_path"; then - user_id="$(extract_user_id_from_path "$removed_path")" - path="$(extract_relative_path "$removed_path")" - asset_id="$(get_asset_by_path_from_db "$user_id" "$path")" - - if [ -z "$asset_id" ]; then - echo "ERROR: Asset not found in database (user $user_id, path $path)" - continue - fi - - if ! immich_api "$KEY" "asset/assetById/$asset_id" | jq -e .id > /dev/null; then - echo "Asset $asset_id not found in Immich (user $user_id, path $path)" - continue - fi - - status="$(immich_delete_asset "$KEY" "$asset_id" | jq -r .[].status)" - if [ "x$status" == "xSUCCESS" ]; then - echo "Asset $asset_id removed from Immich (user $user_id, path $path)" - else - echo "ERROR: Asset $asset_id not removed from Immich (user $user_id, path $path)" - fi - fi - done -} - -for KEY in $(get_keys); do - USER_ID="$(user_id $KEY)" - - if [ ! -z "$USER_ID" ]; then - USER_PATH="$SNAP_COMMON/sync/$USER_ID" - mkdir -p "$USER_PATH" - - { - echo "Watch for removed events in $USER_PATH" - fswatch --recursive --event Removed "$USER_PATH" | watch_removed - } & - fi -done -wait diff --git a/src/bin/sync-service.py b/src/bin/sync-service.py new file mode 100644 index 0000000..2fde65c --- /dev/null +++ b/src/bin/sync-service.py @@ -0,0 +1,241 @@ +import os +import hashlib +import subprocess +import requests +import threading +import time +import signal + +from watchfiles import watch, Change + +import psycopg2 +import psycopg2.extras + +def log(msg: str): + print(msg, flush=True) + +class ImmichDatabase: + def __init__(self, host: str, database: str, user: str, password: str, port: int): + self.conn = psycopg2.connect(host=host, database=database, user=user, password=password, port=port) + self.conn.set_client_encoding('UTF8') + + def last_removed_asset(self, user_id: str) -> list[psycopg2.extras.RealDictRow]: + with self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: + cur.execute(""" + SELECT + assets_filesync_lookup.asset_path, + assets_delete_audits.asset_id + FROM assets_delete_audits + INNER JOIN assets_filesync_lookup + ON assets_delete_audits.checksum = assets_filesync_lookup.checksum + AND assets_delete_audits.user_id = assets_filesync_lookup.user_id + WHERE assets_filesync_lookup.user_id = %s + AND assets_delete_audits.file_removed = 'false' + ORDER BY changed_on desc + LIMIT 1 + """, (user_id,)) + + return cur.fetchall() + + def set_asset_removed(self, asset_id: str) -> None: + with self.conn.cursor() as cur: + cur.execute(""" + UPDATE assets_delete_audits + SET file_removed = 'true' + WHERE asset_id = %s + """, (asset_id,)) + self.conn.commit() + + def save_hash(self, user_id: str, asset_path: str, checksum: bytes) -> None: + with self.conn.cursor() as cur: + cur.execute(""" + INSERT INTO + assets_filesync_lookup(user_id, asset_path, checksum) + VALUES(%s, %s, %s) + ON CONFLICT (user_id, asset_path) DO + UPDATE SET checksum = %s + WHERE assets_filesync_lookup.asset_path = %s + AND assets_filesync_lookup.user_id = %s; + """, + (user_id, asset_path, checksum, + checksum, asset_path, user_id)) + self.conn.commit() + + def get_asset_id_by_path(self, user_id: str, asset_path: str) -> psycopg2.extras.RealDictRow | None: + with self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: + cur.execute(""" + SELECT assets.id + FROM assets + INNER JOIN assets_filesync_lookup + ON assets.checksum = assets_filesync_lookup.checksum + WHERE assets_filesync_lookup.asset_path = %s + AND assets_filesync_lookup.user_id = %s + """, (asset_path, user_id)) + return cur.fetchone() + + def close(self): + self.conn.commit() + self.conn.close() + +class ImmichAPI: + def __init__(self, host: str, api_key: str): + self.host = host + self.headers = { + "Accept": "application/json", + "Content-Type": "application/json", + "x-api-key": api_key + } + + def get_user_id(self) -> str: + r = requests.get(f"{self.host}/user/me", headers=self.headers) + return r.json()["id"] + + def delete_asset(self, asset_id: str) -> dict: + data = { "ids": [ asset_id ] } + r = requests.delete(f"{self.host}/asset", headers=self.headers, json=data) + return r.json() + +def hash_file(path: str) -> bytes: + file_hash = hashlib.sha1() + with open(path, "rb") as f: + fb = f.read(2048) + while len(fb) > 0: + file_hash.update(fb) + fb = f.read(2048) + return file_hash.digest() + +def ignored_paths(path: str) -> bool: + if os.path.basename(path).startswith("."): + return True + + if os.path.isdir(path): + return True + + return False + +def hash_all_files(db: ImmichDatabase, user_id: str, path: str) -> None: + for root, _, files in os.walk(path): + for file in files: + file_path = os.path.join(root, file) + relative_path = os.path.relpath(file_path, path) + db.save_hash(user_id, relative_path, hash_file(file_path)) + log(f"Hash {file_path} and store in database") + +def import_asset(db: ImmichDatabase, api: ImmichAPI, key: str, base_path: str, asset_path: str) -> None: + snap_path = os.getenv("SNAP") + relative_path = os.path.relpath(asset_path, base_path) + import_command = [ + f"{snap_path}/bin/immich-cli", "upload", + "--server", os.getenv("IMMICH_SERVER_ADDRESS"), + "--key", key, + "--yes", + asset_path + ] + + if snap_path: + result = subprocess.run(import_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + else: + result = subprocess.CompletedProcess([], 0) + log(f"MOC: {import_command}") + + if result and result.returncode != 0: + log(f"Error: Failed to import {asset_path}") + log(f"CLI (stdout): {result.stdout.decode('utf-8')}") + log(f"CLI (stderr): {result.stderr.decode('utf-8')}") + else: + checksum = hash_file(asset_path) + user_id = api.get_user_id() + db.save_hash(user_id, relative_path, checksum) + log(f"Hash {relative_path} and store in database for user {user_id})") + +def delete_asset(db: ImmichDatabase, api: ImmichAPI, asset_path: str, base_path: str) -> None: + relative_path = os.path.relpath(asset_path, base_path) + user_id = api.get_user_id() + asset = db.get_asset_id_by_path(user_id, relative_path) + if asset: + log(f"Asset {asset['id']} removed from database") + api.delete_asset(asset["id"]) + else: + log(f"Asset {relative_path} not found in database") + +def file_watcher(event: threading.Event, db: ImmichDatabase, api: ImmichAPI, api_key: str, user_path: str) -> None: + log("File watcher thread running...") + for changes in watch(user_path, recursive=True, stop_event=event): + for c_type, c_path in changes: + + if ignored_paths(c_path): + continue + + if c_type == Change.added: + log(f"{c_path} added, import asset to Immich") + import_asset(db, api, api_key, user_path, c_path) + elif c_type == Change.modified: + log(f"{c_path} modified, re-import asset to Immich") + import_asset(db, api, api_key, user_path, c_path) + elif c_type == Change.deleted: + log(f"{c_path} deleted, mark asset as removed") + delete_asset(db, api, c_path, user_path) + +def database_watcher(event: threading.Event, db: ImmichDatabase, api: ImmichAPI, user_path: str) -> None: + log("Database watcher thread running...") + user_id = api.get_user_id() + while not event.is_set(): + for record in db.last_removed_asset(user_id): + asset_id = record['asset_id'] + asset_path = record['asset_path'] + full_path = f"{user_path}/{asset_path}" + if os.path.exists(full_path): + log(f"Remove asset {asset_id} user {user_id} path {asset_path}") + os.remove(full_path) + else: + log(f"Asset {asset_id} user {user_id} path {asset_path} already removed") + log(f"Mark asset {asset_id} as removed") + db.set_asset_removed(asset_id) + time.sleep(5) + +def main(): + db = ImmichDatabase( + host=os.environ["DB_HOSTNAME"], + database=os.environ["DB_DATABASE_NAME"], + user=os.environ["DB_USERNAME"], + password=os.environ["DB_PASSWORD"], + port=5432 + ) + + api_key = os.environ["IMMICH_API_KEY"] + immich = ImmichAPI(os.environ["IMMICH_SERVER_URL"], api_key) + snap_common = os.environ["SNAP_COMMON"] + user_id = immich.get_user_id() + user_path = f"{snap_common}/sync/{user_id}" + + log(f"Starting sync for user {user_id} at {user_path}") + + log(f"Initial file hash import of all files in {user_path}") + hash_all_files(db, user_id, user_path) + + stop_event = threading.Event() + + watch_thread = threading.Thread( + target=file_watcher, + args=(stop_event, db, immich, api_key, user_path) + ) + + database_thread = threading.Thread( + target=database_watcher, + args=(stop_event, db, immich, user_path) + ) + + watch_thread.start() + database_thread.start() + + signal.signal(signal.SIGTERM, lambda signum, frame: stop_event.set()) + + while True: + if not watch_thread.is_alive(): + log("Critical: Thread watch is not alive") + if not database_thread.is_alive(): + log("Critical: Thread database is not alive") + time.sleep(10) + +if __name__ == '__main__': + main() diff --git a/src/bin/sync-service.sh b/src/bin/sync-service.sh new file mode 100755 index 0000000..7867f57 --- /dev/null +++ b/src/bin/sync-service.sh @@ -0,0 +1,30 @@ +#!/bin/bash + +. $SNAP/bin/load-env + +export PGDATA="$SNAP_COMMON/pgsql/data" +export PYTHONPATH="$SNAP/opt/pipsyncenv" + +while ! sync_enabled; do + sleep 3600 +done + +while ! immich_server_ready; do + echo "Immich server not ready yet, waiting ..." + sleep 10 +done + +while ! $SNAP/usr/local/pgsql/bin/pg_isready 2>&1 > /dev/null; do + echo "Postgres not ready yet, waiting ..." + sleep 2 +done + +cat $SNAP/etc/modify-db.sql | $SNAP/bin/psql -d immich + +for KEY in $(snapctl get sync); do + { + IMMICH_API_KEY="$KEY" $SNAP/usr/local/bin/python3 $SNAP/bin/sync-service.py + } & +done + +wait diff --git a/src/etc/modify-db.sql b/src/etc/modify-db.sql index 3b5578a..4aa45bb 100755 --- a/src/etc/modify-db.sql +++ b/src/etc/modify-db.sql @@ -1,3 +1,4 @@ +-- A audit lof of deleted assets CREATE TABLE IF NOT EXISTS assets_delete_audits ( id INT GENERATED ALWAYS AS IDENTITY, asset_id UUID NOT NULL, @@ -7,6 +8,7 @@ CREATE TABLE IF NOT EXISTS assets_delete_audits ( changed_on TIMESTAMP(6) NOT NULL ); +-- A lookup table for assets that are imported from the file system via sync feature CREATE TABLE IF NOT EXISTS assets_filesync_lookup ( id INT GENERATED ALWAYS AS IDENTITY, user_id VARCHAR(256) NULL, @@ -15,6 +17,7 @@ CREATE TABLE IF NOT EXISTS assets_filesync_lookup ( UNIQUE(user_id, asset_path) ); +-- Function that logs the deletion of assets CREATE OR REPLACE FUNCTION log_assets_delete_audits() RETURNS TRIGGER LANGUAGE PLPGSQL @@ -27,7 +30,12 @@ BEGIN END; $$; +-- Trigger that calls the function above on deletetion of assets CREATE OR REPLACE TRIGGER trigger_assets_delete_audits BEFORE DELETE ON assets FOR EACH ROW EXECUTE PROCEDURE log_assets_delete_audits(); + +-- Clean up deprecated work queue table +-- Remove this later when all installations have updated +DROP table IF EXISTS sync_work_queue;