From 6f3b76df09e51d17727a17c44ff458ab98079d9e Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Fri, 30 Jun 2023 18:19:45 +0100 Subject: [PATCH 01/12] Add a cache invalidation clean-up task --- synapse/storage/databases/main/cache.py | 123 ++++++++++++++++++++++++ 1 file changed, 123 insertions(+) diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index c940f864d173..8ec95a8f3629 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -18,6 +18,7 @@ from typing import TYPE_CHECKING, Any, Collection, Iterable, List, Optional, Tuple from synapse.api.constants import EventTypes +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.replication.tcp.streams import BackfillStream, CachesStream from synapse.replication.tcp.streams.events import ( EventsStream, @@ -52,6 +53,21 @@ # As above, but for invalidating room caches on room deletion DELETE_ROOM_CACHE_NAME = "dr_cache_fake" +# How long between cache invalidation table cleanups, once we have caught up +# with the backlog. +REGULAR_CLEANUP_INTERVAL_MILLISEC = 60 * 60 * 1000 + +# How long between cache invalidation table cleanups, before we have caught +# up with the backlog. +CATCH_UP_CLEANUP_INTERVAL_MILLISEC = 5 * 60 * 1000 + +# Maximum number of cache invalidation rows to delete at once. +CLEAN_UP_MAX_BATCH_SIZE = 200_000 + +# Keep cache invalidations for 7 days +# (This is likely to be quite excessive.) +RETENTION_PERIOD_OF_CACHE_INVALIDATIONS_MILLISEC = 7 * 24 * 60 * 60 * 1000 + class CacheInvalidationWorkerStore(SQLBaseStore): def __init__( @@ -98,6 +114,13 @@ def __init__( else: self._cache_id_gen = None + # TODO should run background jobs? + if False and isinstance(self.database_engine, PostgresEngine): + self.hs.get_clock().call_later( + CATCH_UP_CLEANUP_INTERVAL_MILLISEC, + self._clean_up_cache_invalidation_wrapper, + ) + async def get_all_updated_caches( self, instance_name: str, last_id: int, current_id: int, limit: int ) -> Tuple[List[Tuple[int, tuple]], int, bool]: @@ -554,3 +577,103 @@ def get_cache_stream_token_for_writer(self, instance_name: str) -> int: return self._cache_id_gen.get_current_token_for_writer(instance_name) else: return 0 + + def _clean_up_cache_invalidation_wrapper(self) -> None: + async def _clean_up_cache_invalidation_background(): + delete_up_to: int = ( + self.hs.get_clock().time_msec() + - RETENTION_PERIOD_OF_CACHE_INVALIDATIONS_MILLISEC + ) + + in_backlog = await self._clean_up_batch_of_old_cache_invalidations( + delete_up_to + ) + + # Vary how long we wait before calling again depending on whether we + # are still sifting through backlog or we have caught up. + if in_backlog: + next_interval = CATCH_UP_CLEANUP_INTERVAL_MILLISEC + else: + next_interval = REGULAR_CLEANUP_INTERVAL_MILLISEC + + self.hs.get_clock().call_later( + next_interval, self._clean_up_cache_invalidation_wrapper + ) + + run_as_background_process( + "clean_up_old_cache_invalidations", _clean_up_cache_invalidation_background + ) + + async def _clean_up_batch_of_old_cache_invalidations( + self, delete_up_to_millisec: int + ) -> bool: + """ + Cleans up a batch of old cache invalidations. + + Up to `CLEAN_UP_BATCH_SIZE` rows will be deleted at once. + + Returns true iff we were limited by batch size (i.e. we are in backlog). 
+ """ + + def _clean_up_batch_of_old_cache_invalidations_txn( + txn: LoggingTransaction, + ) -> bool: + # First get the earliest stream ID + txn.execute( + """ + SELECT stream_id FROM cache_invalidation_stream_by_instance + ORDER BY stream_id LIMIT 1 + """ + ) + row = txn.fetchone() + if row is None: + return False + earliest_stream_id: int = row[0] + + # Then find the last stream ID of the range we will delete + txn.execute( + """ + SELECT stream_id FROM cache_invalidation_stream_by_instance + WHERE stream_id <= ? AND invalidation_ts <= ? + ORDER BY stream_id DESC + LIMIT 1 + """, + (earliest_stream_id + CLEAN_UP_MAX_BATCH_SIZE, delete_up_to_millisec), + ) + row = txn.fetchone() + if row is None: + return False + cutoff_stream_id: int = row[0] + + # Determine whether we are caught up or still catching up + txn.execute( + """ + SELECT invalidation_ts FROM cache_invalidation_stream_by_instance + WHERE stream_id > ? + ORDER BY stream_id ASC + LIMIT 1 + """, + (cutoff_stream_id,), + ) + row = txn.fetchone() + if row is None: + in_backlog = False + else: + # We are in backlog if the next row could have been deleted + # if we didn't have such a small batch size + in_backlog = row[0] <= delete_up_to_millisec + + txn.execute( + """ + DELETE FROM cache_invalidation_stream_by_instance + WHERE ? <= stream_id AND stream_id <= ? + """, + (earliest_stream_id, cutoff_stream_id), + ) + + return in_backlog + + return await self.db_pool.runInteraction( + "clean_up_old_cache_invalidations", + _clean_up_batch_of_old_cache_invalidations_txn, + ) From f50c8002cd21c7e96a9cfba459806322551c0bd3 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Mon, 3 Jul 2023 13:27:53 +0100 Subject: [PATCH 02/12] Run the cache invalidation stream clean-up on the background worker --- synapse/storage/databases/main/cache.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index 8ec95a8f3629..21df6da62de6 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -114,8 +114,12 @@ def __init__( else: self._cache_id_gen = None - # TODO should run background jobs? - if False and isinstance(self.database_engine, PostgresEngine): + if hs.config.worker.run_background_tasks and isinstance( + self.database_engine, PostgresEngine + ): + # Occasionally clean up the cache invalidations stream table. + # This is only applicable if we are on Postgres and therefore populate + # those tables. self.hs.get_clock().call_later( CATCH_UP_CLEANUP_INTERVAL_MILLISEC, self._clean_up_cache_invalidation_wrapper, From 19418ecd63090838974beb5bb5b0b511eadaa973 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Mon, 3 Jul 2023 13:51:07 +0100 Subject: [PATCH 03/12] Tune down --- synapse/storage/databases/main/cache.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index 21df6da62de6..8c6c7d7d1db3 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -59,10 +59,10 @@ # How long between cache invalidation table cleanups, before we have caught # up with the backlog. -CATCH_UP_CLEANUP_INTERVAL_MILLISEC = 5 * 60 * 1000 +CATCH_UP_CLEANUP_INTERVAL_MILLISEC = 60 * 1000 # Maximum number of cache invalidation rows to delete at once. 
-CLEAN_UP_MAX_BATCH_SIZE = 200_000 +CLEAN_UP_MAX_BATCH_SIZE = 20_000 # Keep cache invalidations for 7 days # (This is likely to be quite excessive.) @@ -584,6 +584,12 @@ def get_cache_stream_token_for_writer(self, instance_name: str) -> int: def _clean_up_cache_invalidation_wrapper(self) -> None: async def _clean_up_cache_invalidation_background(): + """ + Clean up cache invalidation stream table entries occasionally. + If we are behind (i.e. there are entries old enough to + be deleted but too many of them to be deleted in one go), + then we run slightly more frequently. + """ delete_up_to: int = ( self.hs.get_clock().time_msec() - RETENTION_PERIOD_OF_CACHE_INVALIDATIONS_MILLISEC From 383b7c930f829d721282c19b9b892cd949c649e0 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Mon, 3 Jul 2023 14:11:21 +0100 Subject: [PATCH 04/12] call_later is in millis! --- synapse/storage/databases/main/cache.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index 8c6c7d7d1db3..5f2ee1e93955 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -121,7 +121,7 @@ def __init__( # This is only applicable if we are on Postgres and therefore populate # those tables. self.hs.get_clock().call_later( - CATCH_UP_CLEANUP_INTERVAL_MILLISEC, + CATCH_UP_CLEANUP_INTERVAL_MILLISEC / 1000, self._clean_up_cache_invalidation_wrapper, ) @@ -607,7 +607,7 @@ async def _clean_up_cache_invalidation_background(): next_interval = REGULAR_CLEANUP_INTERVAL_MILLISEC self.hs.get_clock().call_later( - next_interval, self._clean_up_cache_invalidation_wrapper + next_interval / 1000, self._clean_up_cache_invalidation_wrapper ) run_as_background_process( From 3ca3d1ee5f71789a1188a4faefaf7ab72623fa30 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Mon, 3 Jul 2023 14:26:17 +0100 Subject: [PATCH 05/12] Newsfile Signed-off-by: Olivier Wilkinson (reivilibre) --- changelog.d/15868.feature | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/15868.feature diff --git a/changelog.d/15868.feature b/changelog.d/15868.feature new file mode 100644 index 000000000000..a866bf5774dc --- /dev/null +++ b/changelog.d/15868.feature @@ -0,0 +1 @@ +Remove old rows from the `cache_invalidation_stream_by_instance` table automatically (this table is unused in SQLite). From fe7c6a9abe5258b7b73976d722358f051bf8ffde Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Mon, 3 Jul 2023 15:53:23 +0100 Subject: [PATCH 06/12] fixup! Add a cache invalidation clean-up task --- synapse/storage/databases/main/cache.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index 5f2ee1e93955..8e6f3ca1a5a4 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -583,7 +583,7 @@ def get_cache_stream_token_for_writer(self, instance_name: str) -> int: return 0 def _clean_up_cache_invalidation_wrapper(self) -> None: - async def _clean_up_cache_invalidation_background(): + async def _clean_up_cache_invalidation_background() -> None: """ Clean up cache invalidation stream table entries occasionally. If we are behind (i.e. 
there are entries old enough to From eb2838955f0e0e85ff94f943cb9f8066fc23820e Mon Sep 17 00:00:00 2001 From: reivilibre Date: Thu, 20 Jul 2023 11:38:07 +0100 Subject: [PATCH 07/12] Update synapse/storage/databases/main/cache.py Co-authored-by: Eric Eastwood --- synapse/storage/databases/main/cache.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index 8e6f3ca1a5a4..787593ffa310 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -618,7 +618,7 @@ async def _clean_up_batch_of_old_cache_invalidations( self, delete_up_to_millisec: int ) -> bool: """ - Cleans up a batch of old cache invalidations. + Remove old rows from the `cache_invalidation_stream_by_instance` table automatically (this table is unused in SQLite). Up to `CLEAN_UP_BATCH_SIZE` rows will be deleted at once. From 53e7030d86873090f243500372d8f900d3f64340 Mon Sep 17 00:00:00 2001 From: reivilibre Date: Thu, 20 Jul 2023 11:43:57 +0100 Subject: [PATCH 08/12] Update synapse/storage/databases/main/cache.py Co-authored-by: Eric Eastwood --- synapse/storage/databases/main/cache.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index 787593ffa310..cd4c7e23500b 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -632,7 +632,8 @@ def _clean_up_batch_of_old_cache_invalidations_txn( txn.execute( """ SELECT stream_id FROM cache_invalidation_stream_by_instance - ORDER BY stream_id LIMIT 1 + ORDER BY stream_id ASC + LIMIT 1 """ ) row = txn.fetchone() From 2bef4ef43322888acb4ea150470f4d5086228209 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Fri, 21 Jul 2023 12:53:24 +0100 Subject: [PATCH 09/12] MILLISEC -> MS --- synapse/storage/databases/main/cache.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index cd4c7e23500b..5d181e932307 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -18,6 +18,7 @@ from typing import TYPE_CHECKING, Any, Collection, Iterable, List, Optional, Tuple from synapse.api.constants import EventTypes +from synapse.config._base import Config from synapse.metrics.background_process_metrics import run_as_background_process from synapse.replication.tcp.streams import BackfillStream, CachesStream from synapse.replication.tcp.streams.events import ( @@ -55,18 +56,18 @@ # How long between cache invalidation table cleanups, once we have caught up # with the backlog. -REGULAR_CLEANUP_INTERVAL_MILLISEC = 60 * 60 * 1000 +REGULAR_CLEANUP_INTERVAL_MS = Config.parse_duration("1h") # How long between cache invalidation table cleanups, before we have caught # up with the backlog. -CATCH_UP_CLEANUP_INTERVAL_MILLISEC = 60 * 1000 +CATCH_UP_CLEANUP_INTERVAL_MS = Config.parse_duration("1m") # Maximum number of cache invalidation rows to delete at once. CLEAN_UP_MAX_BATCH_SIZE = 20_000 # Keep cache invalidations for 7 days # (This is likely to be quite excessive.) 
-RETENTION_PERIOD_OF_CACHE_INVALIDATIONS_MILLISEC = 7 * 24 * 60 * 60 * 1000 +RETENTION_PERIOD_OF_CACHE_INVALIDATIONS_MS = Config.parse_duration("7d") class CacheInvalidationWorkerStore(SQLBaseStore): @@ -121,7 +122,7 @@ def __init__( # This is only applicable if we are on Postgres and therefore populate # those tables. self.hs.get_clock().call_later( - CATCH_UP_CLEANUP_INTERVAL_MILLISEC / 1000, + CATCH_UP_CLEANUP_INTERVAL_MS / 1000, self._clean_up_cache_invalidation_wrapper, ) @@ -592,7 +593,7 @@ async def _clean_up_cache_invalidation_background() -> None: """ delete_up_to: int = ( self.hs.get_clock().time_msec() - - RETENTION_PERIOD_OF_CACHE_INVALIDATIONS_MILLISEC + - RETENTION_PERIOD_OF_CACHE_INVALIDATIONS_MS ) in_backlog = await self._clean_up_batch_of_old_cache_invalidations( @@ -602,9 +603,9 @@ async def _clean_up_cache_invalidation_background() -> None: # Vary how long we wait before calling again depending on whether we # are still sifting through backlog or we have caught up. if in_backlog: - next_interval = CATCH_UP_CLEANUP_INTERVAL_MILLISEC + next_interval = CATCH_UP_CLEANUP_INTERVAL_MS else: - next_interval = REGULAR_CLEANUP_INTERVAL_MILLISEC + next_interval = REGULAR_CLEANUP_INTERVAL_MS self.hs.get_clock().call_later( next_interval / 1000, self._clean_up_cache_invalidation_wrapper From 8ece6c0bf064813a7e76abf032fab679d7d325f5 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Fri, 21 Jul 2023 12:54:09 +0100 Subject: [PATCH 10/12] Expand on comment --- synapse/storage/databases/main/cache.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index 5d181e932307..9acbc5e37254 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -623,7 +623,8 @@ async def _clean_up_batch_of_old_cache_invalidations( Up to `CLEAN_UP_BATCH_SIZE` rows will be deleted at once. - Returns true iff we were limited by batch size (i.e. we are in backlog). + Returns true if and only if we were limited by batch size (i.e. we are in backlog: + there are more things to clean up). """ def _clean_up_batch_of_old_cache_invalidations_txn( From 71fcc0e9406e22889b4bf880b96fb84f70ebd2f5 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Fri, 21 Jul 2023 12:57:04 +0100 Subject: [PATCH 11/12] Move and tweak comment about Postgres --- synapse/storage/databases/main/cache.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index 9acbc5e37254..7937ec1bd201 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -115,12 +115,13 @@ def __init__( else: self._cache_id_gen = None + # Occasionally clean up the cache invalidations stream table by deleting + # old rows. + # This is only applicable when Postgres is in use; this table is unused + # and not populated at all when SQLite is the active database engine. if hs.config.worker.run_background_tasks and isinstance( self.database_engine, PostgresEngine ): - # Occasionally clean up the cache invalidations stream table. - # This is only applicable if we are on Postgres and therefore populate - # those tables. 
self.hs.get_clock().call_later( CATCH_UP_CLEANUP_INTERVAL_MS / 1000, self._clean_up_cache_invalidation_wrapper, From ca52419c5b201421041d8280fa23a0e2940cf3d2 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Mon, 7 Aug 2023 14:14:31 +0100 Subject: [PATCH 12/12] Use `wrap_as_background_process` --- synapse/storage/databases/main/cache.py | 49 +++++++++++-------------- 1 file changed, 21 insertions(+), 28 deletions(-) diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index 7937ec1bd201..2fbd389c7168 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -19,7 +19,7 @@ from synapse.api.constants import EventTypes from synapse.config._base import Config -from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.metrics.background_process_metrics import wrap_as_background_process from synapse.replication.tcp.streams import BackfillStream, CachesStream from synapse.replication.tcp.streams.events import ( EventsStream, @@ -584,36 +584,29 @@ def get_cache_stream_token_for_writer(self, instance_name: str) -> int: else: return 0 - def _clean_up_cache_invalidation_wrapper(self) -> None: - async def _clean_up_cache_invalidation_background() -> None: - """ - Clean up cache invalidation stream table entries occasionally. - If we are behind (i.e. there are entries old enough to - be deleted but too many of them to be deleted in one go), - then we run slightly more frequently. - """ - delete_up_to: int = ( - self.hs.get_clock().time_msec() - - RETENTION_PERIOD_OF_CACHE_INVALIDATIONS_MS - ) - - in_backlog = await self._clean_up_batch_of_old_cache_invalidations( - delete_up_to - ) + @wrap_as_background_process("clean_up_old_cache_invalidations") + async def _clean_up_cache_invalidation_wrapper(self) -> None: + """ + Clean up cache invalidation stream table entries occasionally. + If we are behind (i.e. there are entries old enough to + be deleted but too many of them to be deleted in one go), + then we run slightly more frequently. + """ + delete_up_to: int = ( + self.hs.get_clock().time_msec() - RETENTION_PERIOD_OF_CACHE_INVALIDATIONS_MS + ) - # Vary how long we wait before calling again depending on whether we - # are still sifting through backlog or we have caught up. - if in_backlog: - next_interval = CATCH_UP_CLEANUP_INTERVAL_MS - else: - next_interval = REGULAR_CLEANUP_INTERVAL_MS + in_backlog = await self._clean_up_batch_of_old_cache_invalidations(delete_up_to) - self.hs.get_clock().call_later( - next_interval / 1000, self._clean_up_cache_invalidation_wrapper - ) + # Vary how long we wait before calling again depending on whether we + # are still sifting through backlog or we have caught up. + if in_backlog: + next_interval = CATCH_UP_CLEANUP_INTERVAL_MS + else: + next_interval = REGULAR_CLEANUP_INTERVAL_MS - run_as_background_process( - "clean_up_old_cache_invalidations", _clean_up_cache_invalidation_background + self.hs.get_clock().call_later( + next_interval / 1000, self._clean_up_cache_invalidation_wrapper ) async def _clean_up_batch_of_old_cache_invalidations(
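
For illustration, the control flow the series converges on (patches 01, 03, 04 and 12 taken together) can be sketched as a standalone asyncio loop. This is a minimal sketch, not Synapse code: the constants, `clean_up_batch_of_old_rows`, and the loop structure are illustrative stand-ins. Synapse itself reschedules via `Clock.call_later`, which takes seconds — hence the `/ 1000` that patch 04 ("call_later is in millis!") adds — and, as of patch 12, runs the whole thing under the `wrap_as_background_process` decorator, which removes the inner-closure indirection of patch 01.

import asyncio
import time

# Interval values as tuned by patch 03: one minute while catching up,
# one hour once caught up. Retention is 7 days, as in patch 01.
CATCH_UP_CLEANUP_INTERVAL_MS = 60 * 1000
REGULAR_CLEANUP_INTERVAL_MS = 60 * 60 * 1000
RETENTION_PERIOD_MS = 7 * 24 * 60 * 60 * 1000


async def clean_up_batch_of_old_rows(delete_up_to_ms: int) -> bool:
    """Stand-in for the real batched DELETE transaction.

    Returns True if the batch size limit was hit, i.e. older rows remain.
    """
    return False  # pretend we are always caught up


async def cleanup_loop() -> None:
    """Delete expired rows, then sleep; sleep less while in backlog."""
    while True:
        delete_up_to = int(time.time() * 1000) - RETENTION_PERIOD_MS
        in_backlog = await clean_up_batch_of_old_rows(delete_up_to)
        next_interval_ms = (
            CATCH_UP_CLEANUP_INTERVAL_MS
            if in_backlog
            else REGULAR_CLEANUP_INTERVAL_MS
        )
        # Milliseconds -> seconds: the unit mismatch patch 04 fixes.
        await asyncio.sleep(next_interval_ms / 1000)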
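
The batch-and-backlog logic of `_clean_up_batch_of_old_cache_invalidations_txn` can also be exercised end to end against an in-memory SQLite database. This is purely a demonstration of the SQL semantics — the real table, `cache_invalidation_stream_by_instance`, is only populated on Postgres, and the table and column names below are shortened stand-ins; the batch size is shrunk to 2 so the backlog case is visible.

import sqlite3

CLEAN_UP_MAX_BATCH_SIZE = 2  # tiny on purpose; the series uses 20_000


def clean_up_batch(conn: sqlite3.Connection, delete_up_to_ms: int) -> bool:
    """One clean-up pass; returns True iff we were limited by batch size."""
    # 1. Earliest stream ID present in the table.
    row = conn.execute(
        "SELECT stream_id FROM invalidations ORDER BY stream_id ASC LIMIT 1"
    ).fetchone()
    if row is None:
        return False
    earliest = row[0]
    # 2. End of the deletion range: bounded by both the batch size and
    #    the retention cut-off timestamp.
    row = conn.execute(
        "SELECT stream_id FROM invalidations"
        " WHERE stream_id <= ? AND invalidation_ts <= ?"
        " ORDER BY stream_id DESC LIMIT 1",
        (earliest + CLEAN_UP_MAX_BATCH_SIZE, delete_up_to_ms),
    ).fetchone()
    if row is None:
        return False
    cutoff = row[0]
    # 3. We are in backlog iff the first surviving row would also have
    #    been deletable, had the batch been bigger.
    row = conn.execute(
        "SELECT invalidation_ts FROM invalidations WHERE stream_id > ?"
        " ORDER BY stream_id ASC LIMIT 1",
        (cutoff,),
    ).fetchone()
    in_backlog = row is not None and row[0] <= delete_up_to_ms
    # 4. Delete the batch.
    conn.execute(
        "DELETE FROM invalidations WHERE ? <= stream_id AND stream_id <= ?",
        (earliest, cutoff),
    )
    return in_backlog


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE invalidations (stream_id INTEGER, invalidation_ts INTEGER)")
conn.executemany(
    "INSERT INTO invalidations VALUES (?, ?)",
    [(1, 10), (2, 20), (3, 30), (4, 40), (5, 5000)],
)
print(clean_up_batch(conn, 100))  # True: rows 1-3 go, row 4 is still deletable
print(clean_up_batch(conn, 100))  # False: row 4 goes, row 5 is too new

The second pass returns False because the only surviving row is newer than the cut-off, so the cleaner has caught up and drops back from the one-minute catch-up interval to the hourly one.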