Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Clean up Notifier.on_new_room_event code path #8288

Merged
merged 8 commits into from Sep 10, 2020
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/8288.misc
@@ -0,0 +1 @@
Refactor notifier code to correctly use the max event stream position.
3 changes: 0 additions & 3 deletions synapse/handlers/federation.py
Expand Up @@ -128,7 +128,6 @@ def __init__(self, hs):
self.keyring = hs.get_keyring()
self.action_generator = hs.get_action_generator()
self.is_mine_id = hs.is_mine_id
self.pusher_pool = hs.get_pusherpool()
self.spam_checker = hs.get_spam_checker()
self.event_creation_handler = hs.get_event_creation_handler()
self._message_handler = hs.get_message_handler()
Expand Down Expand Up @@ -2939,8 +2938,6 @@ async def _notify_persisted_event(
event, event_stream_id, max_stream_id, extra_users=extra_users
)

await self.pusher_pool.on_new_notifications(max_stream_id)

async def _clean_room_for_join(self, room_id: str) -> None:
"""Called to clean up any data in DB for a given room, ready for the
server to join the room.
Expand Down
4 changes: 0 additions & 4 deletions synapse/handlers/message.py
Expand Up @@ -387,8 +387,6 @@ def __init__(self, hs: "HomeServer"):
# This is only used to get at ratelimit function, and maybe_kick_guest_users
self.base_handler = BaseHandler(hs)

self.pusher_pool = hs.get_pusherpool()

# We arbitrarily limit concurrent event creation for a room to 5.
# This is to stop us from diverging history *too* much.
self.limiter = Linearizer(max_count=5, name="room_event_creation_limit")
Expand Down Expand Up @@ -1145,8 +1143,6 @@ def is_inviter_member_event(e):
# If there's an expiry timestamp on the event, schedule its expiry.
self._message_handler.maybe_schedule_expiry(event)

await self.pusher_pool.on_new_notifications(max_stream_id)

def _notify():
try:
self.notifier.on_new_room_event(
Expand Down
57 changes: 38 additions & 19 deletions synapse/notifier.py
Expand Up @@ -198,6 +198,7 @@ def __init__(self, hs: "synapse.server.HomeServer"):

self.clock = hs.get_clock()
self.appservice_handler = hs.get_application_service_handler()
self._pusher_pool = hs.get_pusherpool()

self.federation_sender = None
if hs.should_send_federation():
Expand Down Expand Up @@ -274,42 +275,60 @@ def _notify_pending_new_room_events(self, max_room_stream_id: int):
"""
pending = self.pending_new_room_events
self.pending_new_room_events = []

users = set() # type: Set[Union[str, UserID]]
clokep marked this conversation as resolved.
Show resolved Hide resolved
rooms = set() # type: Set[str]

for room_stream_id, event, extra_users in pending:
if room_stream_id > max_room_stream_id:
self.pending_new_room_events.append(
(room_stream_id, event, extra_users)
)
else:
self._on_new_room_event(event, room_stream_id, extra_users)
if (
event.type == EventTypes.Member
and event.membership == Membership.JOIN
):
self._user_joined_room(event.state_key, event.room_id)

users.update(extra_users)
rooms.add(event.room_id)

if users or rooms:
self.on_new_event(
"room_key", max_room_stream_id, users=extra_users, rooms=[event.room_id]
)
clokep marked this conversation as resolved.
Show resolved Hide resolved
self._on_updated_room_token(max_room_stream_id)

def _on_updated_room_token(self, max_room_stream_id: int):
"""Poke services that might care that the room position has been
updated.
"""

def _on_new_room_event(
self,
event: EventBase,
room_stream_id: int,
extra_users: Collection[Union[str, UserID]] = [],
):
"""Notify any user streams that are interested in this room event"""
# poke any interested application service.
run_as_background_process(
"notify_app_services", self._notify_app_services, room_stream_id
"notify_app_services", self._notify_app_services, max_room_stream_id
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
)

if self.federation_sender:
self.federation_sender.notify_new_events(room_stream_id)

if event.type == EventTypes.Member and event.membership == Membership.JOIN:
self._user_joined_room(event.state_key, event.room_id)

self.on_new_event(
"room_key", room_stream_id, users=extra_users, rooms=[event.room_id]
run_as_background_process(
"_notify_pusher_pool", self._notify_pusher_pool, max_room_stream_id
)

async def _notify_app_services(self, room_stream_id: int):
if self.federation_sender:
self.federation_sender.notify_new_events(max_room_stream_id)

async def _notify_app_services(self, max_room_stream_id: int):
try:
await self.appservice_handler.notify_interested_services(room_stream_id)
await self.appservice_handler.notify_interested_services(max_room_stream_id)
except Exception:
logger.exception("Error notifying application services of event")

async def _notify_pusher_pool(self, max_room_stream_id: int):
try:
await self._pusher_pool.on_new_notifications(max_room_stream_id)
except Exception:
logger.exception("Error pusher pool of event")

def on_new_event(
self,
stream_key: str,
Expand Down
2 changes: 1 addition & 1 deletion synapse/push/pusherpool.py
Expand Up @@ -184,7 +184,7 @@ async def remove_pushers_by_access_token(self, user_id, access_tokens):
)
await self.remove_pusher(p["app_id"], p["pushkey"], p["user_name"])

async def on_new_notifications(self, max_stream_id):
async def on_new_notifications(self, max_stream_id: int):
if not self.pushers:
# nothing to do here.
return
Expand Down
4 changes: 0 additions & 4 deletions synapse/replication/tcp/client.py
Expand Up @@ -98,7 +98,6 @@ class ReplicationDataHandler:

def __init__(self, hs: "HomeServer"):
self.store = hs.get_datastore()
self.pusher_pool = hs.get_pusherpool()
self.notifier = hs.get_notifier()
self._reactor = hs.get_reactor()
self._clock = hs.get_clock()
Expand Down Expand Up @@ -154,9 +153,6 @@ async def on_rdata(
max_token = self.store.get_room_max_stream_ordering()
self.notifier.on_new_room_event(event, token, max_token, extra_users)

max_token = self.store.get_room_max_stream_ordering()
await self.pusher_pool.on_new_notifications(max_token)

# Notify any waiting deferreds. The list is ordered by position so we
# just iterate through the list until we reach a position that is
# greater than the received row position.
Expand Down