Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Faster joins: Avoid starting duplicate partial state syncs #14844

Merged
merged 8 commits into from Jan 20, 2023
Merged
49 changes: 43 additions & 6 deletions synapse/handlers/federation.py
Expand Up @@ -27,6 +27,7 @@
Iterable,
List,
Optional,
Set,
Tuple,
Union,
)
Expand Down Expand Up @@ -171,6 +172,11 @@ def __init__(self, hs: "HomeServer"):

self.third_party_event_rules = hs.get_third_party_event_rules()

# Tracks running partial state syncs by room ID.
# Partial state syncs currently only run on the main process, so it's okay to
# track them in-memory for now.
self._active_partial_state_syncs: Set[str] = set()

# if this is the main process, fire off a background process to resume
# any partial-state-resync operations which were in flight when we
# were shut down.
Expand Down Expand Up @@ -679,9 +685,7 @@ async def do_invite_join(
if ret.partial_state:
# Kick off the process of asynchronously fetching the state for this
# room.
run_as_background_process(
desc="sync_partial_state_room",
func=self._sync_partial_state_room,
self._start_partial_state_room_sync(
initial_destination=origin,
other_destinations=ret.servers_in_room,
room_id=room_id,
Expand Down Expand Up @@ -1666,14 +1670,47 @@ async def _resume_partial_state_room_sync(self) -> None:

partial_state_rooms = await self.store.get_partial_state_room_resync_info()
for room_id, resync_info in partial_state_rooms.items():
run_as_background_process(
desc="sync_partial_state_room",
func=self._sync_partial_state_room,
self._start_partial_state_room_sync(
initial_destination=resync_info.joined_via,
other_destinations=resync_info.servers_in_room,
room_id=room_id,
)

def _start_partial_state_room_sync(
self,
initial_destination: Optional[str],
other_destinations: Collection[str],
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved
room_id: str,
) -> None:
"""Starts the background process to resync the state of a partial-state room,
if it is not already running.

Args:
initial_destination: the initial homeserver to pull the state from
other_destinations: other homeservers to try to pull the state from, if
`initial_destination` is unavailable
room_id: room to be resynced
"""

async def _sync_partial_state_room_wrapper() -> None:
if room_id in self._active_partial_state_syncs:
return

self._active_partial_state_syncs.add(room_id)

try:
await self._sync_partial_state_room(
initial_destination=initial_destination,
other_destinations=other_destinations,
room_id=room_id,
)
finally:
self._active_partial_state_syncs.remove(room_id)

run_as_background_process(
desc="sync_partial_state_room", func=_sync_partial_state_room_wrapper
)

async def _sync_partial_state_room(
self,
initial_destination: Optional[str],
Expand Down