Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Faster room joins: Resume state re-syncing after a Synapse restart #12813

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/12813.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Resume state re-syncing for rooms with partial state after a Synapse restart.
27 changes: 25 additions & 2 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,14 @@ def __init__(self, hs: "HomeServer"):

self.third_party_event_rules = hs.get_third_party_event_rules()

# if this is the main process, fire off a background process to resume
# any partial-state-resync operations which were in flight when we
# were shut down.
if not hs.config.worker.worker_app:
run_as_background_process(
"resume_sync_partial_state_room", self._resume_sync_partial_state_room
)
squahtx marked this conversation as resolved.
Show resolved Hide resolved

async def maybe_backfill(
self, room_id: str, current_depth: int, limit: int
) -> bool:
Expand Down Expand Up @@ -470,6 +478,8 @@ async def do_invite_join(
"""
# TODO: We should be able to call this on workers, but the upgrading of
# room stuff after join currently doesn't work on workers.
# TODO: Before we relax this condition, we need to allow re-syncing of
# partial room state to happen on workers.
assert self.config.worker.worker_app is None

logger.debug("Joining %s to %s", joinee, room_id)
Expand Down Expand Up @@ -550,8 +560,6 @@ async def do_invite_join(
if ret.partial_state:
# Kick off the process of asynchronously fetching the state for this
# room.
#
# TODO(faster_joins): pick this up again on restart
run_as_background_process(
desc="sync_partial_state_room",
func=self._sync_partial_state_room,
Expand Down Expand Up @@ -1463,6 +1471,20 @@ async def get_room_complexity(
# well.
return None

async def _resume_sync_partial_state_room(self) -> None:
"""Resumes resyncing of all partial-state rooms after a restart."""
assert not self.config.worker.worker_app

partial_state_rooms = await self.store.get_partial_state_rooms_and_servers()
for room_id, servers_in_room in partial_state_rooms.items():
run_as_background_process(
desc="sync_partial_state_room",
func=self._sync_partial_state_room,
initial_destination=None,
other_destinations=servers_in_room,
room_id=room_id,
)

async def _sync_partial_state_room(
self,
initial_destination: Optional[str],
Expand All @@ -1477,6 +1499,7 @@ async def _sync_partial_state_room(
`initial_destination` is unavailable
room_id: room to be resynced
"""
assert not self.config.worker.worker_app
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is implied by the identical assert in do_invite_join, which starts the background task.


# TODO(faster_joins): do we need to lock to avoid races? What happens if other
# worker processes kick off a resync in parallel? Perhaps we should just elect
Expand Down
27 changes: 27 additions & 0 deletions synapse/storage/databases/main/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
Collection,
Dict,
List,
Mapping,
Optional,
Tuple,
Union,
Expand Down Expand Up @@ -1081,6 +1082,32 @@ def get_rooms_for_retention_period_in_range_txn(
get_rooms_for_retention_period_in_range_txn,
)

async def get_partial_state_rooms_and_servers(
self,
) -> Mapping[str, Collection[str]]:
"""Get all rooms containing events with partial state, and the servers known
to be in the room.

Returns:
A dictionary of rooms with partial state, with room IDs as keys and
lists of servers in rooms as values.
"""
room_servers: Dict[str, List[str]] = {}

rows = await self.db_pool.simple_select_list(
"partial_state_rooms_servers",
keyvalues=None,
retcols=("room_id", "server_name"),
desc="get_partial_state_rooms",
)

for row in rows:
room_id = row["room_id"]
server_name = row["server_name"]
room_servers.setdefault(room_id, []).append(server_name)

return room_servers

async def clear_partial_state_room(self, room_id: str) -> bool:
# this can race with incoming events, so we watch out for FK errors.
# TODO(faster_joins): this still doesn't completely fix the race, since the persist process
Expand Down