
Track device list updates per room. (#12321)
This is a first step in dealing with #7721.

The basic idea is that, rather than calculating up front the full set of users a device list update needs to be sent to, we instead simply record the rooms the user was in at the time of the change (a sketch of this two-step flow follows the list below). This will allow a few things:

1. we can defer calculating the set of remote servers that need to be poked about the change; and
2. during `/sync` and `/keys/changes` we can also avoid calculating the set of users who share rooms, and instead just look at the rooms that have changed.
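
To make the two-step flow concrete, here is a minimal sketch of the shape of the data involved. The helper functions and their signatures are purely illustrative and are not the storage API added by this commit; only the table and column semantics mirror what the commit touches.

```python
from typing import Collection, Dict, Iterable, List, Set, Tuple

# Illustrative sketch only -- these helpers are not the real Synapse storage API.


def rows_to_record_at_change_time(
    user_id: str,
    device_ids: Collection[str],
    room_ids: Iterable[str],
    stream_id: int,
) -> List[Tuple[str, str, str, int]]:
    """At change time we only note *where* the change happened: one row per
    (device, room), which is roughly what `device_lists_changes_in_room` stores."""
    return [
        (user_id, device_id, room_id, stream_id)
        for device_id in device_ids
        for room_id in room_ids
    ]


def expand_rooms_to_remote_hosts(
    rows: Iterable[Tuple[str, str, str, int]],
    users_in_room: Dict[str, Set[str]],
    local_server: str,
) -> Set[str]:
    """Later (and potentially in the background) each room is expanded into the
    set of remote servers that need to be poked about the change."""
    hosts: Set[str] = set()
    for _user_id, _device_id, room_id, _stream_id in rows:
        for joined_user in users_in_room.get(room_id, set()):
            domain = joined_user.split(":", 1)[1]  # crude stand-in for get_domain_from_id
            if domain != local_server:
                hosts.add(domain)
    return hosts
```

The real conversion additionally has to remember which rows have already been fanned out, which is what the `converted_to_destinations` flag on the new table is for.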

However, care needs to be taken to correctly handle server downgrades. As such, this PR writes to both the `device_lists_changes_in_room` and `device_lists_outbound_pokes` tables synchronously. In a future release we can bump the database schema compat version to `69`, at which point we can assume that the new `device_lists_changes_in_room` table exists and is handled.
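
A condensed sketch of that dual-write decision is below; it mirrors the change to `DeviceHandler.notify_device_update` further down in this commit, but is simplified and is not the exact handler code.

```python
async def notify_device_update_sketch(store, hs, user_id, device_ids):
    # Simplified sketch of the compat behaviour described above; the real
    # logic lives in synapse/handlers/device.py in this commit.
    room_ids = await store.get_rooms_for_user(user_id)

    hosts = None
    if not hs.config.server.use_new_device_lists_changes_in_room:
        # Backwards-compatible path: work out the remote hosts up front so that
        # `device_lists_outbound_pokes` is still populated synchronously and a
        # downgraded server keeps working.
        hosts = set()
        if hs.is_mine_id(user_id):
            for room_id in room_ids:
                for joined_user in await store.get_users_in_room(room_id):
                    hosts.add(joined_user.split(":", 1)[1])
        hosts.discard(hs.hostname)

    # Either way, record the per-room rows; the host fan-out can then be
    # derived later from `device_lists_changes_in_room` in the background.
    return await store.add_device_change_to_streams(
        user_id, device_ids, hosts=hosts, room_ids=room_ids
    )
```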

There is a temporary option to disable writing to `device_lists_outbound_pokes` synchronously, allowing us to test that the new code path works (and, by implication, that upgrading to a future release and then downgrading to this one will work correctly).

Note: ideally we'd do the calculation of rooms to servers on a worker (e.g. the background worker), but currently only the master process can write to the `device_lists_outbound_pokes` table.
erikjohnston committed Apr 4, 2022
1 parent 80839a4 commit 5c9e39e
Showing 11 changed files with 390 additions and 47 deletions.
1 change: 1 addition & 0 deletions changelog.d/12321.misc
@@ -0,0 +1 @@
Add groundwork for speeding up device list updates for users in large numbers of rooms.
1 change: 1 addition & 0 deletions synapse/_scripts/synapse_port_db.py
@@ -97,6 +97,7 @@
    "users": ["shadow_banned"],
    "e2e_fallback_keys_json": ["used"],
    "access_tokens": ["used"],
+    "device_lists_changes_in_room": ["converted_to_destinations"],
}
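
For context, `synapse_port_db` copies rows from SQLite (where booleans are stored as 0/1 integers) into Postgres (which has a real boolean type), so any new boolean column has to be registered in this map. A rough, illustrative sketch of the kind of coercion that registration drives (not the actual port-script code):

```python
BOOLEAN_COLUMNS = {
    "device_lists_changes_in_room": ["converted_to_destinations"],
}


def coerce_row_for_postgres(table: str, row: dict) -> dict:
    # Illustrative only: turn SQLite's 0/1 integers into real booleans for any
    # column registered in BOOLEAN_COLUMNS before inserting into Postgres.
    for column in BOOLEAN_COLUMNS.get(table, []):
        if row.get(column) is not None:
            row[column] = bool(row[column])
    return row
```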


8 changes: 8 additions & 0 deletions synapse/config/server.py
@@ -680,6 +680,14 @@ def read_config(self, config, **kwargs):
            config.get("use_account_validity_in_account_status") or False
        )

+        # This is a temporary option that enables fully using the new
+        # `device_lists_changes_in_room` without the backwards compat code. This
+        # is primarily for testing. If enabled the server should *not* be
+        # downgraded, as it may lead to missing device list updates.
+        self.use_new_device_lists_changes_in_room = (
+            config.get("use_new_device_lists_changes_in_room") or False
+        )
+
        self.rooms_to_exclude_from_sync: List[str] = (
            config.get("exclude_rooms_from_sync") or []
        )
132 changes: 120 additions & 12 deletions synapse/handlers/device.py
@@ -37,7 +37,10 @@
    SynapseError,
)
from synapse.logging.opentracing import log_kv, set_tag, trace
-from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.metrics.background_process_metrics import (
+    run_as_background_process,
+    wrap_as_background_process,
+)
from synapse.types import (
    JsonDict,
    StreamToken,
@@ -278,6 +281,22 @@ def __init__(self, hs: "HomeServer"):

        hs.get_distributor().observe("user_left_room", self.user_left_room)

+        # Whether `_handle_new_device_update_async` is currently processing.
+        self._handle_new_device_update_is_processing = False
+
+        # If a new device update may have happened while the loop was
+        # processing.
+        self._handle_new_device_update_new_data = False
+
+        # On start up check if there are any updates pending.
+        hs.get_reactor().callWhenRunning(self._handle_new_device_update_async)
+
+        # Used to decide if we calculate outbound pokes up front or not. By
+        # default we do to allow safely downgrading Synapse.
+        self.use_new_device_lists_changes_in_room = (
+            hs.config.server.use_new_device_lists_changes_in_room
+        )
+
    def _check_device_name_length(self, name: Optional[str]) -> None:
        """
        Checks whether a device name is longer than the maximum allowed length.
@@ -469,19 +488,26 @@ async def notify_device_update(
            # No changes to notify about, so this is a no-op.
            return

-        users_who_share_room = await self.store.get_users_who_share_room_with_user(
-            user_id
-        )
+        room_ids = await self.store.get_rooms_for_user(user_id)

-        hosts: Set[str] = set()
-        if self.hs.is_mine_id(user_id):
-            hosts.update(get_domain_from_id(u) for u in users_who_share_room)
-            hosts.discard(self.server_name)
+        hosts: Optional[Set[str]] = None
+        if not self.use_new_device_lists_changes_in_room:
+            hosts = set()
+
+            if self.hs.is_mine_id(user_id):
+                for room_id in room_ids:
+                    joined_users = await self.store.get_users_in_room(room_id)
+                    hosts.update(get_domain_from_id(u) for u in joined_users)

-        set_tag("target_hosts", hosts)
+            set_tag("target_hosts", hosts)
+
+            hosts.discard(self.server_name)

        position = await self.store.add_device_change_to_streams(
-            user_id, device_ids, list(hosts)
+            user_id,
+            device_ids,
+            hosts=hosts,
+            room_ids=room_ids,
        )

        if not position:
@@ -495,9 +521,12 @@ async def notify_device_update(

        # specify the user ID too since the user should always get their own device list
        # updates, even if they aren't in any rooms.
-        users_to_notify = users_who_share_room.union({user_id})
+        self.notifier.on_new_event(
+            "device_list_key", position, users={user_id}, rooms=room_ids
+        )

-        self.notifier.on_new_event("device_list_key", position, users=users_to_notify)
+        # We may need to do some processing asynchronously.
+        self._handle_new_device_update_async()

        if hosts:
            logger.info(
@@ -614,6 +643,85 @@ async def rehydrate_device(

        return {"success": True}

+    @wrap_as_background_process("_handle_new_device_update_async")
+    async def _handle_new_device_update_async(self) -> None:
+        """Called when we have a new local device list update that we need to
+        send out over federation.
+
+        This happens in the background so as not to block the original request
+        that generated the device update.
+        """
+        if self._handle_new_device_update_is_processing:
+            self._handle_new_device_update_new_data = True
+            return
+
+        self._handle_new_device_update_is_processing = True
+
+        # The stream ID we processed previous iteration (if any), and the set of
+        # hosts we've already poked about for this update. This is so that we
+        # don't poke the same remote server about the same update repeatedly.
+        current_stream_id = None
+        hosts_already_sent_to: Set[str] = set()
+
+        try:
+            while True:
+                self._handle_new_device_update_new_data = False
+                rows = await self.store.get_uncoverted_outbound_room_pokes()
+                if not rows:
+                    # If the DB returned nothing then there is nothing left to
+                    # do, *unless* a new device list update happened during the
+                    # DB query.
+                    if self._handle_new_device_update_new_data:
+                        continue
+                    else:
+                        return
+
+                for user_id, device_id, room_id, stream_id, opentracing_context in rows:
+                    joined_user_ids = await self.store.get_users_in_room(room_id)
+                    hosts = {get_domain_from_id(u) for u in joined_user_ids}
+                    hosts.discard(self.server_name)
+
+                    # Check if we've already sent this update to some hosts
+                    if current_stream_id == stream_id:
+                        hosts -= hosts_already_sent_to
+
+                    await self.store.add_device_list_outbound_pokes(
+                        user_id=user_id,
+                        device_id=device_id,
+                        room_id=room_id,
+                        stream_id=stream_id,
+                        hosts=hosts,
+                        context=opentracing_context,
+                    )
+
+                    # Notify replication that we've updated the device list stream.
+                    self.notifier.notify_replication()
+
+                    if hosts:
+                        logger.info(
+                            "Sending device list update notif for %r to: %r",
+                            user_id,
+                            hosts,
+                        )
+                        for host in hosts:
+                            self.federation_sender.send_device_messages(
+                                host, immediate=False
+                            )
+                            log_kv(
+                                {"message": "sent device update to host", "host": host}
+                            )
+
+                    if current_stream_id != stream_id:
+                        # Clear the set of hosts we've already sent to as we're
+                        # processing a new update.
+                        hosts_already_sent_to.clear()
+
+                    hosts_already_sent_to.update(hosts)
+                    current_stream_id = stream_id
+
+        finally:
+            self._handle_new_device_update_is_processing = False
+

def _update_device_from_client_ips(
    device: JsonDict, client_ips: Mapping[Tuple[str, str], Mapping[str, Any]]
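
The new `_handle_new_device_update_async` method above uses a small "single-flight plus dirty flag" pattern: the two booleans initialised in `__init__` make sure only one background run is active at a time, while updates that arrive mid-run are not lost. A stripped-down, Synapse-independent sketch of that pattern:

```python
from typing import Awaitable, Callable


class SingleFlightWorker:
    """Stripped-down sketch of the processing/dirty-flag pattern used by
    `_handle_new_device_update_async` above (illustrative, not Synapse code)."""

    def __init__(
        self,
        fetch_batch: Callable[[], Awaitable[list]],
        process_batch: Callable[[list], Awaitable[None]],
    ) -> None:
        self._fetch_batch = fetch_batch
        self._process_batch = process_batch
        self._is_processing = False
        self._new_data = False

    async def poke(self) -> None:
        # If a run is already in flight, just note that more work has arrived;
        # the in-flight loop will pick it up before it exits.
        if self._is_processing:
            self._new_data = True
            return

        self._is_processing = True
        try:
            while True:
                self._new_data = False
                batch = await self._fetch_batch()
                if not batch:
                    # Nothing left to do, *unless* new work raced in while we
                    # were querying -- in that case go around again.
                    if self._new_data:
                        continue
                    return
                await self._process_batch(batch)
        finally:
            self._is_processing = False
```

The extra bookkeeping in the real method (`current_stream_id` and `hosts_already_sent_to`) exists only to avoid poking the same remote server twice about the same stream position.
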
1 change: 1 addition & 0 deletions synapse/replication/slave/storage/devices.py
@@ -44,6 +44,7 @@ def __init__(
            extra_tables=[
                ("user_signature_stream", "stream_id"),
                ("device_lists_outbound_pokes", "stream_id"),
+                ("device_lists_changes_in_room", "stream_id"),
            ],
        )
        device_list_max = self._device_list_id_gen.get_current_token()
1 change: 1 addition & 0 deletions synapse/storage/databases/main/__init__.py
@@ -146,6 +146,7 @@ def __init__(
            extra_tables=[
                ("user_signature_stream", "stream_id"),
                ("device_lists_outbound_pokes", "stream_id"),
+                ("device_lists_changes_in_room", "stream_id"),
            ],
        )

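Both ID generators register the new table so that the device list stream position accounts for any `stream_id` already written to `device_lists_changes_in_room`. A rough sketch of why the extra tables matter when computing the starting token (illustrative; not the real generator internals):

```python
import sqlite3


def current_device_list_token(conn: sqlite3.Connection) -> int:
    # Illustrative only: the stream token must be at least as large as the
    # biggest stream_id in *any* table sharing the device list stream,
    # otherwise a restart could hand out IDs that collide with existing rows.
    tables = [
        "device_lists_stream",
        "user_signature_stream",
        "device_lists_outbound_pokes",
        "device_lists_changes_in_room",
    ]
    max_id = 0
    for table in tables:
        (table_max,) = conn.execute(
            f"SELECT COALESCE(MAX(stream_id), 0) FROM {table}"
        ).fetchone()
        max_id = max(max_id, table_max)
    return max_id
```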
