Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix presence timeouts when synchrotron restarts. #6212

Merged
merged 3 commits into from Oct 18, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/6212.bugfix
@@ -0,0 +1 @@
Fix bug where presence would not get timed out correctly if a synchrotron worker is used and restarted.
13 changes: 9 additions & 4 deletions synapse/handlers/presence.py
Expand Up @@ -24,6 +24,7 @@

import logging
from contextlib import contextmanager
from typing import Dict, Set

from six import iteritems, itervalues

Expand Down Expand Up @@ -179,8 +180,9 @@ def __init__(self, hs):
# we assume that all the sync requests on that process have stopped.
# Stored as a dict from process_id to set of user_id, and a dict of
# process_id to millisecond timestamp last updated.
self.external_process_to_current_syncs = {}
self.external_process_last_updated_ms = {}
self.external_process_to_current_syncs = {} # type: Dict[int, Set[str]]
self.external_process_last_updated_ms = {} # type: Dict[int, int]

self.external_sync_linearizer = Linearizer(name="external_sync_linearizer")

# Start a LoopingCall in 30s that fires every 5s.
Expand Down Expand Up @@ -349,10 +351,13 @@ def _handle_timeouts(self):
if now - last_update > EXTERNAL_PROCESS_EXPIRY
]
for process_id in expired_process_ids:
# For each expired process drop tracking info and check the users
# that were syncing on that process to see if they need to be timed
# out.
users_to_check.update(
self.external_process_last_updated_ms.pop(process_id, ())
self.external_process_to_current_syncs.pop(process_id, ())
)
self.external_process_last_update.pop(process_id)
self.external_process_last_updated_ms.pop(process_id)

states = [
self.user_to_current_state.get(user_id, UserPresenceState.default(user_id))
Expand Down
39 changes: 39 additions & 0 deletions tests/handlers/test_presence.py
Expand Up @@ -22,6 +22,7 @@
from synapse.events import room_version_to_event_format
from synapse.events.builder import EventBuilder
from synapse.handlers.presence import (
EXTERNAL_PROCESS_EXPIRY,
FEDERATION_PING_INTERVAL,
FEDERATION_TIMEOUT,
IDLE_TIMER,
Expand Down Expand Up @@ -413,6 +414,44 @@ def test_last_active(self):
self.assertEquals(state, new_state)


class PresenceHandlerTestCase(unittest.HomeserverTestCase):
def prepare(self, reactor, clock, hs):
self.presence_handler = hs.get_presence_handler()
self.clock = hs.get_clock()

def test_external_process_timeout(self):
"""Test that if an external process doesn't update the records for a while
we time out their syncing users presence.
"""
process_id = 1
user_id = "@test:server"

# Notify handler that a user is now syncing.
self.get_success(
self.presence_handler.update_external_syncs_row(
process_id, user_id, True, self.clock.time_msec()
)
)

# Check that if we wait a while without telling the handler the user has
# stopped syncing that their presence state doesn't get timed out.
self.reactor.advance(EXTERNAL_PROCESS_EXPIRY / 2)

state = self.get_success(
self.presence_handler.get_state(UserID.from_string(user_id))
)
self.assertEqual(state.state, PresenceState.ONLINE)

# Check that if the external process timeout fires, then the syncing
# user gets timed out
self.reactor.advance(EXTERNAL_PROCESS_EXPIRY)

state = self.get_success(
self.presence_handler.get_state(UserID.from_string(user_id))
)
self.assertEqual(state.state, PresenceState.OFFLINE)


class PresenceJoinTestCase(unittest.HomeserverTestCase):
"""Tests remote servers get told about presence of users in the room when
they join and when new local users join.
Expand Down
2 changes: 1 addition & 1 deletion tox.ini
Expand Up @@ -161,7 +161,7 @@ basepython = python3.7
skip_install = True
deps =
{[base]deps}
mypy
mypy==0.730
mypy-zope
env =
MYPYPATH = stubs/
Expand Down