Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Track device list updates per room. #12321

Merged
merged 30 commits into from Apr 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
b5a9c6b
Operate on room IDs
erikjohnston Mar 25, 2022
6fa639e
Don't copy the hosts list
erikjohnston Mar 28, 2022
ecf98b9
Write to a new `device_lists_changes_in_room` table.
erikjohnston Mar 28, 2022
c5dd83f
Convert to doing everything in a transaction
erikjohnston Mar 28, 2022
bb44214
Track if we've calculated remote hosts
erikjohnston Mar 28, 2022
98fceb3
We actually don't want stream_id to be unique
erikjohnston Mar 28, 2022
eda0e64
Handle room pokes that haven't been converted to outbound pokes
erikjohnston Mar 29, 2022
c7790ab
Deduplicate outbound pokes
erikjohnston Mar 29, 2022
6e9b31a
Add a config option that allows using new code path
erikjohnston Mar 29, 2022
3b2ab93
Add tests for new code path
erikjohnston Mar 29, 2022
f8af30f
Newsfile
erikjohnston Mar 29, 2022
7266580
Fix tests
erikjohnston Mar 29, 2022
f24b70b
Merge remote-tracking branch 'origin/develop' into erikj/device_list_…
erikjohnston Mar 31, 2022
8bd8ee2
Update synapse/storage/schema/main/delta/69/01device_list_oubound_by_…
erikjohnston Apr 4, 2022
56f0913
Update synapse/storage/schema/main/delta/69/01device_list_oubound_by_…
erikjohnston Apr 4, 2022
90d41a0
Encode opentracing context just once.
erikjohnston Apr 4, 2022
c470a12
Rename var
erikjohnston Apr 4, 2022
d030062
Remove `if not room_ids` check.
erikjohnston Apr 4, 2022
ad5d46b
Add unique index
erikjohnston Apr 4, 2022
d5031b0
Note lack of foreign key constraint
erikjohnston Apr 4, 2022
28dacc8
Add comment about stream_id duplicates
erikjohnston Apr 4, 2022
f48527f
Update synapse_port_db
erikjohnston Apr 4, 2022
bd45f19
Inequality the wrong way round
erikjohnston Apr 4, 2022
dee8f55
Add note about 'num_stream_ids'
erikjohnston Apr 4, 2022
3574541
Merge remote-tracking branch 'origin/develop' into erikj/device_list_…
erikjohnston Apr 4, 2022
cf04f1a
Use different stream IDs for device_list_outbound_pokes
erikjohnston Apr 4, 2022
89e10d7
Correctly order device list stream updates
erikjohnston Apr 4, 2022
e54d2d4
Wake up replication after adding otubound pokes
erikjohnston Apr 4, 2022
7d79dee
Apply suggestions from code review
erikjohnston Apr 4, 2022
b61c5c7
Remove get_users_who_share_room_with_user stub in test
erikjohnston Apr 4, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/12321.misc
@@ -0,0 +1 @@
Add ground work for speeding up device list updates for users in large numbers of rooms.
1 change: 1 addition & 0 deletions synapse/_scripts/synapse_port_db.py
Expand Up @@ -97,6 +97,7 @@
"users": ["shadow_banned"],
"e2e_fallback_keys_json": ["used"],
"access_tokens": ["used"],
"device_lists_changes_in_room": ["converted_to_destinations"],
}


Expand Down
8 changes: 8 additions & 0 deletions synapse/config/server.py
Expand Up @@ -680,6 +680,14 @@ def read_config(self, config, **kwargs):
config.get("use_account_validity_in_account_status") or False
)

# This is a temporary option that enables fully using the new
# `device_lists_changes_in_room` without the backwards compat code. This
# is primarily for testing. If enabled the server should *not* be
# downgraded, as it may lead to missing device list updates.
self.use_new_device_lists_changes_in_room = (
config.get("use_new_device_lists_changes_in_room") or False
)

self.rooms_to_exclude_from_sync: List[str] = (
config.get("exclude_rooms_from_sync") or []
)
Expand Down
132 changes: 120 additions & 12 deletions synapse/handlers/device.py
Expand Up @@ -37,7 +37,10 @@
SynapseError,
)
from synapse.logging.opentracing import log_kv, set_tag, trace
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.metrics.background_process_metrics import (
run_as_background_process,
wrap_as_background_process,
)
from synapse.types import (
JsonDict,
StreamToken,
Expand Down Expand Up @@ -278,6 +281,22 @@ def __init__(self, hs: "HomeServer"):

hs.get_distributor().observe("user_left_room", self.user_left_room)

# Whether `_handle_new_device_update_async` is currently processing.
self._handle_new_device_update_is_processing = False

# If a new device update may have happened while the loop was
# processing.
self._handle_new_device_update_new_data = False

# On start up check if there are any updates pending.
hs.get_reactor().callWhenRunning(self._handle_new_device_update_async)

# Used to decide if we calculate outbound pokes up front or not. By
# default we do to allow safely downgrading Synapse.
self.use_new_device_lists_changes_in_room = (
hs.config.server.use_new_device_lists_changes_in_room
)

def _check_device_name_length(self, name: Optional[str]) -> None:
"""
Checks whether a device name is longer than the maximum allowed length.
Expand Down Expand Up @@ -469,19 +488,26 @@ async def notify_device_update(
# No changes to notify about, so this is a no-op.
return

users_who_share_room = await self.store.get_users_who_share_room_with_user(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FederationSenderDevicesTestCases still stubs this out. Is that still necessary?

user_id
)
room_ids = await self.store.get_rooms_for_user(user_id)

hosts: Optional[Set[str]] = None
if not self.use_new_device_lists_changes_in_room:
hosts = set()

hosts: Set[str] = set()
if self.hs.is_mine_id(user_id):
hosts.update(get_domain_from_id(u) for u in users_who_share_room)
hosts.discard(self.server_name)
if self.hs.is_mine_id(user_id):
for room_id in room_ids:
joined_users = await self.store.get_users_in_room(room_id)
hosts.update(get_domain_from_id(u) for u in joined_users)

set_tag("target_hosts", hosts)
set_tag("target_hosts", hosts)

hosts.discard(self.server_name)

position = await self.store.add_device_change_to_streams(
user_id, device_ids, list(hosts)
user_id,
device_ids,
hosts=hosts,
room_ids=room_ids,
)

if not position:
Expand All @@ -495,9 +521,12 @@ async def notify_device_update(

# specify the user ID too since the user should always get their own device list
# updates, even if they aren't in any rooms.
users_to_notify = users_who_share_room.union({user_id})
self.notifier.on_new_event(
"device_list_key", position, users={user_id}, rooms=room_ids
)

self.notifier.on_new_event("device_list_key", position, users=users_to_notify)
# We may need to do some processing asynchronously.
self._handle_new_device_update_async()

if hosts:
logger.info(
Expand Down Expand Up @@ -614,6 +643,85 @@ async def rehydrate_device(

return {"success": True}

@wrap_as_background_process("_handle_new_device_update_async")
async def _handle_new_device_update_async(self) -> None:
"""Called when we have a new local device list update that we need to
send out over federation.

This happens in the background so as not to block the original request
that generated the device update.
"""
if self._handle_new_device_update_is_processing:
self._handle_new_device_update_new_data = True
return

self._handle_new_device_update_is_processing = True

# The stream ID we processed previous iteration (if any), and the set of
# hosts we've already poked about for this update. This is so that we
# don't poke the same remote server about the same update repeatedly.
current_stream_id = None
hosts_already_sent_to: Set[str] = set()

try:
while True:
self._handle_new_device_update_new_data = False
rows = await self.store.get_uncoverted_outbound_room_pokes()
if not rows:
# If the DB returned nothing then there is nothing left to
# do, *unless* a new device list update happened during the
# DB query.
if self._handle_new_device_update_new_data:
continue
else:
return

for user_id, device_id, room_id, stream_id, opentracing_context in rows:
joined_user_ids = await self.store.get_users_in_room(room_id)
hosts = {get_domain_from_id(u) for u in joined_user_ids}
hosts.discard(self.server_name)

# Check if we've already sent this update to some hosts
if current_stream_id == stream_id:
hosts -= hosts_already_sent_to

await self.store.add_device_list_outbound_pokes(
user_id=user_id,
device_id=device_id,
room_id=room_id,
stream_id=stream_id,
hosts=hosts,
context=opentracing_context,
)

# Notify replication that we've updated the device list stream.
self.notifier.notify_replication()

if hosts:
logger.info(
"Sending device list update notif for %r to: %r",
user_id,
hosts,
)
for host in hosts:
self.federation_sender.send_device_messages(
host, immediate=False
)
log_kv(
{"message": "sent device update to host", "host": host}
)

if current_stream_id != stream_id:
# Clear the set of hosts we've already sent to as we're
# processing a new update.
hosts_already_sent_to.clear()

hosts_already_sent_to.update(hosts)
current_stream_id = stream_id

finally:
self._handle_new_device_update_is_processing = False


def _update_device_from_client_ips(
device: JsonDict, client_ips: Mapping[Tuple[str, str], Mapping[str, Any]]
Expand Down
1 change: 1 addition & 0 deletions synapse/replication/slave/storage/devices.py
Expand Up @@ -44,6 +44,7 @@ def __init__(
extra_tables=[
("user_signature_stream", "stream_id"),
("device_lists_outbound_pokes", "stream_id"),
("device_lists_changes_in_room", "stream_id"),
],
)
device_list_max = self._device_list_id_gen.get_current_token()
Expand Down
1 change: 1 addition & 0 deletions synapse/storage/databases/main/__init__.py
Expand Up @@ -146,6 +146,7 @@ def __init__(
extra_tables=[
("user_signature_stream", "stream_id"),
("device_lists_outbound_pokes", "stream_id"),
("device_lists_changes_in_room", "stream_id"),
],
)

Expand Down