From 8885ea91cbfd442f4939f7401ab3b670ab0ced00 Mon Sep 17 00:00:00 2001 From: Stefan Berggren Date: Thu, 8 Jun 2023 20:27:02 +0200 Subject: [PATCH 01/10] Fix hash calc for filenames with whitespace --- src/bin/sync-created-watch | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/bin/sync-created-watch b/src/bin/sync-created-watch index 68396b2..9a8abdf 100755 --- a/src/bin/sync-created-watch +++ b/src/bin/sync-created-watch @@ -25,7 +25,7 @@ watch_created() { done file_hash="$(sha1sum "$changed_path")" - file_hash="${file_hash% *}" + file_hash="${file_hash%% *}" user_id="$(extract_user_id_from_path "$changed_path")" path="$(extract_relative_path "$changed_path")" From 05302d337552690c400f6bdb12d0462f0e43ad08 Mon Sep 17 00:00:00 2001 From: Stefan Berggren Date: Thu, 8 Jun 2023 20:27:48 +0200 Subject: [PATCH 02/10] Hash and import existing files to database --- src/bin/sync-database-watch | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/src/bin/sync-database-watch b/src/bin/sync-database-watch index 93936d4..2abe825 100755 --- a/src/bin/sync-database-watch +++ b/src/bin/sync-database-watch @@ -26,6 +26,21 @@ for KEY in $(get_keys); do USER_PATH="$SNAP_COMMON/sync/$USER_ID" mkdir -p "$USER_PATH" + { + echo "Initial import of $USER_PATH ..." + find "$USER_PATH" -type f | while read -r file_path; do + if ! ignore_path "$file_path"; then + file_hash="$(sha1sum "$file_path")" + file_hash="${file_hash%% *}" + user_id="$(extract_user_id_from_path "$file_path")" + path="$(extract_relative_path "$file_path")" + + echo "Import: $path (user: $user_id, hash: $file_hash)" + save_hash_to_db "$user_id" "$path" "$file_hash" + fi + done + } + echo "Watching Immich database for deleted images ..." 
while sleep 10; do record="$(last_removed_file_by_user_from_immich_db "$USER_ID")" From aadfde198766cf3e686a69e250d5ff1617f05d9b Mon Sep 17 00:00:00 2001 From: Stefan Berggren Date: Fri, 9 Jun 2023 16:29:29 +0200 Subject: [PATCH 03/10] Only fetch files for the current user --- src/bin/sync | 1 + 1 file changed, 1 insertion(+) diff --git a/src/bin/sync b/src/bin/sync index bcb5057..5851fc3 100755 --- a/src/bin/sync +++ b/src/bin/sync @@ -22,6 +22,7 @@ last_removed_file_by_user_from_immich_db() { FROM assets_delete_audits INNER JOIN assets_filesync_lookup ON assets_delete_audits.checksum = assets_filesync_lookup.checksum + AND assets_delete_audits.user_id = assets_filesync_lookup.user_id WHERE assets_filesync_lookup.user_id = '$1' AND assets_delete_audits.file_removed = 'false' ORDER BY changed_on desc From 8bcc09ea38537f54316ae8c03ae061f59c1a763f Mon Sep 17 00:00:00 2001 From: Stefan Berggren Date: Sun, 11 Jun 2023 18:18:16 +0200 Subject: [PATCH 04/10] WIP This more or less works but the need of this hack shows that I should probably have chosen another path. 
--- src/bin/sync | 32 ++++++++++++++++++++++++++++++++ src/bin/sync-created-watch | 9 +++++++-- src/bin/sync-database-watch | 28 +++++++++++++++++++++++++++- src/etc/modify-db.sql | 14 ++++++++++++++ 4 files changed, 80 insertions(+), 3 deletions(-) diff --git a/src/bin/sync b/src/bin/sync index 5851fc3..6a026f8 100755 --- a/src/bin/sync +++ b/src/bin/sync @@ -40,6 +40,38 @@ mark_file_as_removed_in_immich_db() { " | query_db } +add_work_queue() { + local identifier="$1" + local user_id="$2" + local payload="$3" + + echo " + INSERT INTO sync_work_queue(identifier, user_id, payload, changed_on) + VALUES('$identifier', '$user_id', '$payload', NOW()) + " | query_db +} + +last_item_from_work_queue() { + echo " + SELECT id, identifier, payload + FROM sync_work_queue + WHERE completed = 'false' + AND user_id = '$1' + ORDER BY changed_on desc + LIMIT 1 + " | query_db -At +} + +mark_work_queue_item_as_done() { + local id="$1" + + echo " + UPDATE sync_work_queue + SET completed = 'true' + WHERE id = '$id' + " | query_db +} + save_hash_to_db() { local user="$1" local path="$2" diff --git a/src/bin/sync-created-watch b/src/bin/sync-created-watch index 9a8abdf..aeac79f 100755 --- a/src/bin/sync-created-watch +++ b/src/bin/sync-created-watch @@ -29,8 +29,13 @@ watch_created() { user_id="$(extract_user_id_from_path "$changed_path")" path="$(extract_relative_path "$changed_path")" - save_hash_to_db "$user_id" "$path" "$file_hash" - upload "$KEY" "$changed_path" + payload=$(jq --null-input \ + --arg path "$path" \ + --arg file_hash "$file_hash" \ + '{"path": $path, "file_hash": $file_hash}' + ) + + add_work_queue "add-file" "$user_id" "$payload" fi done } diff --git a/src/bin/sync-database-watch b/src/bin/sync-database-watch index 2abe825..b70d6c1 100755 --- a/src/bin/sync-database-watch +++ b/src/bin/sync-database-watch @@ -41,8 +41,11 @@ for KEY in $(get_keys); do done } - echo "Watching Immich database for deleted images ..." + echo "Watching Immich database for changes ..." 
while sleep 10; do + + # Look in the database for deleted assets, remove the file from the + # filesystem and mark the asset as removed in the database record="$(last_removed_file_by_user_from_immich_db "$USER_ID")" if [ -z "$record" ]; then continue fi file_path="$(echo "$record" | cut -d '|' -f 1)" asset_id="$(echo "$record" | cut -d '|' -f 2)" if [ -e "$USER_PATH/$file_path" ]; then echo "Deleting $USER_PATH/$file_path" if rm "$USER_PATH/$file_path"; then mark_file_as_removed_in_immich_db "$asset_id" fi else echo "File $USER_PATH/$file_path already deleted. Set asset $asset_id as removed in database." mark_file_as_removed_in_immich_db "$asset_id" fi + + # Look in the database for new tasks in the work queue + record="$(last_item_from_work_queue "$USER_ID")" + + if [ -z "$record" ]; then + continue + fi + + work_queue_id="$(echo "$record" | cut -d '|' -f 1)" + work_queue_identifier="$(echo "$record" | cut -d '|' -f 2)" + work_queue_payload="$(echo "$record" | cut -d '|' -f 3)" + + if [ "$work_queue_identifier" == "add-file" ]; then + # Process newly added files to the sync folder + + file_path="$(echo "$work_queue_payload" | jq -r '.path')" + file_hash="$(echo "$work_queue_payload" | jq -r '.file_hash')" + + save_hash_to_db "$USER_ID" "$file_path" "$file_hash" + upload "$KEY" "$file_path" + mark_work_queue_item_as_done "$work_queue_id" + fi + done fi } & diff --git a/src/etc/modify-db.sql b/src/etc/modify-db.sql index 3b5578a..c90d49c 100755 --- a/src/etc/modify-db.sql +++ b/src/etc/modify-db.sql @@ -1,3 +1,4 @@ +-- An audit log of deleted assets CREATE TABLE IF NOT EXISTS assets_delete_audits ( id INT GENERATED ALWAYS AS IDENTITY, asset_id UUID NOT NULL, @@ -7,6 +8,7 @@ CREATE TABLE IF NOT EXISTS assets_delete_audits ( changed_on TIMESTAMP(6) NOT NULL ); +-- A lookup table for assets that are imported from the file system via sync feature CREATE TABLE IF NOT EXISTS assets_filesync_lookup ( id INT GENERATED ALWAYS AS IDENTITY, user_id VARCHAR(256) NULL, @@ -15,6 +17,17 @@ CREATE TABLE IF NOT EXISTS assets_filesync_lookup ( UNIQUE(user_id, asset_path) ); +-- A queue used by the sync feature to import assets from the file system +CREATE TABLE IF NOT EXISTS sync_work_queue ( + id INT 
GENERATED ALWAYS AS IDENTITY, + user_id VARCHAR(256) NULL, + identifier VARCHAR(256) NULL, + payload TEXT NOT NULL, + completed BOOLEAN DEFAULT FALSE, + changed_on TIMESTAMP(6) NOT NULL +); + +-- Function that logs the deletion of assets CREATE OR REPLACE FUNCTION log_assets_delete_audits() RETURNS TRIGGER LANGUAGE PLPGSQL @@ -27,6 +40,7 @@ BEGIN END; $$; +-- Trigger that calls the function above on deletion of assets CREATE OR REPLACE TRIGGER trigger_assets_delete_audits BEFORE DELETE ON assets FOR EACH ROW From 05924ac1273fba05375f2c4aed4993196bbf9fe2 Mon Sep 17 00:00:00 2001 From: Stefan Berggren Date: Sat, 17 Jun 2023 11:02:49 +0200 Subject: [PATCH 05/10] Sync service in pure python --- snap/hooks/configure | 5 +- snap/snapcraft.yaml | 29 ++--- src/bin/load-env | 33 +++--- src/bin/sync | 141 ---------------------- src/bin/sync-created-watch | 56 --------- src/bin/sync-database-watch | 94 --------------- src/bin/sync-import-watch | 21 ++-- src/bin/sync-removed-watch | 53 --------- src/bin/sync-service.py | 231 ++++++++++++++++++++++++++++++ src/bin/sync-service.sh | 30 +++++ src/etc/modify-db.sql | 14 +-- 11 files changed, 299 insertions(+), 408 deletions(-) delete mode 100755 src/bin/sync delete mode 100755 src/bin/sync-created-watch delete mode 100755 src/bin/sync-database-watch delete mode 100755 src/bin/sync-removed-watch create mode 100644 src/bin/sync-service.py create mode 100755 src/bin/sync-service.sh diff --git a/snap/hooks/configure b/snap/hooks/configure index f5f407e..ed03702 100755 --- a/snap/hooks/configure +++ b/snap/hooks/configure @@ -19,7 +19,4 @@ fi snapctl restart immich-distribution.postgres snapctl restart immich-distribution.typesense snapctl restart immich-distribution.haproxy - -snapctl restart immich-distribution.sync-removed-fs -snapctl restart immich-distribution.sync-created-fs -snapctl restart immich-distribution.sync-database-watch +snapctl restart immich-distribution.sync-service diff --git a/snap/snapcraft.yaml 
b/snap/snapcraft.yaml index 5c0bc28..5a05394 100644 --- a/snap/snapcraft.yaml +++ b/snap/snapcraft.yaml @@ -132,26 +132,8 @@ apps: plugs: - network - sync-removed-fs: - command: bin/sync-removed-watch - daemon: simple - restart-delay: 10s - after: - - immich-server - plugs: - - network - - sync-created-fs: - command: bin/sync-created-watch - daemon: simple - restart-delay: 10s - after: - - immich-server - plugs: - - network - - sync-database-watch: - command: bin/sync-database-watch + sync-service: + command: bin/sync-service.sh daemon: simple restart-delay: 10s after: @@ -373,6 +355,13 @@ parts: - jq - xz-utils + sync: + plugin: nil + override-build: + ../../python/install/usr/local/bin/python3 ../../python/install/usr/local/bin/pip3 install watchfiles requests psycopg2-binary --target $SNAPCRAFT_PART_INSTALL/opt/pipsyncenv + after: + - python + patches: source: patches plugin: dump diff --git a/src/bin/load-env b/src/bin/load-env index 04d025f..1388ea5 100755 --- a/src/bin/load-env +++ b/src/bin/load-env @@ -10,6 +10,8 @@ snapctl() { export LD_LIBRARY_PATH="$SNAP/usr/local/pgsql/lib:$SNAP/usr/lib/x86_64-linux-gnu/pulseaudio:$SNAP/usr/lib/x86_64-linux-gnu/libfswatch:$LD_LIBRARY_PATH" +export PGDATA="$SNAP_COMMON/pgsql/data" + export LC_ALL="C" export LC_CTYPE="C" @@ -61,33 +63,14 @@ immich_server_ready() { curl -s "$IMMICH_SERVER_URL/server-info/ping" -o /dev/null } -block_immich_server_ready() { - while ! 
immich_server_ready; do - sleep 2 - done -} - immich_api() { curl -s "$IMMICH_SERVER_URL/$2" -H 'Accept: application/json' -H "x-api-key: $1" } -immich_api_delete() { - curl -s "$IMMICH_SERVER_URL/$2" -X DELETE -H "Accept: application/json" \ - -H "Content-Type: application/json" -H "x-api-key: $1" \ - -d "$3" -} - user_id() { immich_api "$1" user/me | jq -r .id } -immich_delete_asset() { - local key="$1" - local asset_id="$2" - - immich_api_delete "$key" "asset" '{"ids": ["'$asset_id'"]}' -} - root_check() { if [ x$UID != x0 ]; then echo "You need to run this command as root (with sudo)" @@ -104,3 +87,15 @@ banner() { lego_staging_string() { [ x$ACME_STAGING == xtrue ] && echo "--server=https://acme-staging-v02.api.letsencrypt.org/directory" } + +sync_enabled() { + snapctl get sync-enabled | grep -q "true" +} + +query_db() { + if [ -z $SNAP ]; then + cat - | sudo immich-distribution.psql -d immich $@ + else + cat - | $SNAP/bin/psql -d immich $@ + fi +} diff --git a/src/bin/sync b/src/bin/sync deleted file mode 100755 index 6a026f8..0000000 --- a/src/bin/sync +++ /dev/null @@ -1,141 +0,0 @@ -#!/bin/bash - -if [ -z $SNAP ]; then - . src/bin/load-env -else - . 
$SNAP/bin/load-env -fi - -export PGDATA="$SNAP_COMMON/pgsql/data" - -query_db() { - if [ -z $SNAP ]; then - cat - | sudo immich-distribution.psql -d immich $@ - else - cat - | $SNAP/bin/psql -d immich $@ - fi -} - -last_removed_file_by_user_from_immich_db() { - echo " - SELECT assets_filesync_lookup.asset_path, assets_delete_audits.asset_id - FROM assets_delete_audits - INNER JOIN assets_filesync_lookup - ON assets_delete_audits.checksum = assets_filesync_lookup.checksum - AND assets_delete_audits.user_id = assets_filesync_lookup.user_id - WHERE assets_filesync_lookup.user_id = '$1' - AND assets_delete_audits.file_removed = 'false' - ORDER BY changed_on desc - LIMIT 1 - " | query_db -At -} - -mark_file_as_removed_in_immich_db() { - local asset="$1" - - echo " - UPDATE assets_delete_audits - SET file_removed = 'true' - WHERE asset_id = '$asset' - " | query_db -} - -add_work_queue() { - local identifier="$1" - local user_id="$2" - local payload="$3" - - echo " - INSERT INTO sync_work_queue(identifier, user_id, payload, changed_on) - VALUES('$identifier', '$user_id', '$payload', NOW()) - " | query_db -} - -last_item_from_work_queue() { - echo " - SELECT id, identifier, payload - FROM sync_work_queue - WHERE completed = 'false' - AND user_id = '$1' - ORDER BY changed_on desc - LIMIT 1 - " | query_db -At -} - -mark_work_queue_item_as_done() { - local id="$1" - - echo " - UPDATE sync_work_queue - SET completed = 'true' - WHERE id = '$id' - " | query_db -} - -save_hash_to_db() { - local user="$1" - local path="$2" - local hash="$(echo $3 | tr -cd '[:digit:]a-f')" - - echo " - INSERT INTO - assets_filesync_lookup(user_id, asset_path, checksum) - VALUES('$user','$path',E'\\\\x$hash') - ON CONFLICT (user_id, asset_path) DO - UPDATE SET checksum = E'\\\\x$hash' - WHERE assets_filesync_lookup.asset_path = '$path' - AND assets_filesync_lookup.user_id = '$user'; - " | query_db -} - -get_asset_by_path_from_db() { - local user="$1" - local path="$2" - - echo " - SELECT assets.id 
FROM assets - INNER JOIN assets_filesync_lookup - ON assets.checksum = assets_filesync_lookup.checksum - WHERE assets_filesync_lookup.asset_path = '$path' - AND assets_filesync_lookup.user_id = '$user' - " | query_db -At -} - -extract_user_id_from_path() { - echo "$1" | sed 's,.*/sync/,,' | sed 's,/.*,,' -} - -extract_relative_path() { - echo "$1" | sed 's,.*/sync/[^/]*/,,' -} - -upload() { - local key="$1" - shift - $SNAP/bin/immich-cli upload --key "$key" --yes "$@" -} - -get_keys() { - snapctl get sync -} - -sync_enabled() { - snapctl get sync-enabled | grep -q "true" -} - -block_sync_enabled() { - while ! sync_enabled; do - sleep 3600 - done - - echo "Starting service..." -} - -ignore_path() { - local name="$(basename "$1")" - - [[ "${name:0:1}" == "." ]] && return 0 - [ -d "$1" ] && return 0 - - return 1 -} diff --git a/src/bin/sync-created-watch b/src/bin/sync-created-watch deleted file mode 100755 index aeac79f..0000000 --- a/src/bin/sync-created-watch +++ /dev/null @@ -1,56 +0,0 @@ -#!/bin/bash - -if [ -z $SNAP ]; then - . src/bin/sync -else - . $SNAP/bin/sync -fi - -block_sync_enabled -block_immich_server_ready - -watch_created() { - cat - | while read -r changed_path; do - if ! ignore_path "$changed_path"; then - - init_file_size="$(stat --printf="%s" "$changed_path")" - while true; do - sleep 1 - current_file_size="$(stat --printf="%s" "$changed_path")" - if [ "$init_file_size" -eq "$current_file_size" ]; then - break - else - init_file_size="$current_file_size" - fi - done - - file_hash="$(sha1sum "$changed_path")" - file_hash="${file_hash%% *}" - user_id="$(extract_user_id_from_path "$changed_path")" - path="$(extract_relative_path "$changed_path")" - - payload=$(jq --null-input \ - --arg path "$path" \ - --arg file_hash "$file_hash" \ - '{"path": $path, "file_hash": $file_hash}' - ) - - add_work_queue "add-file" "$user_id" "$payload" - fi - done -} - -for KEY in $(get_keys); do - USER_ID="$(user_id $KEY)" - - if [ ! 
-z "$USER_ID" ]; then - USER_PATH="$SNAP_COMMON/sync/$USER_ID" - mkdir -p "$USER_PATH" - - { - echo "Watch for created events in $USER_PATH" - fswatch --recursive --event Created --event Updated "$USER_PATH" | watch_created - } & - fi -done -wait diff --git a/src/bin/sync-database-watch b/src/bin/sync-database-watch deleted file mode 100755 index b70d6c1..0000000 --- a/src/bin/sync-database-watch +++ /dev/null @@ -1,94 +0,0 @@ -#!/bin/bash - -if [ -z $SNAP ]; then - . src/bin/sync -else - . $SNAP/bin/sync -fi - -block_sync_enabled -block_immich_server_ready - -if [ ! -z $SNAP ]; then - while ! $SNAP/usr/local/pgsql/bin/pg_isready 2>&1 > /dev/null; do - echo "Postgres not ready yet, waiting ..." - sleep 2 - done - - cat $SNAP/etc/modify-db.sql | query_db -fi - -for KEY in $(get_keys); do - { - USER_ID="$(user_id $KEY)" - - if [ ! -z "$USER_ID" ]; then - USER_PATH="$SNAP_COMMON/sync/$USER_ID" - mkdir -p "$USER_PATH" - - { - echo "Initial import of $USER_PATH ..." - find "$USER_PATH" -type f | while read -r file_path; do - if ! ignore_path "$file_path"; then - file_hash="$(sha1sum "$file_path")" - file_hash="${file_hash%% *}" - user_id="$(extract_user_id_from_path "$file_path")" - path="$(extract_relative_path "$file_path")" - - echo "Import: $path (user: $user_id, hash: $file_hash)" - save_hash_to_db "$user_id" "$path" "$file_hash" - fi - done - } - - echo "Watching Immich database for changes ..." 
- while sleep 10; do - - # Look in the database for deleted assets, remove the file from the - # filesystem and mark the asset as removed in the database - record="$(last_removed_file_by_user_from_immich_db "$USER_ID")" - - if [ -z "$record" ]; then - continue - fi - - file_path="$(echo "$record" | cut -d '|' -f 1)" - asset_id="$(echo "$record" | cut -d '|' -f 2)" - - if [ -e "$USER_PATH/$file_path" ]; then - echo "Deleting $USER_PATH/$file_path" - if rm "$USER_PATH/$file_path"; then - mark_file_as_removed_in_immich_db "$asset_id" - fi - else - echo "File $USER_PATH/$file_path already deleted. Set asset $asset_id as removed in database." - mark_file_as_removed_in_immich_db "$asset_id" - fi - - # Look in the database for new tasks in the work queue - record="$(last_item_from_work_queue "$USER_ID")" - - if [ -z "$record" ]; then - continue - fi - - work_queue_id="$(echo "$record" | cut -d '|' -f 1)" - work_queue_identifier="$(echo "$record" | cut -d '|' -f 2)" - work_queue_payload="$(echo "$record" | cut -d '|' -f 3)" - - if [ "$work_queue_identifier" == "add-file" ]; then - # Process newly added files to the sync folder - - file_path="$(echo "$work_queue_payload" | jq -r '.path')" - file_hash="$(echo "$work_queue_payload" | jq -r '.file_hash')" - - save_hash_to_db "$USER_ID" "$file_path" "$file_hash" - upload "$KEY" "$file_path" - mark_work_queue_item_as_done "$work_queue_id" - fi - - done - fi - } & -done -wait diff --git a/src/bin/sync-import-watch b/src/bin/sync-import-watch index e58b19e..6b6d0b1 100755 --- a/src/bin/sync-import-watch +++ b/src/bin/sync-import-watch @@ -1,24 +1,23 @@ #!/bin/bash -if [ -z $SNAP ]; then - . src/bin/sync -else - . $SNAP/bin/sync -fi +. $SNAP/bin/load-env -if [ -z $SNAP ]; then - echo "Abort: Only run this inside the snap" - exit 0 -fi +upload() { + local key="$1" + shift + $SNAP/bin/immich-cli upload --key "$key" --yes "$@" +} if ! sync_enabled; then echo "Folder sync disabled" exit 0 fi -block_immich_server_ready +while ! 
immich_server_ready; do + sleep 2 +done -for KEY in $(get_keys); do +for KEY in $(snapctl get sync); do USER_ID="$(user_id $KEY)" if [ ! -z "$USER_ID" ]; then diff --git a/src/bin/sync-removed-watch b/src/bin/sync-removed-watch deleted file mode 100755 index a0c7b77..0000000 --- a/src/bin/sync-removed-watch +++ /dev/null @@ -1,53 +0,0 @@ -#!/bin/bash - -if [ -z $SNAP ]; then - . src/bin/sync -else - . $SNAP/bin/sync -fi - -block_sync_enabled -block_immich_server_ready - - -watch_removed() { - cat - | while read -r removed_path; do - if ! ignore_path "$removed_path"; then - user_id="$(extract_user_id_from_path "$removed_path")" - path="$(extract_relative_path "$removed_path")" - asset_id="$(get_asset_by_path_from_db "$user_id" "$path")" - - if [ -z "$asset_id" ]; then - echo "ERROR: Asset not found in database (user $user_id, path $path)" - continue - fi - - if ! immich_api "$KEY" "asset/assetById/$asset_id" | jq -e .id > /dev/null; then - echo "Asset $asset_id not found in Immich (user $user_id, path $path)" - continue - fi - - status="$(immich_delete_asset "$KEY" "$asset_id" | jq -r .[].status)" - if [ "x$status" == "xSUCCESS" ]; then - echo "Asset $asset_id removed from Immich (user $user_id, path $path)" - else - echo "ERROR: Asset $asset_id not removed from Immich (user $user_id, path $path)" - fi - fi - done -} - -for KEY in $(get_keys); do - USER_ID="$(user_id $KEY)" - - if [ ! 
-z "$USER_ID" ]; then - USER_PATH="$SNAP_COMMON/sync/$USER_ID" - mkdir -p "$USER_PATH" - - { - echo "Watch for removed events in $USER_PATH" - fswatch --recursive --event Removed "$USER_PATH" | watch_removed - } & - fi -done -wait diff --git a/src/bin/sync-service.py b/src/bin/sync-service.py new file mode 100644 index 0000000..e56fa85 --- /dev/null +++ b/src/bin/sync-service.py @@ -0,0 +1,231 @@ +import os +import hashlib +import subprocess +import requests +import threading +import time +import signal + +from watchfiles import watch, Change + +import psycopg2 +import psycopg2.extras + +def log(msg: str): + print(msg, flush=True) + +class ImmichDatabase: + def __init__(self, host, database, user, password, port): + self.conn = psycopg2.connect(host=host, database=database, user=user, password=password, port=port) + + def last_removed_asset(self, user_id): + with self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: + cur.execute(""" + SELECT + assets_filesync_lookup.asset_path, + assets_delete_audits.asset_id + FROM assets_delete_audits + INNER JOIN assets_filesync_lookup + ON assets_delete_audits.checksum = assets_filesync_lookup.checksum + AND assets_delete_audits.user_id = assets_filesync_lookup.user_id + WHERE assets_filesync_lookup.user_id = %s + AND assets_delete_audits.file_removed = 'false' + ORDER BY changed_on desc + LIMIT 1 + """, (user_id,)) + + return cur.fetchall() + + def set_asset_removed(self, asset_id): + with self.conn.cursor() as cur: + cur.execute(""" + UPDATE assets_delete_audits + SET file_removed = 'true' + WHERE asset_id = %s + """, (asset_id,)) + self.conn.commit() + + def save_hash(self, user_id, asset_path, checksum): + with self.conn.cursor() as cur: + cur.execute(""" + INSERT INTO + assets_filesync_lookup(user_id, asset_path, checksum) + VALUES(%s, %s, %s) + ON CONFLICT (user_id, asset_path) DO + UPDATE SET checksum = %s + WHERE assets_filesync_lookup.asset_path = %s + AND assets_filesync_lookup.user_id = %s; + """, 
+ (user_id, asset_path, checksum, + checksum, asset_path, user_id)) + self.conn.commit() + + def get_asset_id_by_path(self, user_id, asset_path): + with self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: + cur.execute(""" + SELECT assets.id + FROM assets + INNER JOIN assets_filesync_lookup + ON assets.checksum = assets_filesync_lookup.checksum + WHERE assets_filesync_lookup.asset_path = %s + AND assets_filesync_lookup.user_id = %s + """, (asset_path, user_id)) + return cur.fetchone() + + def close(self): + self.conn.commit() + self.conn.close() + +class ImmichAPI: + def __init__(self, host, api_key): + self.host = host + self.headers = { + "Accept": "application/json", + "Content-Type": "application/json", + "x-api-key": api_key + } + + def get_user_id(self): + r = requests.get(f"{self.host}/user/me", headers=self.headers) + return r.json()["id"] + + def delete_asset(self, asset_id): + data = { "ids": [ asset_id ] } + r = requests.delete(f"{self.host}/asset", headers=self.headers, json=data) + return r.json() + +def hash_file(path): + file_hash = hashlib.sha1() + with open(path, "rb") as f: + fb = f.read(2048) + while len(fb) > 0: + file_hash.update(fb) + fb = f.read(2048) + return file_hash.digest() + +def ignored_paths(path): + if os.path.basename(path).startswith("."): + return True + + if os.path.isdir(path): + return True + +def hash_all_files(db, user_id, path): + for root, _, files in os.walk(path): + for file in files: + file_path = os.path.join(root, file) + relative_path = os.path.relpath(file_path, path) + db.save_hash(user_id, relative_path, hash_file(file_path)) + log(f"Hash for {file_path} stored in database") + +def import_asset(db, api, key, base_path, asset_path): + snap_path = os.getenv("SNAP") + relative_path = os.path.relpath(asset_path, base_path) + import_command = [ + f"{snap_path}/bin/immich-cli", "upload", + "--server", os.getenv("IMMICH_SERVER_ADDRESS"), + "--key", key, + "--yes", + asset_path + ] + + if snap_path: + 
result = subprocess.run(import_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + else: + result = subprocess.CompletedProcess([], 0) + log(f"MOC: {import_command}") + + if result and result.returncode != 0: + log(f"Error: Failed to import {asset_path}") + log(f"CLI (stdout): {result.stdout.decode('utf-8')}") + log(f"CLI (stderr): {result.stderr.decode('utf-8')}") + else: + checksum = hash_file(asset_path) + user_id = api.get_user_id() + db.save_hash(user_id, relative_path, checksum) + log(f"{relative_path} hash {checksum.hex()} user {user_id})") + +def delete_asset(db, api, asset_path, base_path): + relative_path = os.path.relpath(asset_path, base_path) + user_id = api.get_user_id() + asset = db.get_asset_id_by_path(user_id, relative_path) + if asset: + print(f"Asset {asset['id']} removed from database") + api.delete_asset(asset["id"]) + else: + print(f"ERROR: Asset {relative_path} not found in database") + +def file_watcher(event, db, api, api_key, user_path): + log("File watcher thread running...") + for changes in watch(user_path, recursive=True, stop_event=event): + for c_type, c_path in changes: + + if ignored_paths(c_path): + continue + + if c_type == Change.added: + log(f"{c_path} added, import asset to Immich") + import_asset(db, api, api_key, user_path, c_path) + elif c_type == Change.modified: + log(f"{c_path} modified, re-import asset to Immich") + import_asset(db, api, api_key, user_path, c_path) + elif c_type == Change.deleted: + log(f"{c_path} deleted, mark asset as removed") + delete_asset(db, api, c_path, user_path) + +def database_watcher(event, db, api, user_path): + log("Database watcher thread running...") + user_id = api.get_user_id() + while not event.is_set(): + for record in db.last_removed_asset(user_id): + asset_id = record['asset_id'] + asset_path = record['asset_path'] + full_path = f"{user_path}/{asset_path}" + if os.path.exists(full_path): + log(f"Remove asset {asset_id} user {user_id} path {asset_path}") + 
os.remove(full_path) + else: + log(f"Asset {asset_id} user {user_id} path {asset_path} already removed") + log(f"Mark asset {asset_id} as removed") + db.set_asset_removed(asset_id) + time.sleep(5) + +def main(): + db = ImmichDatabase( + host=os.getenv("DB_HOSTNAME"), + database=os.getenv("DB_DATABASE_NAME"), + user=os.getenv("DB_USERNAME"), + password=os.getenv("DB_PASSWORD"), + port=5432 + ) + + api_key = os.getenv("IMMICH_API_KEY") + immich = ImmichAPI(os.getenv("IMMICH_SERVER_URL"), api_key) + snap_common = os.getenv("SNAP_COMMON") + user_id = immich.get_user_id() + user_path = f"{snap_common}/sync/{user_id}" + + log(f"Starting sync for user {user_id} at {user_path}") + + log(f"Initial file hash import of all files in {user_path}") + hash_all_files(db, user_id, user_path) + + stop_event = threading.Event() + + watch_thread = threading.Thread( + target=file_watcher, + args=(stop_event, db, immich, api_key, user_path) + ) + + database_thread = threading.Thread( + target=database_watcher, + args=(stop_event, db, immich, user_path) + ) + + watch_thread.start() + database_thread.start() + + signal.signal(signal.SIGTERM, lambda signum, frame: stop_event.set()) + +if __name__ == '__main__': + main() diff --git a/src/bin/sync-service.sh b/src/bin/sync-service.sh new file mode 100755 index 0000000..7867f57 --- /dev/null +++ b/src/bin/sync-service.sh @@ -0,0 +1,30 @@ +#!/bin/bash + +. $SNAP/bin/load-env + +export PGDATA="$SNAP_COMMON/pgsql/data" +export PYTHONPATH="$SNAP/opt/pipsyncenv" + +while ! sync_enabled; do + sleep 3600 +done + +while ! immich_server_ready; do + echo "Immich server not ready yet, waiting ..." + sleep 10 +done + +while ! $SNAP/usr/local/pgsql/bin/pg_isready 2>&1 > /dev/null; do + echo "Postgres not ready yet, waiting ..." 
+ sleep 2 +done + +cat $SNAP/etc/modify-db.sql | $SNAP/bin/psql -d immich + +for KEY in $(snapctl get sync); do + { + IMMICH_API_KEY="$KEY" $SNAP/usr/local/bin/python3 $SNAP/bin/sync-service.py + } & +done + +wait diff --git a/src/etc/modify-db.sql b/src/etc/modify-db.sql index c90d49c..4aa45bb 100755 --- a/src/etc/modify-db.sql +++ b/src/etc/modify-db.sql @@ -17,16 +17,6 @@ CREATE TABLE IF NOT EXISTS assets_filesync_lookup ( UNIQUE(user_id, asset_path) ); --- A queue used by the sync feature to import assets from the file system -CREATE TABLE IF NOT EXISTS sync_work_queue ( - id INT GENERATED ALWAYS AS IDENTITY, - user_id VARCHAR(256) NULL, - identifier VARCHAR(256) NULL, - payload TEXT NOT NULL, - completed BOOLEAN DEFAULT FALSE, - changed_on TIMESTAMP(6) NOT NULL -); - -- Function that logs the deletion of assets CREATE OR REPLACE FUNCTION log_assets_delete_audits() RETURNS TRIGGER @@ -45,3 +35,7 @@ CREATE OR REPLACE TRIGGER trigger_assets_delete_audits BEFORE DELETE ON assets FOR EACH ROW EXECUTE PROCEDURE log_assets_delete_audits(); + +-- Clean up deprecated work queue table +-- Remove this later when all installations have updated +DROP table IF EXISTS sync_work_queue; From c9352cc22aff9f1ace1402f36f31553dd7890ecf Mon Sep 17 00:00:00 2001 From: Stefan Berggren Date: Sat, 17 Jun 2023 12:13:53 +0200 Subject: [PATCH 06/10] Add types for better type checking --- src/bin/sync-service.py | 46 +++++++++++++++++++++-------------------- 1 file changed, 24 insertions(+), 22 deletions(-) diff --git a/src/bin/sync-service.py b/src/bin/sync-service.py index e56fa85..2fa99e7 100644 --- a/src/bin/sync-service.py +++ b/src/bin/sync-service.py @@ -15,10 +15,10 @@ def log(msg: str): print(msg, flush=True) class ImmichDatabase: - def __init__(self, host, database, user, password, port): + def __init__(self, host: str, database: str, user: str, password: str, port: int): self.conn = psycopg2.connect(host=host, database=database, user=user, password=password, port=port) - def 
last_removed_asset(self, user_id): + def last_removed_asset(self, user_id: str) -> list[psycopg2.extras.RealDictRow]: with self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: cur.execute(""" SELECT @@ -36,7 +36,7 @@ def last_removed_asset(self, user_id): return cur.fetchall() - def set_asset_removed(self, asset_id): + def set_asset_removed(self, asset_id: str) -> None: with self.conn.cursor() as cur: cur.execute(""" UPDATE assets_delete_audits @@ -45,7 +45,7 @@ def set_asset_removed(self, asset_id): """, (asset_id,)) self.conn.commit() - def save_hash(self, user_id, asset_path, checksum): + def save_hash(self, user_id: str, asset_path: str, checksum: bytes) -> None: with self.conn.cursor() as cur: cur.execute(""" INSERT INTO @@ -60,7 +60,7 @@ def save_hash(self, user_id, asset_path, checksum): checksum, asset_path, user_id)) self.conn.commit() - def get_asset_id_by_path(self, user_id, asset_path): + def get_asset_id_by_path(self, user_id: str, asset_path: str) -> psycopg2.extras.RealDictRow | None: with self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: cur.execute(""" SELECT assets.id @@ -77,7 +77,7 @@ def close(self): self.conn.close() class ImmichAPI: - def __init__(self, host, api_key): + def __init__(self, host: str, api_key: str): self.host = host self.headers = { "Accept": "application/json", @@ -85,16 +85,16 @@ def __init__(self, host, api_key): "x-api-key": api_key } - def get_user_id(self): + def get_user_id(self) -> str: r = requests.get(f"{self.host}/user/me", headers=self.headers) return r.json()["id"] - def delete_asset(self, asset_id): + def delete_asset(self, asset_id: str) -> dict: data = { "ids": [ asset_id ] } r = requests.delete(f"{self.host}/asset", headers=self.headers, json=data) return r.json() -def hash_file(path): +def hash_file(path: str) -> bytes: file_hash = hashlib.sha1() with open(path, "rb") as f: fb = f.read(2048) @@ -103,14 +103,16 @@ def hash_file(path): fb = f.read(2048) return 
file_hash.digest() -def ignored_paths(path): +def ignored_paths(path: str) -> bool: if os.path.basename(path).startswith("."): return True if os.path.isdir(path): return True + + return False -def hash_all_files(db, user_id, path): +def hash_all_files(db: ImmichDatabase, user_id: str, path: str) -> None: for root, _, files in os.walk(path): for file in files: file_path = os.path.join(root, file) @@ -118,7 +120,7 @@ def hash_all_files(db, user_id, path): db.save_hash(user_id, relative_path, hash_file(file_path)) log(f"Hash for {file_path} stored in database") -def import_asset(db, api, key, base_path, asset_path): +def import_asset(db: ImmichDatabase, api: ImmichAPI, key: str, base_path: str, asset_path: str) -> None: snap_path = os.getenv("SNAP") relative_path = os.path.relpath(asset_path, base_path) import_command = [ @@ -145,7 +147,7 @@ def import_asset(db, api, key, base_path, asset_path): db.save_hash(user_id, relative_path, checksum) log(f"{relative_path} hash {checksum.hex()} user {user_id})") -def delete_asset(db, api, asset_path, base_path): +def delete_asset(db: ImmichDatabase, api: ImmichAPI, asset_path: str, base_path: str) -> None: relative_path = os.path.relpath(asset_path, base_path) user_id = api.get_user_id() asset = db.get_asset_id_by_path(user_id, relative_path) @@ -155,7 +157,7 @@ def delete_asset(db, api, asset_path, base_path): else: print(f"ERROR: Asset {relative_path} not found in database") -def file_watcher(event, db, api, api_key, user_path): +def file_watcher(event: threading.Event, db: ImmichDatabase, api: ImmichAPI, api_key: str, user_path: str) -> None: log("File watcher thread running...") for changes in watch(user_path, recursive=True, stop_event=event): for c_type, c_path in changes: @@ -173,7 +175,7 @@ def file_watcher(event, db, api, api_key, user_path): log(f"{c_path} deleted, mark asset as removed") delete_asset(db, api, c_path, user_path) -def database_watcher(event, db, api, user_path): +def database_watcher(event: 
threading.Event, db: ImmichDatabase, api: ImmichAPI, user_path: str) -> None: log("Database watcher thread running...") user_id = api.get_user_id() while not event.is_set(): @@ -192,16 +194,16 @@ def database_watcher(event, db, api, user_path): def main(): db = ImmichDatabase( - host=os.getenv("DB_HOSTNAME"), - database=os.getenv("DB_DATABASE_NAME"), - user=os.getenv("DB_USERNAME"), - password=os.getenv("DB_PASSWORD"), + host=os.environ["DB_HOSTNAME"], + database=os.environ["DB_DATABASE_NAME"], + user=os.environ["DB_USERNAME"], + password=os.environ["DB_PASSWORD"], port=5432 ) - api_key = os.getenv("IMMICH_API_KEY") - immich = ImmichAPI(os.getenv("IMMICH_SERVER_URL"), api_key) - snap_common = os.getenv("SNAP_COMMON") + api_key = os.environ["IMMICH_API_KEY"] + immich = ImmichAPI(os.environ["IMMICH_SERVER_URL"], api_key) + snap_common = os.environ["SNAP_COMMON"] user_id = immich.get_user_id() user_path = f"{snap_common}/sync/{user_id}" From 8f0b0451f8f94ba8c3e8b07d72e28d6a48219ad7 Mon Sep 17 00:00:00 2001 From: Stefan Berggren Date: Sat, 17 Jun 2023 22:59:49 +0200 Subject: [PATCH 07/10] Use correct encoding --- src/bin/sync-service.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/bin/sync-service.py b/src/bin/sync-service.py index 2fa99e7..a18b449 100644 --- a/src/bin/sync-service.py +++ b/src/bin/sync-service.py @@ -17,6 +17,7 @@ def log(msg: str): class ImmichDatabase: def __init__(self, host: str, database: str, user: str, password: str, port: int): self.conn = psycopg2.connect(host=host, database=database, user=user, password=password, port=port) + self.conn.set_client_encoding('UTF8') def last_removed_asset(self, user_id: str) -> list[psycopg2.extras.RealDictRow]: with self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: From e1df050c0075cac5786b57694c29ecd3bf239ce3 Mon Sep 17 00:00:00 2001 From: Stefan Berggren Date: Sat, 17 Jun 2023 23:00:12 +0200 Subject: [PATCH 08/10] Log errors if one of the threads has died --- 
src/bin/sync-service.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/bin/sync-service.py b/src/bin/sync-service.py index a18b449..9b68e25 100644 --- a/src/bin/sync-service.py +++ b/src/bin/sync-service.py @@ -230,5 +230,14 @@ def main(): signal.signal(signal.SIGTERM, lambda signum, frame: stop_event.set()) + while True: + print("Running thread watch") + + if not watch_thread.is_alive(): + print("Critical: Thread watch is not alive") + if not database_thread.is_alive(): + print("Critical: Thread database is not alive") + time.sleep(10) + if __name__ == '__main__': main() From 7ae16e5f3b743e1366c8e562efb3ef9e0c676930 Mon Sep 17 00:00:00 2001 From: Stefan Berggren Date: Sat, 17 Jun 2023 23:31:44 +0200 Subject: [PATCH 09/10] Tweak messages --- src/bin/sync-service.py | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/src/bin/sync-service.py b/src/bin/sync-service.py index 9b68e25..2fde65c 100644 --- a/src/bin/sync-service.py +++ b/src/bin/sync-service.py @@ -119,7 +119,7 @@ def hash_all_files(db: ImmichDatabase, user_id: str, path: str) -> None: file_path = os.path.join(root, file) relative_path = os.path.relpath(file_path, path) db.save_hash(user_id, relative_path, hash_file(file_path)) - log(f"Hash for {file_path} stored in database") + log(f"Hash {file_path} and store in database") def import_asset(db: ImmichDatabase, api: ImmichAPI, key: str, base_path: str, asset_path: str) -> None: snap_path = os.getenv("SNAP") @@ -146,17 +146,17 @@ def import_asset(db: ImmichDatabase, api: ImmichAPI, key: str, base_path: str, a checksum = hash_file(asset_path) user_id = api.get_user_id() db.save_hash(user_id, relative_path, checksum) - log(f"{relative_path} hash {checksum.hex()} user {user_id})") + log(f"Hash {relative_path} and store in database for user {user_id})") def delete_asset(db: ImmichDatabase, api: ImmichAPI, asset_path: str, base_path: str) -> None: relative_path = os.path.relpath(asset_path, base_path) user_id = 
api.get_user_id() asset = db.get_asset_id_by_path(user_id, relative_path) if asset: - print(f"Asset {asset['id']} removed from database") + log(f"Asset {asset['id']} removed from database") api.delete_asset(asset["id"]) else: - print(f"ERROR: Asset {relative_path} not found in database") + log(f"Asset {relative_path} not found in database") def file_watcher(event: threading.Event, db: ImmichDatabase, api: ImmichAPI, api_key: str, user_path: str) -> None: log("File watcher thread running...") @@ -231,12 +231,10 @@ def main(): signal.signal(signal.SIGTERM, lambda signum, frame: stop_event.set()) while True: - print("Running thread watch") - if not watch_thread.is_alive(): - print("Critical: Thread watch is not alive") + log("Critical: Thread watch is not alive") if not database_thread.is_alive(): - print("Critical: Thread database is not alive") + log("Critical: Thread database is not alive") time.sleep(10) if __name__ == '__main__': From 296fd05d3f2630dbd96914d982da8c7ac727a365 Mon Sep 17 00:00:00 2001 From: Stefan Berggren Date: Sat, 17 Jun 2023 23:54:27 +0200 Subject: [PATCH 10/10] Simplify code --- src/bin/sync-import-watch | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/src/bin/sync-import-watch b/src/bin/sync-import-watch index 6b6d0b1..4a0f1c5 100755 --- a/src/bin/sync-import-watch +++ b/src/bin/sync-import-watch @@ -2,12 +2,6 @@ . $SNAP/bin/load-env -upload() { - local key="$1" - shift - $SNAP/bin/immich-cli upload --key "$key" --yes "$@" -} - if ! sync_enabled; then echo "Folder sync disabled" exit 0 @@ -25,6 +19,6 @@ for KEY in $(snapctl get sync); do mkdir -p "$USER_PATH" echo "Full import of $USER_PATH for user $USER_ID" - upload "$KEY" --recursive "$USER_PATH" + $SNAP/bin/immich-cli upload --recursive --key "$KEY" --yes "$USER_PATH" fi done