Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Clean up Notifier.on_new_room_event code path #8288

Merged
merged 8 commits into from Sep 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/8288.misc
@@ -0,0 +1 @@
Refactor notifier code to correctly use the max event stream position.
3 changes: 0 additions & 3 deletions synapse/handlers/federation.py
Expand Up @@ -128,7 +128,6 @@ def __init__(self, hs):
self.keyring = hs.get_keyring()
self.action_generator = hs.get_action_generator()
self.is_mine_id = hs.is_mine_id
self.pusher_pool = hs.get_pusherpool()
self.spam_checker = hs.get_spam_checker()
self.event_creation_handler = hs.get_event_creation_handler()
self._message_handler = hs.get_message_handler()
Expand Down Expand Up @@ -2939,8 +2938,6 @@ async def _notify_persisted_event(
event, event_stream_id, max_stream_id, extra_users=extra_users
)

await self.pusher_pool.on_new_notifications(max_stream_id)

async def _clean_room_for_join(self, room_id: str) -> None:
"""Called to clean up any data in DB for a given room, ready for the
server to join the room.
Expand Down
4 changes: 0 additions & 4 deletions synapse/handlers/message.py
Expand Up @@ -387,8 +387,6 @@ def __init__(self, hs: "HomeServer"):
# This is only used to get at ratelimit function, and maybe_kick_guest_users
self.base_handler = BaseHandler(hs)

self.pusher_pool = hs.get_pusherpool()

# We arbitrarily limit concurrent event creation for a room to 5.
# This is to stop us from diverging history *too* much.
self.limiter = Linearizer(max_count=5, name="room_event_creation_limit")
Expand Down Expand Up @@ -1145,8 +1143,6 @@ def is_inviter_member_event(e):
# If there's an expiry timestamp on the event, schedule its expiry.
self._message_handler.maybe_schedule_expiry(event)

await self.pusher_pool.on_new_notifications(max_stream_id)

def _notify():
try:
self.notifier.on_new_room_event(
Expand Down
62 changes: 39 additions & 23 deletions synapse/notifier.py
Expand Up @@ -25,7 +25,6 @@
Set,
Tuple,
TypeVar,
Union,
)

from prometheus_client import Counter
Expand Down Expand Up @@ -187,7 +186,7 @@ def __init__(self, hs: "synapse.server.HomeServer"):
self.store = hs.get_datastore()
self.pending_new_room_events = (
[]
) # type: List[Tuple[int, EventBase, Collection[Union[str, UserID]]]]
) # type: List[Tuple[int, EventBase, Collection[UserID]]]

# Called when there are new things to stream over replication
self.replication_callbacks = [] # type: List[Callable[[], None]]
Expand All @@ -198,6 +197,7 @@ def __init__(self, hs: "synapse.server.HomeServer"):

self.clock = hs.get_clock()
self.appservice_handler = hs.get_application_service_handler()
self._pusher_pool = hs.get_pusherpool()

self.federation_sender = None
if hs.should_send_federation():
Expand Down Expand Up @@ -247,7 +247,7 @@ def on_new_room_event(
event: EventBase,
room_stream_id: int,
max_room_stream_id: int,
extra_users: Collection[Union[str, UserID]] = [],
extra_users: Collection[UserID] = [],
):
""" Used by handlers to inform the notifier something has happened
in the room, room event wise.
Expand All @@ -274,47 +274,63 @@ def _notify_pending_new_room_events(self, max_room_stream_id: int):
"""
pending = self.pending_new_room_events
self.pending_new_room_events = []

users = set() # type: Set[UserID]
rooms = set() # type: Set[str]

for room_stream_id, event, extra_users in pending:
if room_stream_id > max_room_stream_id:
self.pending_new_room_events.append(
(room_stream_id, event, extra_users)
)
else:
self._on_new_room_event(event, room_stream_id, extra_users)
if (
event.type == EventTypes.Member
and event.membership == Membership.JOIN
):
self._user_joined_room(event.state_key, event.room_id)

users.update(extra_users)
rooms.add(event.room_id)

if users or rooms:
self.on_new_event("room_key", max_room_stream_id, users=users, rooms=rooms)
self._on_updated_room_token(max_room_stream_id)

def _on_updated_room_token(self, max_room_stream_id: int):
"""Poke services that might care that the room position has been
updated.
"""

def _on_new_room_event(
self,
event: EventBase,
room_stream_id: int,
extra_users: Collection[Union[str, UserID]] = [],
):
"""Notify any user streams that are interested in this room event"""
# poke any interested application service.
run_as_background_process(
"notify_app_services", self._notify_app_services, room_stream_id
"_notify_app_services", self._notify_app_services, max_room_stream_id
)

if self.federation_sender:
self.federation_sender.notify_new_events(room_stream_id)

if event.type == EventTypes.Member and event.membership == Membership.JOIN:
self._user_joined_room(event.state_key, event.room_id)

self.on_new_event(
"room_key", room_stream_id, users=extra_users, rooms=[event.room_id]
run_as_background_process(
"_notify_pusher_pool", self._notify_pusher_pool, max_room_stream_id
)

async def _notify_app_services(self, room_stream_id: int):
if self.federation_sender:
self.federation_sender.notify_new_events(max_room_stream_id)

async def _notify_app_services(self, max_room_stream_id: int):
try:
await self.appservice_handler.notify_interested_services(room_stream_id)
await self.appservice_handler.notify_interested_services(max_room_stream_id)
except Exception:
logger.exception("Error notifying application services of event")

async def _notify_pusher_pool(self, max_room_stream_id: int):
try:
await self._pusher_pool.on_new_notifications(max_room_stream_id)
except Exception:
logger.exception("Error pusher pool of event")

def on_new_event(
self,
stream_key: str,
new_token: int,
users: Collection[Union[str, UserID]] = [],
users: Collection[UserID] = [],
rooms: Collection[str] = [],
):
""" Used to inform listeners that something has happened event wise.
Expand Down
2 changes: 1 addition & 1 deletion synapse/push/pusherpool.py
Expand Up @@ -184,7 +184,7 @@ async def remove_pushers_by_access_token(self, user_id, access_tokens):
)
await self.remove_pusher(p["app_id"], p["pushkey"], p["user_name"])

async def on_new_notifications(self, max_stream_id):
async def on_new_notifications(self, max_stream_id: int):
if not self.pushers:
# nothing to do here.
return
Expand Down
9 changes: 3 additions & 6 deletions synapse/replication/tcp/client.py
Expand Up @@ -29,6 +29,7 @@
EventsStreamEventRow,
EventsStreamRow,
)
from synapse.types import UserID
from synapse.util.async_helpers import timeout_deferred
from synapse.util.metrics import Measure

Expand Down Expand Up @@ -98,7 +99,6 @@ class ReplicationDataHandler:

def __init__(self, hs: "HomeServer"):
self.store = hs.get_datastore()
self.pusher_pool = hs.get_pusherpool()
self.notifier = hs.get_notifier()
self._reactor = hs.get_reactor()
self._clock = hs.get_clock()
Expand Down Expand Up @@ -148,15 +148,12 @@ async def on_rdata(
if event.rejected_reason:
continue

extra_users = () # type: Tuple[str, ...]
extra_users = () # type: Tuple[UserID, ...]
if event.type == EventTypes.Member:
extra_users = (event.state_key,)
extra_users = (UserID.from_string(event.state_key),)
max_token = self.store.get_room_max_stream_ordering()
self.notifier.on_new_room_event(event, token, max_token, extra_users)

max_token = self.store.get_room_max_stream_ordering()
await self.pusher_pool.on_new_notifications(max_token)

# Notify any waiting deferreds. The list is ordered by position so we
# just iterate through the list until we reach a position that is
# greater than the received row position.
Expand Down