From dd3e92655ab28ca1ed3e47c45502673edc239b56 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Tue, 17 May 2022 18:21:10 +0100 Subject: [PATCH 01/17] Raise an InvalidResponseError instead of an Exception when /state_ids returns an invalid response Signed-off-by: Sean Quah --- synapse/federation/federation_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 17eff60909a2..8eb330af51cf 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -416,7 +416,7 @@ async def get_room_state_ids( if not isinstance(state_event_ids, list) or not isinstance( auth_event_ids, list ): - raise Exception("invalid response from /state_ids") + raise InvalidResponseError("invalid response from /state_ids") return state_event_ids, auth_event_ids From 2d63f45596662d717373f4d569ff8f239341898a Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Thu, 19 May 2022 16:12:50 +0100 Subject: [PATCH 02/17] Document that `FederationError`s get raised from `FederationEventHandler.update_state_for_partial_state_event` and `_resolve_state_at_missing_prevs` Signed-off-by: Sean Quah --- synapse/handlers/federation_event.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 761caa04b726..f2d0a3e39e2a 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -485,6 +485,9 @@ async def update_state_for_partial_state_event( Args: destination: server to request full state from event: partial-state event to be de-partial-stated + + Raises: + FederationError if we fail to request state from the remote server. """ logger.info("Updating state for %s", event.event_id) with nested_logging_context(suffix=event.event_id): @@ -792,6 +795,10 @@ async def _resolve_state_at_missing_prevs( Returns: if we already had all the prev events, `None`. 
Otherwise, returns a list of the events in the state at `event`. + + Raises: + FederationError if we fail to get the state from the remote server after any + missing `prev_event`s. """ room_id = event.room_id event_id = event.event_id From 83a93103258c118621c289e5bdc7fffdaacdf052 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Tue, 17 May 2022 18:22:39 +0100 Subject: [PATCH 03/17] Try other destinations when resyncing the state of a partial-state room Signed-off-by: Sean Quah --- synapse/handlers/federation.py | 70 ++++++++++++++++++++++++++++++---- 1 file changed, 62 insertions(+), 8 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index be5099b507f6..11a46b268a87 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -20,7 +20,16 @@ import logging from enum import Enum from http import HTTPStatus -from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Tuple, Union +from typing import ( + TYPE_CHECKING, + Collection, + Dict, + Iterable, + List, + Optional, + Tuple, + Union, +) import attr from signedjson.key import decode_verify_key_bytes @@ -34,6 +43,7 @@ CodeMessageException, Codes, FederationDeniedError, + FederationError, HttpResponseException, NotFoundError, RequestSendFailed, @@ -545,6 +555,7 @@ async def do_invite_join( desc="sync_partial_state_room", func=self._sync_partial_state_room, destination=origin, + destinations=ret.servers_in_room, room_id=room_id, ) @@ -1443,13 +1454,16 @@ async def get_room_complexity( async def _sync_partial_state_room( self, - destination: str, + destination: Optional[str], + destinations: Collection[str], room_id: str, ) -> None: """Background process to resync the state of a partial-state room Args: - destination: homeserver to pull the state from + destination: the initial homeserver to pull the state from + destinations: other homeservers to try to pull the state from, if + `destination` is unavailable room_id: room to be resynced """ @@ 
-1460,9 +1474,19 @@ async def _sync_partial_state_room( # TODO(faster_joins): what happens if we leave the room during a resync? if we # really leave, that might mean we have difficulty getting the room state over # federation. - # - # TODO(faster_joins): try other destinations if the one we have fails + # Make an infinite iterator of destinations to try. Once we find a working + # destination, we'll stick with it until it flakes. + if destination is not None: + # Move `destination` to the front of the list. + destinations = list(destinations) + if destination in destinations: + destinations.remove(destination) + destinations = [destination] + destinations + destination_iter = itertools.cycle(destinations) + + # `destination` is now the current remote homeserver we're pulling from. + destination = next(destination_iter) logger.info("Syncing state for room %s via %s", room_id, destination) # we work through the queue in order of increasing stream ordering. @@ -1498,6 +1522,36 @@ async def _sync_partial_state_room( allow_rejected=True, ) for event in events: - await self._federation_event_handler.update_state_for_partial_state_event( - destination, event - ) + for attempt in range(len(destinations)): + try: + await self._federation_event_handler.update_state_for_partial_state_event( + destination, event + ) + break + except FederationError as e: + if attempt == len(destinations) - 1: + # We have tried every remote server for this event. Give up. + logger.error( + "Failed to get state for %s at %s from %s because %s, " + "giving up!", + room_id, + event, + destination, + e, + ) + raise + + # Try the next remote server. 
+ logger.info( + "Failed to get state for %s at %s from %s because %s", + room_id, + event, + destination, + e, + ) + destination = next(destination_iter) + logger.info( + "Syncing state for room %s via %s instead", + room_id, + destination, + ) From 808ad43b0ef87859b202e0254417ed3782ccfb6b Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 20 May 2022 13:14:36 +0100 Subject: [PATCH 04/17] Add newsfile --- changelog.d/12812.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/12812.misc diff --git a/changelog.d/12812.misc b/changelog.d/12812.misc new file mode 100644 index 000000000000..53cb936a022b --- /dev/null +++ b/changelog.d/12812.misc @@ -0,0 +1 @@ +Try other homeservers when re-syncing state for rooms with partial state. From 0cb986490158bfb23b9ec1bd196a58272b7a447b Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Tue, 17 May 2022 18:23:40 +0100 Subject: [PATCH 05/17] Resume state re-syncing after a Synapse restart Signed-off-by: Sean Quah --- synapse/handlers/federation.py | 24 +++++++++++++++++++-- synapse/storage/databases/main/room.py | 29 ++++++++++++++++++++++++++ 2 files changed, 51 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 11a46b268a87..d52d350fd438 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -168,6 +168,11 @@ def __init__(self, hs: "HomeServer"): self.third_party_event_rules = hs.get_third_party_event_rules() + if not hs.config.worker.worker_app: + run_as_background_process( + "resume_sync_partial_state_room", self._resume_sync_partial_state_room + ) + async def maybe_backfill( self, room_id: str, current_depth: int, limit: int ) -> bool: @@ -469,6 +474,8 @@ async def do_invite_join( """ # TODO: We should be able to call this on workers, but the upgrading of # room stuff after join currently doesn't work on workers. 
+ # TODO: Before we relax this condition, we need to allow re-syncing of + # partial room state to happen on workers. assert self.config.worker.worker_app is None logger.debug("Joining %s to %s", joinee, room_id) @@ -549,8 +556,6 @@ async def do_invite_join( if ret.partial_state: # Kick off the process of asynchronously fetching the state for this # room. - # - # TODO(faster_joins): pick this up again on restart run_as_background_process( desc="sync_partial_state_room", func=self._sync_partial_state_room, @@ -1452,6 +1457,20 @@ async def get_room_complexity( # well. return None + async def _resume_sync_partial_state_room(self) -> None: + """Resumes resyncing of all partial-state rooms after a restart.""" + assert not self.config.worker.worker_app + + partial_state_rooms = await self.store.get_partial_state_rooms_and_servers() + for room_id, servers_in_room in partial_state_rooms.items(): + run_as_background_process( + desc="sync_partial_state_room", + func=self._sync_partial_state_room, + destination=None, + destinations=servers_in_room, + room_id=room_id, + ) + async def _sync_partial_state_room( self, destination: Optional[str], @@ -1466,6 +1485,7 @@ async def _sync_partial_state_room( `destination` is unavailable room_id: room to be resynced """ + assert not self.config.worker.worker_app # TODO(faster_joins): do we need to lock to avoid races? What happens if other # worker processes kick off a resync in parallel? 
Perhaps we should just elect diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py index 87e9482c6054..305abc08292f 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py @@ -23,6 +23,7 @@ Collection, Dict, List, + Mapping, Optional, Tuple, Union, @@ -1077,6 +1078,34 @@ def get_rooms_for_retention_period_in_range_txn( get_rooms_for_retention_period_in_range_txn, ) + async def get_partial_state_rooms_and_servers( + self, + ) -> Mapping[str, Collection[str]]: + """Get all rooms containing events with partial state, and the servers known + to be in the room. + + Returns: + A dictionary of rooms with partial state, with room IDs as keys and + lists of servers in rooms as values. + """ + room_servers: Dict[str, List[str]] = {} + + rows = await self.db_pool.simple_select_list( + "partial_state_rooms_servers", + keyvalues={}, + retcols=("room_id", "server_name"), + desc="get_partial_state_rooms", + ) + + for row in rows: + room_id = row["room_id"] + server_name = row["server_name"] + if room_id not in room_servers: + room_servers[room_id] = [] + room_servers[room_id].append(server_name) + + return room_servers + async def clear_partial_state_room(self, room_id: str) -> bool: # this can race with incoming events, so we watch out for FK errors. # TODO(faster_joins): this still doesn't completely fix the race, since the persist process From 10b29adab3928977ad3b19f654f0ff2efc519939 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Tue, 17 May 2022 18:29:41 +0100 Subject: [PATCH 06/17] Add newsfile --- changelog.d/12813.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/12813.misc diff --git a/changelog.d/12813.misc b/changelog.d/12813.misc new file mode 100644 index 000000000000..8be9f3eb44b5 --- /dev/null +++ b/changelog.d/12813.misc @@ -0,0 +1 @@ +Resume state re-syncing for rooms with partial state after a Synapse restart. 
From c713c47c4f0d51eff5ce7ab09b98e359909a0754 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 27 May 2022 12:40:22 +0100 Subject: [PATCH 07/17] Update docstring for `get_room_state_ids` --- synapse/federation/federation_client.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 8eb330af51cf..b60b8983ea41 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -405,6 +405,9 @@ async def get_room_state_ids( Returns: a tuple of (state event_ids, auth event_ids) + + Raises: + InvalidResponseError: if fields in the response have the wrong type. """ result = await self.transport_layer.get_room_state_ids( destination, room_id, event_id=event_id From ab3ccf769897801e384ec1f745f1d02d56a801dc Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 27 May 2022 12:41:29 +0100 Subject: [PATCH 08/17] Rename _sync_partial_state_room parameters --- synapse/handlers/federation.py | 30 ++++++++++++++++-------------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 75f51b71de50..9b0c664274b6 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1461,16 +1461,16 @@ async def get_room_complexity( async def _sync_partial_state_room( self, - destination: Optional[str], - destinations: Collection[str], + initial_destination: Optional[str], + other_destinations: Collection[str], room_id: str, ) -> None: """Background process to resync the state of a partial-state room Args: - destination: the initial homeserver to pull the state from - destinations: other homeservers to try to pull the state from, if - `destination` is unavailable + initial_destination: the initial homeserver to pull the state from + other_destinations: other homeservers to try to pull the state from, if + `initial_destination` is unavailable room_id: room to be resynced """ 
@@ -1484,15 +1484,17 @@ async def _sync_partial_state_room( # Make an infinite iterator of destinations to try. Once we find a working # destination, we'll stick with it until it flakes. - if destination is not None: - # Move `destination` to the front of the list. - destinations = list(destinations) - if destination in destinations: - destinations.remove(destination) - destinations = [destination] + destinations - destination_iter = itertools.cycle(destinations) - - # `destination` is now the current remote homeserver we're pulling from. + if initial_destination is not None: + # Move `initial_destination` to the front of the list. + destinations = list(other_destinations) + if initial_destination in destinations: + destinations.remove(initial_destination) + destinations = [initial_destination] + destinations + destination_iter = itertools.cycle(destinations) + else: + destination_iter = itertools.cycle(other_destinations) + + # `destination` is the current remote homeserver we're pulling from. destination = next(destination_iter) logger.info("Syncing state for room %s via %s", room_id, destination) From 55460dacf80ecdc0d0995b743e8a238681045532 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 27 May 2022 12:42:17 +0100 Subject: [PATCH 09/17] Complain if 0 destinations have been provided --- synapse/handlers/federation.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 9b0c664274b6..265cf19ce750 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1482,6 +1482,9 @@ async def _sync_partial_state_room( # really leave, that might mean we have difficulty getting the room state over # federation. + if initial_destination is None and len(other_destinations) == 0: + raise ValueError(f"Cannot resync state of {room_id}: no destinations provided") + # Make an infinite iterator of destinations to try. 
Once we find a working # destination, we'll stick with it until it flakes. if initial_destination is not None: From 972a02134e177b11c027bd446c78ff45c7362cbb Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 27 May 2022 12:42:47 +0100 Subject: [PATCH 10/17] Refactor loop to be infinite --- synapse/handlers/federation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 265cf19ce750..3abc5d9502f9 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1534,7 +1534,7 @@ async def _sync_partial_state_room( allow_rejected=True, ) for event in events: - for attempt in range(len(destinations)): + for attempt in itertools.count(): try: await self._federation_event_handler.update_state_for_partial_state_event( destination, event From ed290b2396688f6f22ddc9c722c8cf661b4146cf Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 27 May 2022 12:42:56 +0100 Subject: [PATCH 11/17] Add TODOs --- synapse/handlers/federation.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 3abc5d9502f9..b55f17dbfef3 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1481,6 +1481,10 @@ async def _sync_partial_state_room( # TODO(faster_joins): what happens if we leave the room during a resync? if we # really leave, that might mean we have difficulty getting the room state over # federation. + # + # TODO(faster_joins): we need some way of prioritising which homeservers in + # `other_destinations` to try first, otherwise we'll spend ages trying dead + # homeservers for large rooms. 
if initial_destination is None and len(other_destinations) == 0: raise ValueError(f"Cannot resync state of {room_id}: no destinations provided") @@ -1543,6 +1547,11 @@ async def _sync_partial_state_room( except FederationError as e: if attempt == len(destinations) - 1: # We have tried every remote server for this event. Give up. + # TODO(faster_joins) giving up isn't the right thing to do + # if there's a temporary network outage. retrying + # indefinitely is also not the right thing to do if we can + # reach all homeservers and they all claim they don't have + # the state we want. logger.error( "Failed to get state for %s at %s from %s because %s, " "giving up!", From 3efe1dff42689698525e0f1768824b0c4f536894 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 27 May 2022 12:47:22 +0100 Subject: [PATCH 12/17] Run linter --- synapse/handlers/federation.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index b55f17dbfef3..736fc32b6b35 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1487,7 +1487,9 @@ async def _sync_partial_state_room( # homeservers for large rooms. if initial_destination is None and len(other_destinations) == 0: - raise ValueError(f"Cannot resync state of {room_id}: no destinations provided") + raise ValueError( + f"Cannot resync state of {room_id}: no destinations provided" + ) # Make an infinite iterator of destinations to try. Once we find a working # destination, we'll stick with it until it flakes. 
From dd6bf3fd223ec5ed1a78444a3f53b3ea5aa48e12 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 27 May 2022 13:03:54 +0100 Subject: [PATCH 13/17] Rename _sync_partial_state_room parameters, part 2 --- synapse/handlers/federation.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 736fc32b6b35..97040f953360 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -555,8 +555,8 @@ async def do_invite_join( run_as_background_process( desc="sync_partial_state_room", func=self._sync_partial_state_room, - destination=origin, - destinations=ret.servers_in_room, + initial_destination=origin, + other_destinations=ret.servers_in_room, room_id=room_id, ) From ad48b30d7672c56e6b8aef4579f7f93fc562c74e Mon Sep 17 00:00:00 2001 From: Sean Quah <8349537+squahtx@users.noreply.github.com> Date: Fri, 27 May 2022 13:42:55 +0100 Subject: [PATCH 14/17] Update synapse/handlers/federation.py Co-authored-by: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> --- synapse/handlers/federation.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index d52d350fd438..3ae1aa3b167c 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -168,6 +168,9 @@ def __init__(self, hs: "HomeServer"): self.third_party_event_rules = hs.get_third_party_event_rules() + # if this is the main process, fire off a background process to resume + # any partial-state-resync operations which were in flight when we + # were shut down. 
if not hs.config.worker.worker_app: run_as_background_process( "resume_sync_partial_state_room", self._resume_sync_partial_state_room From 58741a3ebcbce73df5bae1fba43fef6b334cfd35 Mon Sep 17 00:00:00 2001 From: Sean Quah <8349537+squahtx@users.noreply.github.com> Date: Fri, 27 May 2022 13:43:00 +0100 Subject: [PATCH 15/17] Update synapse/storage/databases/main/room.py Co-authored-by: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> --- synapse/storage/databases/main/room.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py index 305abc08292f..a8f0a65a167c 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py @@ -1092,7 +1092,7 @@ async def get_partial_state_rooms_and_servers( rows = await self.db_pool.simple_select_list( "partial_state_rooms_servers", - keyvalues={}, + keyvalues=None, retcols=("room_id", "server_name"), desc="get_partial_state_rooms", ) From 0680f1cd10c6b5559c9966b58de5e97c85e3103d Mon Sep 17 00:00:00 2001 From: Sean Quah <8349537+squahtx@users.noreply.github.com> Date: Fri, 27 May 2022 13:43:06 +0100 Subject: [PATCH 16/17] Update synapse/storage/databases/main/room.py Co-authored-by: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> --- synapse/storage/databases/main/room.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py index a8f0a65a167c..8486e56ee995 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py @@ -1100,9 +1100,7 @@ async def get_partial_state_rooms_and_servers( for row in rows: room_id = row["room_id"] server_name = row["server_name"] - if room_id not in room_servers: - room_servers[room_id] = [] - room_servers[room_id].append(server_name) + room_servers.setdefault(room_id, []).append(server_name) return room_servers From 
6172d86b10f6ad0adfef71f886281084a9a5ff81 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 27 May 2022 13:56:54 +0100 Subject: [PATCH 17/17] Fix argument names for _sync_partial_state_room --- synapse/handlers/federation.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index fc46321b2097..508c50c7bb81 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1476,8 +1476,8 @@ async def _resume_sync_partial_state_room(self) -> None: run_as_background_process( desc="sync_partial_state_room", func=self._sync_partial_state_room, - destination=None, - destinations=servers_in_room, + initial_destination=None, + other_destinations=servers_in_room, room_id=room_id, )