This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Properly handle unknown results for the stream change cache. #14592

Merged
merged 3 commits into from
Dec 2, 2022
1 change: 1 addition & 0 deletions changelog.d/14592.bugfix
@@ -0,0 +1 @@
Fix a long-standing bug where a device list update might not be sent to clients in certain circumstances.
4 changes: 2 additions & 2 deletions synapse/handlers/presence.py
@@ -1764,14 +1764,14 @@ async def _filter_all_presence_updates_for_user(
Returns:
A list of presence states for the given user to receive.
"""
updated_users = None
if from_key:
# Only return updates since the last sync
updated_users = self.store.presence_stream_cache.get_all_entities_changed(
from_key
)
if not updated_users:
updated_users = []
Comment on lines -1772 to -1773
Contributor Author

_filter_all_presence_updates_for_user is only called if additional_users_interested_in == PresenceRouter.ALL_USERS, so it seems reasonable here to treat an unknown result from the cache the same as not having a from_key (and return info on all users).
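
A minimal sketch of the distinction, assuming the cache returns None when it cannot answer and a (possibly empty) list when it can. `users_to_fetch` is a hypothetical helper for illustration, not part of Synapse:

```python
from typing import List, Optional


def users_to_fetch(
    from_key: Optional[int], cached_changes: Optional[List[str]]
) -> Optional[List[str]]:
    """Hypothetical helper: return the users to look up, or None for "all users"."""
    if not from_key:
        # No sync token: the caller wants everything.
        return None
    # None means the cache cannot answer; [] means it knows nothing changed.
    # Collapsing None into [] would hide the "unknown" case and drop updates.
    return cached_changes


print(users_to_fetch(5, None))       # unknown: handled like a full fetch
print(users_to_fetch(5, []))         # known: nothing changed
print(users_to_fetch(5, ["@a:hs"]))  # known: one user changed
```

Keeping None and the empty list distinct is what lets the later `if updated_users is not None:` check choose between the two paths.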


if updated_users is not None:
# Get the actual presence update for each change
users_to_state = await self.get_presence_handler().current_state_for_users(
updated_users
33 changes: 19 additions & 14 deletions synapse/storage/databases/main/devices.py
@@ -842,12 +842,11 @@ async def get_users_whose_devices_changed(
user_ids, from_key
)

if not user_ids_to_check:
# If an empty set was returned, there's nothing to do.
if user_ids_to_check is not None and not user_ids_to_check:
return set()

def _get_users_whose_devices_changed_txn(txn: LoggingTransaction) -> Set[str]:
changes: Set[str] = set()

stream_id_where_clause = "stream_id > ?"
sql_args = [from_key]

@@ -858,19 +857,25 @@ def _get_users_whose_devices_changed_txn(txn: LoggingTransaction) -> Set[str]:
sql = f"""
SELECT DISTINCT user_id FROM device_lists_stream
WHERE {stream_id_where_clause}
AND
"""

# Query device changes with a batch of users at a time
# Assertion for mypy's benefit; see also
# https://mypy.readthedocs.io/en/stable/common_issues.html#narrowing-and-inner-functions
assert user_ids_to_check is not None
for chunk in batch_iter(user_ids_to_check, 100):
clause, args = make_in_list_sql_clause(
txn.database_engine, "user_id", chunk
)
txn.execute(sql + clause, sql_args + args)
changes.update(user_id for user_id, in txn)
# If the stream change cache gave us no information, fetch *all*
# users between the stream IDs.
if user_ids_to_check is None:
txn.execute(sql, sql_args)
return {user_id for user_id, in txn}

# Otherwise, fetch changes for the given users.
Contributor Author

Why do we need to additionally query the database if user_ids_to_check is non-None? Didn't the stream change cache give us the needed info?

Member

Yeah, I don't think we need to do that.

Contributor Author

This is a bit more complicated than I initially thought: the database query is what caps the results at a maximum stream token. We only ask the cache whether things have changed after from_key, but those changes might have happened before or after to_key; the cache doesn't know.
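
A rough illustration of that point, using an in-memory SQLite table as a stand-in for `device_lists_stream`. The helper `users_changed`, the sample rows, and the user IDs are all made up for this sketch, and it skips the batching that the real code does with `batch_iter`:

```python
import sqlite3
from typing import Iterable, Optional, Set


def users_changed(
    conn: sqlite3.Connection,
    from_key: int,
    to_key: int,
    candidates: Optional[Iterable[str]],
) -> Set[str]:
    """Hypothetical helper: users with a change in the range (from_key, to_key]."""
    sql = (
        "SELECT DISTINCT user_id FROM device_lists_stream"
        " WHERE stream_id > ? AND stream_id <= ?"
    )
    args = [from_key, to_key]
    if candidates is None:
        # The cache gave no information: scan the whole range.
        return {row[0] for row in conn.execute(sql, args)}
    users = list(candidates)
    if not users:
        return set()
    # The cache only narrows the candidates; the query still applies to_key.
    placeholders = ", ".join("?" for _ in users)
    rows = conn.execute(f"{sql} AND user_id IN ({placeholders})", args + users)
    return {row[0] for row in rows}


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE device_lists_stream (stream_id INTEGER, user_id TEXT)")
conn.executemany(
    "INSERT INTO device_lists_stream VALUES (?, ?)",
    [(3, "@alice:hs"), (7, "@bob:hs"), (12, "@carol:hs")],
)

# A cache asked "what changed after 5?" would report both bob and carol,
# but carol's change (stream_id 12) falls after to_key = 10.
cache_answer = ["@bob:hs", "@carol:hs"]
print(users_changed(conn, 5, 10, cache_answer))
print(users_changed(conn, 5, 10, None))
```

In this toy setup the cache alone would over-report carol, which is why the query still runs even when the cache returns a non-None answer.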

else:
changes: Set[str] = set()

# Query device changes with a batch of users at a time
for chunk in batch_iter(user_ids_to_check, 100):
clause, args = make_in_list_sql_clause(
txn.database_engine, "user_id", chunk
)
txn.execute(sql + " AND " + clause, sql_args + args)
changes.update(user_id for user_id, in txn)

return changes
