Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add event_stream_ordering column to membership state tables #14979

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/14979.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add denormalised event stream ordering column to membership state tables for future use. Contributed by Nick @ Beeper (@fizzadar).
23 changes: 17 additions & 6 deletions synapse/storage/databases/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -1147,11 +1147,15 @@ def _update_current_state_txn(
# been inserted into room_memberships.
txn.execute_batch(
"""INSERT INTO current_state_events
(room_id, type, state_key, event_id, membership)
VALUES (?, ?, ?, ?, (SELECT membership FROM room_memberships WHERE event_id = ?))
(room_id, type, state_key, event_id, membership, event_stream_ordering)
VALUES (
?, ?, ?, ?,
(SELECT membership FROM room_memberships WHERE event_id = ?),
(SELECT stream_ordering FROM events WHERE event_id = ?)
)
""",
[
(room_id, key[0], key[1], ev_id, ev_id)
(room_id, key[0], key[1], ev_id, ev_id, ev_id)
for key, ev_id in to_insert.items()
],
)
Expand All @@ -1178,11 +1182,15 @@ def _update_current_state_txn(
if to_insert:
txn.execute_batch(
"""INSERT INTO local_current_membership
(room_id, user_id, event_id, membership)
VALUES (?, ?, ?, (SELECT membership FROM room_memberships WHERE event_id = ?))
(room_id, user_id, event_id, membership, event_stream_ordering)
VALUES (
?, ?, ?,
(SELECT membership FROM room_memberships WHERE event_id = ?),
(SELECT stream_ordering FROM events WHERE event_id = ?)
)
""",
[
(room_id, key[1], ev_id, ev_id)
(room_id, key[1], ev_id, ev_id, ev_id)
for key, ev_id in to_insert.items()
if key[0] == EventTypes.Member and self.is_mine_id(key[1])
],
Expand Down Expand Up @@ -1790,6 +1798,7 @@ def _store_room_members_txn(
table="room_memberships",
keys=(
"event_id",
"event_stream_ordering",
"user_id",
"sender",
"room_id",
Expand All @@ -1800,6 +1809,7 @@ def _store_room_members_txn(
values=[
(
event.event_id,
event.internal_metadata.stream_ordering,
event.state_key,
event.user_id,
event.room_id,
Expand Down Expand Up @@ -1832,6 +1842,7 @@ def _store_room_members_txn(
keyvalues={"room_id": event.room_id, "user_id": event.state_key},
values={
"event_id": event.event_id,
"event_stream_ordering": event.internal_metadata.stream_ordering,
"membership": event.membership,
},
)
Expand Down
104 changes: 103 additions & 1 deletion synapse/storage/databases/main/events_bg_updates.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import attr

from synapse.api.constants import EventContentFields, RelationTypes
from synapse.api.constants import EventContentFields, EventTypes, RelationTypes
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.events import make_event_from_dict
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
Expand Down Expand Up @@ -71,6 +71,10 @@ class _BackgroundUpdates:

EVENTS_JUMP_TO_DATE_INDEX = "events_jump_to_date_index"

POPULATE_MEMBERSHIP_EVENT_STREAM_ORDERING = (
"populate_membership_event_stream_ordering"
)


@attr.s(slots=True, frozen=True, auto_attribs=True)
class _CalculateChainCover:
Expand Down Expand Up @@ -99,6 +103,10 @@ def __init__(
):
super().__init__(database, db_conn, hs)

self.db_pool.updates.register_background_update_handler(
_BackgroundUpdates.POPULATE_MEMBERSHIP_EVENT_STREAM_ORDERING,
self._populate_membership_event_stream_ordering,
)
self.db_pool.updates.register_background_update_handler(
_BackgroundUpdates.EVENT_ORIGIN_SERVER_TS_NAME,
self._background_reindex_origin_server_ts,
Expand Down Expand Up @@ -1498,3 +1506,97 @@ def _populate_txn(txn: LoggingTransaction) -> bool:
)

return batch_size

async def _populate_membership_event_stream_ordering(
    self, progress: JsonDict, batch_size: int
) -> int:
    """Background update that copies `events.stream_ordering` into the
    `event_stream_ordering` column of `current_state_events`,
    `room_memberships` and `local_current_membership`.

    Each invocation handles the membership events whose stream ordering falls
    in the half-open window `[start, start + batch_size)`, then persists the
    end of that window so that the next invocation resumes from there.

    Args:
        progress: The persisted progress dict. The keys read are
            `stream_ordering` (start of the next window, defaulting to 0) and
            `max_stream_ordering` (the upper bound captured on the first run).
        batch_size: The width of the stream ordering window to process.

    Returns:
        `batch_size`, reported as the number of items handled by this run.
    """

    def _populate_membership_event_stream_ordering(
        txn: LoggingTransaction,
    ) -> bool:
        """Process one window; returns True once the update is complete."""
        if "max_stream_ordering" in progress:
            max_stream_ordering = progress["max_stream_ordering"]
        else:
            # First run: capture the upper bound once and carry it in the
            # progress dict so later runs do not have to query it again.
            txn.execute("SELECT max(stream_ordering) FROM events")
            res = txn.fetchone()
            if res is None or res[0] is None:
                # The events table is empty, so there is nothing to populate.
                return True
            max_stream_ordering = res[0]

        # NOTE(review): the scan starts at 0 and only moves upwards, so any
        # rows with a negative stream_ordering are never visited here --
        # confirm that this is intended.
        start = progress.get("stream_ordering", 0)
        stop = start + batch_size

        # The event type is bound as a parameter rather than being formatted
        # into the SQL text.
        txn.execute(
            """
            SELECT room_id, event_id, stream_ordering
            FROM events
            WHERE
                type = ?
                AND stream_ordering >= ?
                AND stream_ordering < ?
            """,
            (EventTypes.Member, start, stop),
        )

        rows: List[Tuple[str, str, int]] = cast(
            List[Tuple[str, str, int]], txn.fetchall()
        )

        # A window without any membership events needs no writes at all; the
        # progress marker below is still advanced.
        if rows:
            event_ids: List[Tuple[str]] = [(event_id,) for _, event_id, _ in rows]
            event_stream_orderings: List[Tuple[int]] = [
                (stream_ordering,) for _, _, stream_ordering in rows
            ]

            # Both of these tables are updated by matching on event_id.
            for table in ("current_state_events", "room_memberships"):
                self.db_pool.simple_update_many_txn(
                    txn,
                    table=table,
                    key_names=("event_id",),
                    key_values=event_ids,
                    value_names=("event_stream_ordering",),
                    value_values=event_stream_orderings,
                )

            # NOTE: local_current_membership has no index on event_id, so only
            # the room ID here will reduce the query rows read.
            #
            # The per-row updates are submitted as a single batch instead of
            # issuing one execute() call per membership event.
            txn.execute_batch(
                """
                UPDATE local_current_membership
                SET event_stream_ordering = ?
                WHERE room_id = ? AND event_id = ?
                """,
                [
                    (stream_ordering, room_id, event_id)
                    for room_id, event_id, stream_ordering in rows
                ],
            )

        self.db_pool.updates._background_update_progress_txn(
            txn,
            _BackgroundUpdates.POPULATE_MEMBERSHIP_EVENT_STREAM_ORDERING,
            {
                "stream_ordering": stop,
                "max_stream_ordering": max_stream_ordering,
            },
        )

        # The window is half-open, so everything up to and including
        # max_stream_ordering has been covered once stop has moved past it.
        return stop > max_stream_ordering

    finished = await self.db_pool.runInteraction(
        "_populate_membership_event_stream_ordering",
        _populate_membership_event_stream_ordering,
    )

    if finished:
        await self.db_pool.updates._end_background_update(
            _BackgroundUpdates.POPULATE_MEMBERSHIP_EVENT_STREAM_ORDERING
        )

    return batch_size
8 changes: 4 additions & 4 deletions synapse/storage/databases/main/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1779,7 +1779,7 @@ def get_ex_outlier_stream_rows_txn(
txn: LoggingTransaction,
) -> List[Tuple[int, str, str, str, str, str, str, str, bool, bool]]:
sql = (
"SELECT event_stream_ordering, e.event_id, e.room_id, e.type,"
"SELECT out.event_stream_ordering, e.event_id, e.room_id, e.type,"
" se.state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL,"
" e.outlier"
" FROM events AS e"
Expand All @@ -1791,10 +1791,10 @@ def get_ex_outlier_stream_rows_txn(
" LEFT JOIN event_relations USING (event_id)"
" LEFT JOIN room_memberships USING (event_id)"
" LEFT JOIN rejections USING (event_id)"
" WHERE ? < event_stream_ordering"
" AND event_stream_ordering <= ?"
" WHERE ? < out.event_stream_ordering"
" AND out.event_stream_ordering <= ?"
" AND out.instance_name = ?"
" ORDER BY event_stream_ordering ASC"
" ORDER BY out.event_stream_ordering ASC"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This disambiguates the query. Unfortunately, it means that older Synapse versions (without this code change) will fail to run this query against an upgraded database.

Not sure how we should handle this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

)

txn.execute(sql, (last_id, current_id, instance_name))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/* Copyright 2022 Beeper
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/* Add an `event_stream_ordering` BIGINT column to each of the three
 * membership state tables. No default or NOT NULL constraint is given, so
 * existing rows are left without a value until they are populated separately. */
ALTER TABLE current_state_events ADD COLUMN event_stream_ordering BIGINT;
ALTER TABLE local_current_membership ADD COLUMN event_stream_ordering BIGINT;
ALTER TABLE room_memberships ADD COLUMN event_stream_ordering BIGINT;

/* Queue the `populate_membership_event_stream_ordering` background update,
 * starting from an empty progress object.
 * NOTE(review): only update_name and progress_json are supplied here. If the
 * background_updates table supports an explicit ordering value, consider
 * setting one so this update does not run ahead of other pending updates --
 * confirm against the table schema. */
INSERT INTO background_updates (update_name, progress_json) VALUES
('populate_membership_event_stream_ordering', '{}');
Comment on lines +20 to +21
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should probably have defined an ordering; as it stands, it jumped ahead of all the other pending updates, which is not ideal.