Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Skip unused calculations in sync handler #14908

Merged
merged 19 commits into from Feb 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/14908.misc
@@ -0,0 +1 @@
Improve performance of `/sync` in a few situations.
3 changes: 3 additions & 0 deletions synapse/api/filtering.py
Expand Up @@ -283,6 +283,9 @@ async def filter_room_account_data(
await self._room_filter.filter(events)
)

def blocks_all_rooms(self) -> bool:
return self._room_filter.filters_all_rooms()

DMRobertson marked this conversation as resolved.
Show resolved Hide resolved
def blocks_all_presence(self) -> bool:
return (
self._presence_filter.filters_all_types()
Expand Down
258 changes: 133 additions & 125 deletions synapse/handlers/sync.py
Expand Up @@ -1448,41 +1448,67 @@ async def generate_sync_result(
sync_result_builder
)

logger.debug("Fetching room data")

(
newly_joined_rooms,
newly_joined_or_invited_or_knocked_users,
newly_left_rooms,
newly_left_users,
) = await self._generate_sync_entry_for_rooms(
sync_result_builder, account_data_by_room
)

# Presence data is included if the server has it enabled and not filtered out.
include_presence_data = (
include_presence_data = bool(
self.hs_config.server.use_presence
and not sync_config.filter_collection.blocks_all_presence()
)
if include_presence_data:
logger.debug("Fetching presence data")
await self._generate_sync_entry_for_presence(
sync_result_builder,
# Device list updates are sent if a since token is provided.
include_device_list_updates = bool(since_token and since_token.device_list_key)

# If we do not care about the rooms or things which depend on the room
# data (namely presence and device list updates), then we can skip
# this process completely.
device_lists = DeviceListUpdates()
if (
not sync_result_builder.sync_config.filter_collection.blocks_all_rooms()
or include_presence_data
or include_device_list_updates
):
logger.debug("Fetching room data")

# Note that _generate_sync_entry_for_rooms sets sync_result_builder.joined, which
# is used in calculate_user_changes below.
(
newly_joined_rooms,
newly_joined_or_invited_or_knocked_users,
newly_left_rooms,
) = await self._generate_sync_entry_for_rooms(
sync_result_builder, account_data_by_room
)

# Work out which users have joined or left rooms we're in. We use this
# to build the presence and device_list parts of the sync response in
# `_generate_sync_entry_for_presence` and
# `_generate_sync_entry_for_device_list` respectively.
if include_presence_data or include_device_list_updates:
# This uses the sync_result_builder.joined which is set in
# `_generate_sync_entry_for_rooms`, if that didn't find any joined
# rooms for some reason it is a no-op.
(
newly_joined_or_invited_or_knocked_users,
newly_left_users,
) = sync_result_builder.calculate_user_changes()

if include_presence_data:
logger.debug("Fetching presence data")
await self._generate_sync_entry_for_presence(
sync_result_builder,
newly_joined_rooms,
newly_joined_or_invited_or_knocked_users,
)

if include_device_list_updates:
device_lists = await self._generate_sync_entry_for_device_list(
sync_result_builder,
newly_joined_rooms=newly_joined_rooms,
newly_joined_or_invited_or_knocked_users=newly_joined_or_invited_or_knocked_users,
newly_left_rooms=newly_left_rooms,
newly_left_users=newly_left_users,
)

logger.debug("Fetching to-device data")
await self._generate_sync_entry_for_to_device(sync_result_builder)

device_lists = await self._generate_sync_entry_for_device_list(
sync_result_builder,
newly_joined_rooms=newly_joined_rooms,
newly_joined_or_invited_or_knocked_users=newly_joined_or_invited_or_knocked_users,
newly_left_rooms=newly_left_rooms,
newly_left_users=newly_left_users,
)

logger.debug("Fetching OTK data")
device_id = sync_config.device_id
one_time_keys_count: JsonDict = {}
Expand Down Expand Up @@ -1551,99 +1577,93 @@ async def _generate_sync_entry_for_device_list(

user_id = sync_result_builder.sync_config.user.to_string()
since_token = sync_result_builder.since_token
assert since_token is not None

# Take a copy since these fields will be mutated later.
newly_joined_or_invited_or_knocked_users = set(
newly_joined_or_invited_or_knocked_users
)
newly_left_users = set(newly_left_users)

if since_token and since_token.device_list_key:
# We want to figure out what user IDs the client should refetch
# device keys for, and which users we aren't going to track changes
# for anymore.
#
# For the first step we check:
# a. if any users we share a room with have updated their devices,
# and
# b. we also check if we've joined any new rooms, or if a user has
# joined a room we're in.
#
# For the second step we just find any users we no longer share a
# room with by looking at all users that have left a room plus users
# that were in a room we've left.
# We want to figure out what user IDs the client should refetch
# device keys for, and which users we aren't going to track changes
# for anymore.
#
# For the first step we check:
# a. if any users we share a room with have updated their devices,
# and
# b. we also check if we've joined any new rooms, or if a user has
# joined a room we're in.
#
# For the second step we just find any users we no longer share a
# room with by looking at all users that have left a room plus users
# that were in a room we've left.

users_that_have_changed = set()
users_that_have_changed = set()

joined_rooms = sync_result_builder.joined_room_ids
joined_rooms = sync_result_builder.joined_room_ids

# Step 1a, check for changes in devices of users we share a room
# with
#
# We do this in two different ways depending on what we have cached.
# If we already have a list of all the user that have changed since
# the last sync then it's likely more efficient to compare the rooms
# they're in with the rooms the syncing user is in.
#
# If we don't have that info cached then we get all the users that
# share a room with our user and check if those users have changed.
cache_result = self.store.get_cached_device_list_changes(
since_token.device_list_key
)
if cache_result.hit:
changed_users = cache_result.entities

result = await self.store.get_rooms_for_users(changed_users)

for changed_user_id, entries in result.items():
# Check if the changed user shares any rooms with the user,
# or if the changed user is the syncing user (as we always
# want to include device list updates of their own devices).
if user_id == changed_user_id or any(
rid in joined_rooms for rid in entries
):
users_that_have_changed.add(changed_user_id)
else:
users_that_have_changed = (
await self._device_handler.get_device_changes_in_shared_rooms(
user_id,
sync_result_builder.joined_room_ids,
from_token=since_token,
)
)

# Step 1b, check for newly joined rooms
for room_id in newly_joined_rooms:
joined_users = await self.store.get_users_in_room(room_id)
newly_joined_or_invited_or_knocked_users.update(joined_users)
# Step 1a, check for changes in devices of users we share a room
# with
#
# We do this in two different ways depending on what we have cached.
# If we already have a list of all the user that have changed since
# the last sync then it's likely more efficient to compare the rooms
# they're in with the rooms the syncing user is in.
#
# If we don't have that info cached then we get all the users that
# share a room with our user and check if those users have changed.
cache_result = self.store.get_cached_device_list_changes(
since_token.device_list_key
)
if cache_result.hit:
changed_users = cache_result.entities

# TODO: Check that these users are actually new, i.e. either they
# weren't in the previous sync *or* they left and rejoined.
users_that_have_changed.update(newly_joined_or_invited_or_knocked_users)
result = await self.store.get_rooms_for_users(changed_users)

user_signatures_changed = (
await self.store.get_users_whose_signatures_changed(
user_id, since_token.device_list_key
for changed_user_id, entries in result.items():
# Check if the changed user shares any rooms with the user,
# or if the changed user is the syncing user (as we always
# want to include device list updates of their own devices).
if user_id == changed_user_id or any(
rid in joined_rooms for rid in entries
):
users_that_have_changed.add(changed_user_id)
else:
users_that_have_changed = (
await self._device_handler.get_device_changes_in_shared_rooms(
user_id,
sync_result_builder.joined_room_ids,
from_token=since_token,
)
)
users_that_have_changed.update(user_signatures_changed)

# Now find users that we no longer track
for room_id in newly_left_rooms:
left_users = await self.store.get_users_in_room(room_id)
newly_left_users.update(left_users)
# Step 1b, check for newly joined rooms
for room_id in newly_joined_rooms:
joined_users = await self.store.get_users_in_room(room_id)
newly_joined_or_invited_or_knocked_users.update(joined_users)

# Remove any users that we still share a room with.
left_users_rooms = await self.store.get_rooms_for_users(newly_left_users)
for user_id, entries in left_users_rooms.items():
if any(rid in joined_rooms for rid in entries):
newly_left_users.discard(user_id)
# TODO: Check that these users are actually new, i.e. either they
# weren't in the previous sync *or* they left and rejoined.
users_that_have_changed.update(newly_joined_or_invited_or_knocked_users)

return DeviceListUpdates(
changed=users_that_have_changed, left=newly_left_users
)
else:
return DeviceListUpdates()
user_signatures_changed = await self.store.get_users_whose_signatures_changed(
user_id, since_token.device_list_key
)
users_that_have_changed.update(user_signatures_changed)

# Now find users that we no longer track
for room_id in newly_left_rooms:
left_users = await self.store.get_users_in_room(room_id)
newly_left_users.update(left_users)

# Remove any users that we still share a room with.
left_users_rooms = await self.store.get_rooms_for_users(newly_left_users)
for user_id, entries in left_users_rooms.items():
if any(rid in joined_rooms for rid in entries):
newly_left_users.discard(user_id)

return DeviceListUpdates(changed=users_that_have_changed, left=newly_left_users)

@trace
async def _generate_sync_entry_for_to_device(
Expand Down Expand Up @@ -1720,6 +1740,7 @@ async def _generate_sync_entry_for_account_data(
since_token = sync_result_builder.since_token

if since_token and not sync_result_builder.full_state:
# TODO Do not fetch room account data if it will be unused.
(
global_account_data,
account_data_by_room,
Expand All @@ -1736,6 +1757,7 @@ async def _generate_sync_entry_for_account_data(
sync_config.user
)
else:
# TODO Do not fetch room account data if it will be unused.
(
global_account_data,
account_data_by_room,
Expand Down Expand Up @@ -1818,7 +1840,7 @@ async def _generate_sync_entry_for_rooms(
self,
sync_result_builder: "SyncResultBuilder",
account_data_by_room: Dict[str, Dict[str, JsonDict]],
) -> Tuple[AbstractSet[str], AbstractSet[str], AbstractSet[str], AbstractSet[str]]:
) -> Tuple[AbstractSet[str], AbstractSet[str]]:
"""Generates the rooms portion of the sync response. Populates the
`sync_result_builder` with the result.

Expand All @@ -1831,24 +1853,22 @@ async def _generate_sync_entry_for_rooms(
account_data_by_room: Dictionary of per room account data

Returns:
Returns a 4-tuple describing rooms the user has joined or left, and users who've
joined or left rooms any rooms the user is in. This gets used later in
`_generate_sync_entry_for_device_list`.
Returns a 2-tuple describing rooms the user has joined or left.

Its entries are:
- newly_joined_rooms
- newly_joined_or_invited_or_knocked_users
- newly_left_rooms
- newly_left_users
"""

since_token = sync_result_builder.since_token
user_id = sync_result_builder.sync_config.user.to_string()

# 1. Start by fetching all ephemeral events in rooms we've joined (if required).
if (
sync_result_builder.sync_config.filter_collection.blocks_all_room_ephemeral()
):
block_all_room_ephemeral = (
sync_result_builder.sync_config.filter_collection.blocks_all_rooms()
or sync_result_builder.sync_config.filter_collection.blocks_all_room_ephemeral()
)
if block_all_room_ephemeral:
ephemeral_by_room: Dict[str, List[JsonDict]] = {}
else:
now_token, ephemeral_by_room = await self.ephemeral_by_room(
Expand All @@ -1870,7 +1890,7 @@ async def _generate_sync_entry_for_rooms(
)
if not tags_by_room:
logger.debug("no-oping sync")
return set(), set(), set(), set()
return set(), set()

# 3. Work out which rooms need reporting in the sync response.
ignored_users = await self.store.ignored_users(user_id)
Expand Down Expand Up @@ -1899,6 +1919,7 @@ async def _generate_sync_entry_for_rooms(
# joined or archived).
async def handle_room_entries(room_entry: "RoomSyncResultBuilder") -> None:
logger.debug("Generating room entry for %s", room_entry.room_id)
# Note that this mutates sync_result_builder.{joined,archived}.
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved
await self._generate_room_entry(
sync_result_builder,
room_entry,
Expand All @@ -1915,20 +1936,7 @@ async def handle_room_entries(room_entry: "RoomSyncResultBuilder") -> None:
sync_result_builder.invited.extend(invited)
sync_result_builder.knocked.extend(knocked)

# 5. Work out which users have joined or left rooms we're in. We use this
# to build the device_list part of the sync response in
# `_generate_sync_entry_for_device_list`.
(
newly_joined_or_invited_or_knocked_users,
newly_left_users,
) = sync_result_builder.calculate_user_changes()

return (
set(newly_joined_rooms),
newly_joined_or_invited_or_knocked_users,
set(newly_left_rooms),
newly_left_users,
)
return set(newly_joined_rooms), set(newly_left_rooms)

async def _have_rooms_changed(
self, sync_result_builder: "SyncResultBuilder"
Expand Down