Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Track presence state per-device and amalgamate to a user state.
Browse files Browse the repository at this point in the history
Tracks presence on an individual per-device basis and combines the
per-device state into a per-user state. This should help in situations
where a user has two devices with conflicting status (e.g. one is
syncing with unavailable and one is syncing with online).

The tie-breaking is done by priority:

    BUSY > ONLINE > UNAVAILABLE > OFFLINE
  • Loading branch information
clokep committed Aug 4, 2023
1 parent d98a43d commit 7c8b047
Show file tree
Hide file tree
Showing 8 changed files with 155 additions and 29 deletions.
2 changes: 2 additions & 0 deletions changelog.d/16066.bugfix
@@ -0,0 +1,2 @@
Fix a long-standing bug where multi-device accounts could cause high load due to presence.

10 changes: 10 additions & 0 deletions synapse/api/presence.py
Expand Up @@ -20,6 +20,16 @@
from synapse.types import JsonDict


@attr.s(slots=True, auto_attribs=True)
class UserDevicePresenceState:
user_id: str
device_id: Optional[str]
state: str
last_active_ts: int
last_user_sync_ts: int
status_msg: Optional[str]


@attr.s(slots=True, frozen=True, auto_attribs=True)
class UserPresenceState:
"""Represents the current presence state of the user.
Expand Down
1 change: 1 addition & 0 deletions synapse/handlers/events.py
Expand Up @@ -67,6 +67,7 @@ async def get_stream(

context = await presence_handler.user_syncing(
requester.user.to_string(),
requester.device_id,
affect_presence=affect_presence,
presence_state=PresenceState.ONLINE,
)
Expand Down
77 changes: 71 additions & 6 deletions synapse/handlers/presence.py
Expand Up @@ -49,7 +49,7 @@
import synapse.metrics
from synapse.api.constants import EduTypes, EventTypes, Membership, PresenceState
from synapse.api.errors import SynapseError
from synapse.api.presence import UserPresenceState
from synapse.api.presence import UserDevicePresenceState, UserPresenceState
from synapse.appservice import ApplicationService
from synapse.events.presence_router import PresenceRouter
from synapse.logging.context import run_in_background
Expand Down Expand Up @@ -150,11 +150,16 @@ def __init__(self, hs: "HomeServer"):
self._busy_presence_enabled = hs.config.experimental.msc3026_enabled

active_presence = self.store.take_presence_startup_info()
# The combine status across all user devices.
self.user_to_current_state = {state.user_id: state for state in active_presence}

@abc.abstractmethod
async def user_syncing(
self, user_id: str, affect_presence: bool, presence_state: str
self,
user_id: str,
device_id: Optional[str],
affect_presence: bool,
presence_state: str,
) -> ContextManager[None]:
"""Returns a context manager that should surround any stream requests
from the user.
Expand Down Expand Up @@ -241,6 +246,7 @@ async def current_state_for_user(self, user_id: str) -> UserPresenceState:
async def set_state(
self,
target_user: UserID,
device_id: Optional[str],
state: JsonDict,
ignore_status_msg: bool = False,
force_notify: bool = False,
Expand Down Expand Up @@ -368,7 +374,9 @@ async def send_full_presence_to_users(self, user_ids: StrCollection) -> None:
# We set force_notify=True here so that this presence update is guaranteed to
# increment the presence stream ID (which resending the current user's presence
# otherwise would not do).
await self.set_state(UserID.from_string(user_id), state, force_notify=True)
await self.set_state(
UserID.from_string(user_id), None, state, force_notify=True
)

async def is_visible(self, observed_user: UserID, observer_user: UserID) -> bool:
raise NotImplementedError(
Expand Down Expand Up @@ -472,7 +480,11 @@ def send_stop_syncing(self) -> None:
self.send_user_sync(user_id, False, last_sync_ms)

async def user_syncing(
self, user_id: str, affect_presence: bool, presence_state: str
self,
user_id: str,
device_id: Optional[str],
affect_presence: bool,
presence_state: str,
) -> ContextManager[None]:
"""Record that a user is syncing.
Expand All @@ -490,7 +502,10 @@ async def user_syncing(
# what the spec wants: see comment in the BasePresenceHandler version
# of this function.
await self.set_state(
UserID.from_string(user_id), {"presence": presence_state}, True
UserID.from_string(user_id),
device_id,
{"presence": presence_state},
True,
)

curr_sync = self._user_to_num_current_syncs.get(user_id, 0)
Expand Down Expand Up @@ -586,6 +601,7 @@ def get_currently_syncing_users_for_replication(self) -> Iterable[str]:
async def set_state(
self,
target_user: UserID,
device_id: Optional[str],
state: JsonDict,
ignore_status_msg: bool = False,
force_notify: bool = False,
Expand Down Expand Up @@ -623,6 +639,7 @@ async def set_state(
await self._set_state_client(
instance_name=self._presence_writer_instance,
user_id=user_id,
device_id=device_id,
state=state,
ignore_status_msg=ignore_status_msg,
force_notify=force_notify,
Expand Down Expand Up @@ -755,6 +772,11 @@ def run_persister() -> Awaitable[None]:
self._event_pos = self.store.get_room_max_stream_ordering()
self._event_processing = False

# The per-device presence state, maps user to devices to per-device presence state.
self.user_to_device_to_current_state: Dict[
str, Dict[Optional[str], UserDevicePresenceState]
] = {}

async def _on_shutdown(self) -> None:
"""Gets called when shutting down. This lets us persist any updates that
we haven't yet persisted, e.g. updates that only changes some internal
Expand Down Expand Up @@ -973,6 +995,7 @@ async def bump_presence_active_time(self, user: UserID) -> None:
async def user_syncing(
self,
user_id: str,
device_id: Optional[str],
affect_presence: bool = True,
presence_state: str = PresenceState.ONLINE,
) -> ContextManager[None]:
Expand All @@ -985,6 +1008,7 @@ async def user_syncing(
Args:
user_id
device_id
affect_presence: If false this function will be a no-op.
Useful for streams that are not associated with an actual
client that is being used by a user.
Expand All @@ -1010,7 +1034,10 @@ async def user_syncing(
# updated always, which is not what the spec calls for, but synapse has done
# this for... forever, I think.
await self.set_state(
UserID.from_string(user_id), {"presence": presence_state}, True
UserID.from_string(user_id),
device_id,
{"presence": presence_state},
True,
)
# Retrieve the new state for the logic below. This should come from the
# in-memory cache.
Expand Down Expand Up @@ -1213,6 +1240,7 @@ async def incoming_presence(self, origin: str, content: JsonDict) -> None:
async def set_state(
self,
target_user: UserID,
device_id: Optional[str],
state: JsonDict,
ignore_status_msg: bool = False,
force_notify: bool = False,
Expand All @@ -1221,6 +1249,7 @@ async def set_state(
Args:
target_user: The ID of the user to set the presence state of.
device_id: The optional device ID.
state: The presence state as a JSON dictionary.
ignore_status_msg: True to ignore the "status_msg" field of the `state` dict.
If False, the user's current status will be updated.
Expand Down Expand Up @@ -1249,6 +1278,41 @@ async def set_state(

prev_state = await self.current_state_for_user(user_id)

# Always update the device specific information.
device_state = self.user_to_device_to_current_state.setdefault(
user_id, {}
).setdefault(
device_id,
UserDevicePresenceState(
user_id,
device_id,
presence,
last_active_ts=self.clock.time_msec(),
last_user_sync_ts=self.clock.time_msec(),
status_msg=None,
),
)
device_state.state = presence
if presence:
device_state.status_msg = status_msg
device_state.last_active_ts = self.clock.time_msec()
device_state.last_user_sync_ts = self.clock.time_msec()

# Based on (all) the user's devices calculate the new presence state.
presence_by_priority = {
PresenceState.BUSY: 4,
PresenceState.ONLINE: 3,
PresenceState.UNAVAILABLE: 2,
PresenceState.OFFLINE: 1,
}
for device_state in self.user_to_device_to_current_state[user_id].values():
if (
presence_by_priority[device_state.state]
> presence_by_priority[presence]
):
presence = device_state.state

# The newly updated status as an amalgamation of all the device statuses.
new_fields = {"state": presence}

if not ignore_status_msg:
Expand Down Expand Up @@ -1962,6 +2026,7 @@ def handle_update(
# If the users are ours then we want to set up a bunch of timers
# to time things out.
if is_mine:
# TODO Maybe don't do this if currently active?
if new_state.state == PresenceState.ONLINE:
# Idle timer
wheel_timer.insert(
Expand Down
5 changes: 4 additions & 1 deletion synapse/replication/http/presence.py
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.

import logging
from typing import TYPE_CHECKING, Tuple
from typing import TYPE_CHECKING, Optional, Tuple

from twisted.web.server import Request

Expand Down Expand Up @@ -95,11 +95,13 @@ def __init__(self, hs: "HomeServer"):
@staticmethod
async def _serialize_payload( # type: ignore[override]
user_id: str,
device_id: Optional[str],
state: JsonDict,
ignore_status_msg: bool = False,
force_notify: bool = False,
) -> JsonDict:
return {
"device_id": device_id,
"state": state,
"ignore_status_msg": ignore_status_msg,
"force_notify": force_notify,
Expand All @@ -110,6 +112,7 @@ async def _handle_request( # type: ignore[override]
) -> Tuple[int, JsonDict]:
await self._presence_handler.set_state(
UserID.from_string(user_id),
content["device_id"],
content["state"],
content["ignore_status_msg"],
content["force_notify"],
Expand Down
2 changes: 1 addition & 1 deletion synapse/rest/client/presence.py
Expand Up @@ -97,7 +97,7 @@ async def on_PUT(
raise SynapseError(400, "Unable to parse state")

if self._use_presence:
await self.presence_handler.set_state(user, state)
await self.presence_handler.set_state(user, requester.device_id, state)

return 200, {}

Expand Down
1 change: 1 addition & 0 deletions synapse/rest/client/sync.py
Expand Up @@ -205,6 +205,7 @@ async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:

context = await self.presence_handler.user_syncing(
user.to_string(),
requester.device_id,
affect_presence=affect_presence,
presence_state=set_presence,
)
Expand Down

0 comments on commit 7c8b047

Please sign in to comment.