Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Remove old rows from the cache_invalidation_stream_by_instance table automatically. (This table is not used when Synapse is configured to use SQLite.) #15868

Merged
merged 12 commits into from
Aug 8, 2023
123 changes: 123 additions & 0 deletions synapse/storage/databases/main/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from typing import TYPE_CHECKING, Any, Collection, Iterable, List, Optional, Tuple

from synapse.api.constants import EventTypes
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.tcp.streams import BackfillStream, CachesStream
from synapse.replication.tcp.streams.events import (
EventsStream,
Expand Down Expand Up @@ -52,6 +53,21 @@
# As above, but for invalidating room caches on room deletion
DELETE_ROOM_CACHE_NAME = "dr_cache_fake"

# How long between cache invalidation table cleanups, once we have caught up
# with the backlog.
REGULAR_CLEANUP_INTERVAL_MILLISEC = 60 * 60 * 1000
reivilibre marked this conversation as resolved.
Show resolved Hide resolved
reivilibre marked this conversation as resolved.
Show resolved Hide resolved

# How long between cache invalidation table cleanups, before we have caught
# up with the backlog.
CATCH_UP_CLEANUP_INTERVAL_MILLISEC = 5 * 60 * 1000

# Maximum number of cache invalidation rows to delete at once.
CLEAN_UP_MAX_BATCH_SIZE = 200_000

# Keep cache invalidations for 7 days
# (This is likely to be quite excessive.)
RETENTION_PERIOD_OF_CACHE_INVALIDATIONS_MILLISEC = 7 * 24 * 60 * 60 * 1000


class CacheInvalidationWorkerStore(SQLBaseStore):
def __init__(
Expand Down Expand Up @@ -98,6 +114,13 @@ def __init__(
else:
self._cache_id_gen = None

# TODO should run background jobs?
if False and isinstance(self.database_engine, PostgresEngine):
self.hs.get_clock().call_later(
CATCH_UP_CLEANUP_INTERVAL_MILLISEC,
self._clean_up_cache_invalidation_wrapper,
)

async def get_all_updated_caches(
self, instance_name: str, last_id: int, current_id: int, limit: int
) -> Tuple[List[Tuple[int, tuple]], int, bool]:
Expand Down Expand Up @@ -554,3 +577,103 @@ def get_cache_stream_token_for_writer(self, instance_name: str) -> int:
return self._cache_id_gen.get_current_token_for_writer(instance_name)
else:
return 0

def _clean_up_cache_invalidation_wrapper(self) -> None:
async def _clean_up_cache_invalidation_background():
delete_up_to: int = (
self.hs.get_clock().time_msec()
- RETENTION_PERIOD_OF_CACHE_INVALIDATIONS_MILLISEC
)

in_backlog = await self._clean_up_batch_of_old_cache_invalidations(
delete_up_to
)

# Vary how long we wait before calling again depending on whether we
# are still sifting through backlog or we have caught up.
if in_backlog:
next_interval = CATCH_UP_CLEANUP_INTERVAL_MILLISEC
else:
next_interval = REGULAR_CLEANUP_INTERVAL_MILLISEC

self.hs.get_clock().call_later(
next_interval, self._clean_up_cache_invalidation_wrapper
)

run_as_background_process(
"clean_up_old_cache_invalidations", _clean_up_cache_invalidation_background
)

async def _clean_up_batch_of_old_cache_invalidations(
self, delete_up_to_millisec: int
) -> bool:
"""
Cleans up a batch of old cache invalidations.
reivilibre marked this conversation as resolved.
Show resolved Hide resolved

Up to `CLEAN_UP_BATCH_SIZE` rows will be deleted at once.

Returns true iff we were limited by batch size (i.e. we are in backlog).
reivilibre marked this conversation as resolved.
Show resolved Hide resolved
"""

def _clean_up_batch_of_old_cache_invalidations_txn(
txn: LoggingTransaction,
) -> bool:
# First get the earliest stream ID
txn.execute(
"""
SELECT stream_id FROM cache_invalidation_stream_by_instance
ORDER BY stream_id LIMIT 1
reivilibre marked this conversation as resolved.
Show resolved Hide resolved
"""
)
row = txn.fetchone()
if row is None:
return False
earliest_stream_id: int = row[0]

# Then find the last stream ID of the range we will delete
txn.execute(
"""
SELECT stream_id FROM cache_invalidation_stream_by_instance
WHERE stream_id <= ? AND invalidation_ts <= ?
ORDER BY stream_id DESC
LIMIT 1
""",
(earliest_stream_id + CLEAN_UP_MAX_BATCH_SIZE, delete_up_to_millisec),
)
row = txn.fetchone()
if row is None:
return False
cutoff_stream_id: int = row[0]

# Determine whether we are caught up or still catching up
txn.execute(
"""
SELECT invalidation_ts FROM cache_invalidation_stream_by_instance
WHERE stream_id > ?
ORDER BY stream_id ASC
LIMIT 1
""",
(cutoff_stream_id,),
)
row = txn.fetchone()
if row is None:
in_backlog = False
else:
# We are in backlog if the next row could have been deleted
# if we didn't have such a small batch size
in_backlog = row[0] <= delete_up_to_millisec
Comment on lines +655 to +671
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this makes sense, but I probably would have queried for something like

                SELECT stream_id FROM cache_invalidation_stream_by_instance
                WHERE stream_id > ? OR invalidation_ts > ?
                ORDER BY stream_id ASC
                LIMIT 1 

i.e. to see if the complement of rows covered by the previous query is nonempty.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I genuinely do only want to look at the first row after our current marker: adding more conditions like you suggest is either going to cause more work for the DB as it'd have to scan all the remaining rows until finding one (I think?) or in the case of the OR condition, it's going to not be able to use any index for that part of the condition.

Have to be honest I don't quite follow your suggestion's query, at first I was thinking you wanted WHERE stream_id > ? AND invalidation_ts <= ? but altogether not sure. Ignoring that I don't fully grok what you're saying, I think OR invalidation_ts > ? is potentially problematic as we don't have an index on invalidation_ts — I can't see how the query planner would make a generally efficient plan for the query :).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At first I thought you were deleting things based on both stream_id and invalidation_ts. If so, it seemed odd to not take both into account when working out if had deleted everything that you wanted to.

I see now that you're only deleting things based on stream_id, using the invalidation_ts only to select a sensible upper limit on stream_id.

Paranoid note: it is not necessarily the case that x.stream_id > y.stream_id implies x.invalidation_ts > y.invalidation_ts (consider e.g. one worker is slow to complete transactions or has a laggy clock). I wouldn't expect this to have any meaingful effect in practice because any drifts should be small.


txn.execute(
"""
DELETE FROM cache_invalidation_stream_by_instance
WHERE ? <= stream_id AND stream_id <= ?
""",
(earliest_stream_id, cutoff_stream_id),
)
Comment on lines +655 to +679
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice if we could easily determine the outcome of the deletion.

There is this to see how many rows were deleted if we're interested in that boilerplate.

(not saying we have to use it as the current way is simple, just cumbersome).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I considered checking the count of deleted rows but these streams can technically have gaps so you don't actually know how many rows are to be deleted, only the upper bound.

DELETE ... RETURNING ... was an option too but it felt less obvious to me and I wanted it to be 'obviously correct', whereas I wouldn't have as much confidence writing something that way


return in_backlog

return await self.db_pool.runInteraction(
"clean_up_old_cache_invalidations",
_clean_up_batch_of_old_cache_invalidations_txn,
)