Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Remove legacy code of single user device resync api #15418

Merged
merged 10 commits into from Apr 21, 2023
1 change: 1 addition & 0 deletions changelog.d/15418.misc
@@ -0,0 +1 @@
Always use multi-user device resync replication endpoints.
58 changes: 5 additions & 53 deletions synapse/handlers/device.py
Expand Up @@ -14,7 +14,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from http import HTTPStatus
from typing import (
TYPE_CHECKING,
Any,
Expand Down Expand Up @@ -919,12 +918,8 @@ class DeviceListWorkerUpdater:
def __init__(self, hs: "HomeServer"):
from synapse.replication.http.devices import (
ReplicationMultiUserDevicesResyncRestServlet,
ReplicationUserDevicesResyncRestServlet,
)

self._user_device_resync_client = (
ReplicationUserDevicesResyncRestServlet.make_client(hs)
)
self._multi_user_device_resync_client = (
ReplicationMultiUserDevicesResyncRestServlet.make_client(hs)
)
Expand All @@ -946,37 +941,7 @@ async def multi_user_device_resync(
# Shortcut empty requests
return {}

try:
return await self._multi_user_device_resync_client(user_ids=user_ids)
except SynapseError as err:
if not (
err.code == HTTPStatus.NOT_FOUND and err.errcode == Codes.UNRECOGNIZED
):
raise

# Fall back to single requests
result: Dict[str, Optional[JsonDict]] = {}
for user_id in user_ids:
result[user_id] = await self._user_device_resync_client(user_id=user_id)
return result

async def user_device_resync(
self, user_id: str, mark_failed_as_stale: bool = True
) -> Optional[JsonDict]:
"""Fetches all devices for a user and updates the device cache with them.

Args:
user_id: The user's id whose device_list will be updated.
mark_failed_as_stale: Whether to mark the user's device list as stale
if the attempt to resync failed.
Returns:
A dict with device info as under the "devices" in the result of this
request:
https://matrix.org/docs/spec/server_server/r0.1.2#get-matrix-federation-v1-user-devices-userid
None when we weren't able to fetch the device info for some reason,
e.g. due to a connection problem.
"""
return (await self.multi_user_device_resync([user_id]))[user_id]
return await self._multi_user_device_resync_client(user_ids=user_ids)


class DeviceListUpdater(DeviceListWorkerUpdater):
Expand Down Expand Up @@ -1129,7 +1094,7 @@ async def _handle_device_updates(self, user_id: str) -> None:
)

if resync:
await self.user_device_resync(user_id)
await self.multi_user_device_resync([user_id])
else:
# Simply update the single device, since we know that is the only
# change (because of the single prev_id matching the current cache)
Expand Down Expand Up @@ -1196,10 +1161,9 @@ async def _maybe_retry_device_resync(self) -> None:
for user_id in need_resync:
try:
# Try to resync the current user's devices list.
Copy link
Contributor

@clokep clokep Apr 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably add a TODO comment here that says to resync multiple users at once...

But even if we did that here it seems like the next layer down doesn't do true batching. 🤷

result = await self.user_device_resync(
user_id=user_id,
mark_failed_as_stale=False,
)
result = (await self.multi_user_device_resync([user_id], False))[
user_id
]

# user_device_resync only returns a result if it managed to
# successfully resync and update the database. Updating the table
Expand Down Expand Up @@ -1258,18 +1222,6 @@ async def multi_user_device_resync(

return result

async def user_device_resync(
self, user_id: str, mark_failed_as_stale: bool = True
) -> Optional[JsonDict]:
result, failed = await self._user_device_resync_returning_failed(user_id)

if failed and mark_failed_as_stale:
# Mark the remote user's device list as stale so we know we need to retry
# it later.
await self.store.mark_remote_users_device_caches_as_stale((user_id,))

return result

async def _user_device_resync_returning_failed(
self, user_id: str
) -> Tuple[Optional[JsonDict], bool]:
Expand Down
14 changes: 8 additions & 6 deletions synapse/handlers/devicemessage.py
Expand Up @@ -25,7 +25,9 @@
log_kv,
set_tag,
)
from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet
from synapse.replication.http.devices import (
ReplicationMultiUserDevicesResyncRestServlet,
)
from synapse.types import JsonDict, Requester, StreamKeyType, UserID, get_domain_from_id
from synapse.util import json_encoder
from synapse.util.stringutils import random_string
Expand Down Expand Up @@ -71,12 +73,12 @@ def __init__(self, hs: "HomeServer"):
# sync. We do all device list resyncing on the master instance, so if
# we're on a worker we hit the device resync replication API.
if hs.config.worker.worker_app is None:
self._user_device_resync = (
hs.get_device_handler().device_list_updater.user_device_resync
self._multi_user_device_resync = (
hs.get_device_handler().device_list_updater.multi_user_device_resync
)
else:
self._user_device_resync = (
ReplicationUserDevicesResyncRestServlet.make_client(hs)
self._multi_user_device_resync = (
ReplicationMultiUserDevicesResyncRestServlet.make_client(hs)
)

# a rate limiter for room key requests. The keys are
Expand Down Expand Up @@ -198,7 +200,7 @@ async def _check_for_unknown_devices(
await self.store.mark_remote_users_device_caches_as_stale((sender_user_id,))

# Immediately attempt a resync in the background
run_in_background(self._user_device_resync, user_id=sender_user_id)
run_in_background(self._multi_user_device_resync, user_ids=[sender_user_id])

async def send_device_message(
self,
Expand Down
14 changes: 9 additions & 5 deletions synapse/handlers/federation_event.py
Expand Up @@ -70,7 +70,9 @@
trace,
)
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet
from synapse.replication.http.devices import (
ReplicationMultiUserDevicesResyncRestServlet,
)
from synapse.replication.http.federation import (
ReplicationFederationSendEventsRestServlet,
)
Expand Down Expand Up @@ -167,8 +169,8 @@ def __init__(self, hs: "HomeServer"):

self._send_events = ReplicationFederationSendEventsRestServlet.make_client(hs)
if hs.config.worker.worker_app:
self._user_device_resync = (
ReplicationUserDevicesResyncRestServlet.make_client(hs)
self._multi_user_device_resync = (
ReplicationMultiUserDevicesResyncRestServlet.make_client(hs)
Comment on lines +172 to +173
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was going to suggest renaming all of these things to remove the multi, that can be done as a follow-up though.

)
else:
self._device_list_updater = hs.get_device_handler().device_list_updater
Expand Down Expand Up @@ -1487,9 +1489,11 @@ async def _resync_device(self, sender: str) -> None:

# Immediately attempt a resync in the background
if self._config.worker.worker_app:
await self._user_device_resync(user_id=sender)
await self._multi_user_device_resync(user_ids=[sender])
else:
await self._device_list_updater.user_device_resync(sender)
await self._device_list_updater.multi_user_device_resync(
user_ids=[sender]
)
except Exception:
logger.exception("Failed to resync device for %s", sender)

Expand Down
57 changes: 0 additions & 57 deletions synapse/replication/http/devices.py
Expand Up @@ -28,62 +28,6 @@
logger = logging.getLogger(__name__)


class ReplicationUserDevicesResyncRestServlet(ReplicationEndpoint):
"""Ask master to resync the device list for a user by contacting their
server.

This must happen on master so that the results can be correctly cached in
the database and streamed to workers.

Request format:

POST /_synapse/replication/user_device_resync/:user_id

{}

Response is equivalent to ` /_matrix/federation/v1/user/devices/:user_id`
response, e.g.:

{
"user_id": "@alice:example.org",
"devices": [
{
"device_id": "JLAFKJWSCS",
"keys": { ... },
"device_display_name": "Alice's Mobile Phone"
}
]
}
"""

NAME = "user_device_resync"
PATH_ARGS = ("user_id",)
CACHE = False

def __init__(self, hs: "HomeServer"):
super().__init__(hs)

from synapse.handlers.device import DeviceHandler

handler = hs.get_device_handler()
assert isinstance(handler, DeviceHandler)
self.device_list_updater = handler.device_list_updater

self.store = hs.get_datastores().main
self.clock = hs.get_clock()

@staticmethod
async def _serialize_payload(user_id: str) -> JsonDict: # type: ignore[override]
return {}

async def _handle_request( # type: ignore[override]
self, request: Request, content: JsonDict, user_id: str
) -> Tuple[int, Optional[JsonDict]]:
user_devices = await self.device_list_updater.user_device_resync(user_id)

return 200, user_devices


class ReplicationMultiUserDevicesResyncRestServlet(ReplicationEndpoint):
"""Ask master to resync the device list for multiple users from the same
remote server by contacting their server.
Expand Down Expand Up @@ -216,6 +160,5 @@ async def _handle_request( # type: ignore[override]


def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
ReplicationUserDevicesResyncRestServlet(hs).register(http_server)
ReplicationMultiUserDevicesResyncRestServlet(hs).register(http_server)
ReplicationUploadKeysForUserRestServlet(hs).register(http_server)
4 changes: 3 additions & 1 deletion tests/test_federation.py
Expand Up @@ -267,7 +267,9 @@ def test_cross_signing_keys_retry(self) -> None:
# Resync the device list.
device_handler = self.hs.get_device_handler()
self.get_success(
device_handler.device_list_updater.user_device_resync(remote_user_id),
device_handler.device_list_updater.multi_user_device_resync(
[remote_user_id]
),
)

# Retrieve the cross-signing keys for this user.
Expand Down