This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Prefill more stream change caches. #12372

Merged
merged 7 commits on Apr 5, 2022
1 change: 1 addition & 0 deletions changelog.d/12372.feature
@@ -0,0 +1 @@
Reduce overhead of restarting synchrotrons.
Contributor

Doesn't it actually create more overhead, since we need to pull thousands of rows out of the database at startup?

Member Author

Yeah, so the reason this works is that we do the work once at startup, rather than many times for lots of different requests. The key thing here is that the stream change caches do not get updated on reads, but on writes (or when we prefill them on startup).

Also cf. #12367 (comment) for why prefilling is good in general.
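
To illustrate the point about reads never populating the cache, here is a minimal sketch (not Synapse's actual StreamChangeCache; the class, names, and behaviour are simplified assumptions) of why a prefilled cache can answer requests from memory right after a restart:

```python
from typing import Dict, Optional


class TinyStreamChangeCache:
    """Maps each entity to the stream ID of its most recent change.

    `earliest_known_stream_id` is the position before which we know nothing:
    questions about earlier positions have to fall back to the database.
    """

    def __init__(
        self,
        earliest_known_stream_id: int,
        prefilled: Optional[Dict[str, int]] = None,
    ) -> None:
        self._earliest = earliest_known_stream_id
        self._cache: Dict[str, int] = dict(prefilled or {})

    def entity_has_changed(self, entity: str, stream_id: int) -> None:
        # Only writes (or prefilling at startup) populate the cache; reads never do.
        self._cache[entity] = stream_id

    def has_entity_changed(self, entity: str, since: int) -> Optional[bool]:
        if since < self._earliest:
            # Can't answer from memory: the caller must query the database.
            return None
        return self._cache.get(entity, self._earliest) > since


# Cold start without prefilling: the earliest known position is the current
# token, so every request with an older `since` token has to hit the DB.
cold = TinyStreamChangeCache(earliest_known_stream_id=1000)
assert cold.has_entity_changed("@alice:example.com", since=990) is None

# With a prefilled cache the earliest known position is further back, so the
# same request is answered from memory straight after the restart.
warm = TinyStreamChangeCache(
    earliest_known_stream_id=900,
    prefilled={"@alice:example.com": 950},
)
assert warm.has_entity_changed("@alice:example.com", since=990) is False
```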

Contributor @babolivier (Apr 5, 2022)

I get why prefilling is a good thing; I'm just curious about the changelog claiming it reduces the overhead of restarting synchrotrons when it actually increases it.

Edit: I reread the comment you're linking to and realised that it does somewhat answer my question.

Member Author

Ah, I think I see what you mean: yes, it does cause the initial restart to take longer, but then the first 10 minutes of uptime use much less DB and CPU. So I mean it more along the lines of "reduce resource usage for the first few minutes after restart". Not sure if there is a better way of wording the changelog?

Contributor

> Not sure if there is a better way of wording the changelog?

Don't think there is, no; at least not in a way that would be user-friendly.

25 changes: 2 additions & 23 deletions synapse/replication/slave/storage/devices.py
@@ -20,7 +20,6 @@
from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
from synapse.storage.databases.main.devices import DeviceWorkerStore
from synapse.storage.databases.main.end_to_end_keys import EndToEndKeyWorkerStore
from synapse.util.caches.stream_change_cache import StreamChangeCache

if TYPE_CHECKING:
from synapse.server import HomeServer
@@ -33,8 +32,6 @@ def __init__(
db_conn: LoggingDatabaseConnection,
hs: "HomeServer",
):
super().__init__(database, db_conn, hs)

self.hs = hs

self._device_list_id_gen = SlavedIdTracker(
@@ -47,26 +44,8 @@ def __init__(
("device_lists_changes_in_room", "stream_id"),
],
)
device_list_max = self._device_list_id_gen.get_current_token()
device_list_prefill, min_device_list_id = self.db_pool.get_cache_dict(
db_conn,
"device_lists_stream",
entity_column="user_id",
stream_column="stream_id",
max_value=device_list_max,
limit=1000,
)
self._device_list_stream_cache = StreamChangeCache(
"DeviceListStreamChangeCache",
min_device_list_id,
prefilled_cache=device_list_prefill,
)
self._user_signature_stream_cache = StreamChangeCache(
"UserSignatureStreamChangeCache", device_list_max
)
self._device_list_federation_stream_cache = StreamChangeCache(
"DeviceListFederationStreamChangeCache", device_list_max
)

super().__init__(database, db_conn, hs)

def get_device_stream_token(self) -> int:
return self._device_list_id_gen.get_current_token()
43 changes: 27 additions & 16 deletions synapse/storage/database.py
@@ -2030,29 +2030,40 @@ def get_cache_dict(
max_value: int,
limit: int = 100000,
) -> Tuple[Dict[Any, int], int]:
# Fetch a mapping of room_id -> max stream position for "recent" rooms.
# It doesn't really matter how many we get, the StreamChangeCache will
# do the right thing to ensure it respects the max size of cache.
sql = (
"SELECT %(entity)s, MAX(%(stream)s) FROM %(table)s"
" WHERE %(stream)s > ? - %(limit)s"
" GROUP BY %(entity)s"
) % {
"table": table,
"entity": entity_column,
"stream": stream_column,
"limit": limit,
}
"""Gets roughly the last N changes in the given stream table as a
map from entity to the stream ID of the most recent change.

Also returns the minimum stream ID.
"""

# This may return many rows for the same entity, but the `limit` is only
# a suggestion so we don't care that much.
#
# Note: Some stream tables can have multiple rows with the same stream
# ID. Instead of handling this with complicated SQL, we instead simply
# add one to the returned minimum stream ID to ensure correctness.
sql = f"""
SELECT {entity_column}, {stream_column}
FROM {table}
ORDER BY {stream_column} DESC
LIMIT ?
"""
Comment on lines +2045 to +2050

Contributor

This change means that if an entity has many updates to its name (and others don't), we will end up with a smaller cache than before, right? Are we fine with that (e.g. because the limit is high enough that we don't really have to care)?

Member Author

Before, we were looking for rows in the range [MAX - LIMIT, MAX], which will return fewer than LIMIT rows¹, so I think this will always return more rows than before.

¹ ... except if we have rows with the same stream ID, in which case this breaks.


txn = db_conn.cursor(txn_name="get_cache_dict")
txn.execute(sql, (int(max_value),))
txn.execute(sql, (limit,))

cache = {row[0]: int(row[1]) for row in txn}
# The rows come out in reverse stream ID order, so we want to keep the
# stream ID of the first row for each entity.
cache: Dict[Any, int] = {}
for row in txn:
cache.setdefault(row[0], int(row[1]))

txn.close()

if cache:
min_val = min(cache.values())
# We add one here as we don't know if we have all rows for the
# minimum stream ID.
min_val = min(cache.values()) + 1
else:
min_val = max_value

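As a rough illustration of the discussion above, here is a sketch of the new prefill logic (the function, names, and sample data are illustrative assumptions, not the exact Synapse code): rows arrive newest-first under a plain LIMIT, the first row seen per entity wins, and the minimum stream ID is bumped by one because the LIMIT may have cut off other rows sharing that stream ID.

```python
from typing import Any, Dict, Iterable, Tuple


def build_prefill(
    rows: Iterable[Tuple[Any, int]], max_value: int
) -> Tuple[Dict[Any, int], int]:
    # Assumes rows are (entity, stream_id) tuples already ordered by
    # stream_id DESC, as the new SQL guarantees.
    cache: Dict[Any, int] = {}
    for entity, stream_id in rows:
        # Rows arrive newest-first, so the first stream_id seen per entity is
        # that entity's most recent change; later (older) rows are ignored.
        cache.setdefault(entity, int(stream_id))

    if cache:
        # The LIMIT may have cut off other rows sharing the smallest stream
        # ID we saw, so treat that position as incomplete and start one above.
        min_val = min(cache.values()) + 1
    else:
        min_val = max_value

    return cache, min_val


# With LIMIT 4, the old window-based query (WHERE stream_id > MAX - LIMIT)
# would only have seen stream IDs above 6 and missed room_c entirely; the new
# query simply takes the newest 4 rows regardless of how the IDs are spaced.
rows = [("room_a", 10), ("room_b", 10), ("room_a", 7), ("room_c", 3)]
print(build_prefill(rows, max_value=10))
# -> ({'room_a': 10, 'room_b': 10, 'room_c': 3}, 4)
```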
21 changes: 0 additions & 21 deletions synapse/storage/databases/main/__init__.py
@@ -183,27 +183,6 @@ def __init__(

super().__init__(database, db_conn, hs)

device_list_max = self._device_list_id_gen.get_current_token()
device_list_prefill, min_device_list_id = self.db_pool.get_cache_dict(
db_conn,
"device_lists_stream",
entity_column="user_id",
stream_column="stream_id",
max_value=device_list_max,
limit=1000,
)
self._device_list_stream_cache = StreamChangeCache(
"DeviceListStreamChangeCache",
min_device_list_id,
prefilled_cache=device_list_prefill,
)
self._user_signature_stream_cache = StreamChangeCache(
"UserSignatureStreamChangeCache", device_list_max
)
self._device_list_federation_stream_cache = StreamChangeCache(
"DeviceListFederationStreamChangeCache", device_list_max
)

events_max = self._stream_id_gen.get_current_token()
curr_state_delta_prefill, min_curr_state_delta_id = self.db_pool.get_cache_dict(
db_conn,
50 changes: 50 additions & 0 deletions synapse/storage/databases/main/devices.py
@@ -46,6 +46,7 @@
from synapse.util import json_decoder, json_encoder
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.caches.lrucache import LruCache
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.iterutils import batch_iter
from synapse.util.stringutils import shortstr

@@ -71,6 +72,55 @@ def __init__(
):
super().__init__(database, db_conn, hs)

device_list_max = self._device_list_id_gen.get_current_token()
device_list_prefill, min_device_list_id = self.db_pool.get_cache_dict(
db_conn,
"device_lists_stream",
entity_column="user_id",
stream_column="stream_id",
max_value=device_list_max,
limit=10000,
)
self._device_list_stream_cache = StreamChangeCache(
"DeviceListStreamChangeCache",
min_device_list_id,
prefilled_cache=device_list_prefill,
)

(
user_signature_stream_prefill,
user_signature_stream_list_id,
) = self.db_pool.get_cache_dict(
db_conn,
"user_signature_stream",
entity_column="from_user_id",
stream_column="stream_id",
max_value=device_list_max,
limit=1000,
)
self._user_signature_stream_cache = StreamChangeCache(
"UserSignatureStreamChangeCache",
user_signature_stream_list_id,
prefilled_cache=user_signature_stream_prefill,
)

(
device_list_federation_prefill,
device_list_federation_list_id,
) = self.db_pool.get_cache_dict(
db_conn,
"device_lists_outbound_pokes",
entity_column="destination",
stream_column="stream_id",
max_value=device_list_max,
limit=10000,
)
self._device_list_federation_stream_cache = StreamChangeCache(
"DeviceListFederationStreamChangeCache",
device_list_federation_list_id,
prefilled_cache=device_list_federation_prefill,
)

if hs.config.worker.run_background_tasks:
self._clock.looping_call(
self._prune_old_outbound_device_pokes, 60 * 60 * 1000
13 changes: 12 additions & 1 deletion synapse/storage/databases/main/receipts.py
@@ -98,8 +98,19 @@ def __init__(

super().__init__(database, db_conn, hs)

max_receipts_stream_id = self.get_max_receipt_stream_id()
receipts_stream_prefill, min_receipts_stream_id = self.db_pool.get_cache_dict(
db_conn,
"receipts_linearized",
entity_column="room_id",
stream_column="stream_id",
max_value=max_receipts_stream_id,
limit=10000,
)
self._receipts_stream_cache = StreamChangeCache(
"ReceiptsRoomChangeCache", self.get_max_receipt_stream_id()
"ReceiptsRoomChangeCache",
min_receipts_stream_id,
prefilled_cache=receipts_stream_prefill,
)

def get_max_receipt_stream_id(self) -> int: