Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Clean-up distributor code #8216

Merged
merged 8 commits into from
Sep 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/8216.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Simplify the distributor code to avoid unnecessary work.
4 changes: 0 additions & 4 deletions synapse/handlers/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,6 @@ class EventStreamHandler(BaseHandler):
def __init__(self, hs: "HomeServer"):
super(EventStreamHandler, self).__init__(hs)

self.distributor = hs.get_distributor()
self.distributor.declare("started_user_eventstream")
self.distributor.declare("stopped_user_eventstream")

self.clock = hs.get_clock()

self.notifier = hs.get_notifier()
Expand Down
43 changes: 1 addition & 42 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@
ReplicationFederationSendEventsRestServlet,
ReplicationStoreRoomOnInviteRestServlet,
)
from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRestServlet
from synapse.state import StateResolutionStore, resolve_events_with_store
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.types import (
Expand All @@ -80,7 +79,6 @@
get_domain_from_id,
)
from synapse.util.async_helpers import Linearizer, concurrently_execute
from synapse.util.distributor import user_joined_room
from synapse.util.retryutils import NotRetryingDestination
from synapse.util.stringutils import shortstr
from synapse.visibility import filter_events_for_server
Expand Down Expand Up @@ -141,9 +139,6 @@ def __init__(self, hs):
self._replication = hs.get_replication_data_handler()

self._send_events = ReplicationFederationSendEventsRestServlet.make_client(hs)
self._notify_user_membership_change = ReplicationUserJoinedLeftRoomRestServlet.make_client(
hs
)
self._clean_room_for_join_client = ReplicationCleanRoomRestServlet.make_client(
hs
)
Expand Down Expand Up @@ -704,31 +699,10 @@ async def _process_received_pdu(
logger.debug("[%s %s] Processing event: %s", room_id, event_id, event)

try:
context = await self._handle_new_event(origin, event, state=state)
await self._handle_new_event(origin, event, state=state)
except AuthError as e:
raise FederationError("ERROR", e.code, e.msg, affected=event.event_id)

if event.type == EventTypes.Member:
if event.membership == Membership.JOIN:
# Only fire user_joined_room if the user has acutally
# joined the room. Don't bother if the user is just
# changing their profile info.
newly_joined = True

prev_state_ids = await context.get_prev_state_ids()

prev_state_id = prev_state_ids.get((event.type, event.state_key))
if prev_state_id:
prev_state = await self.store.get_event(
prev_state_id, allow_none=True
)
if prev_state and prev_state.membership == Membership.JOIN:
newly_joined = False

if newly_joined:
user = UserID.from_string(event.state_key)
await self.user_joined_room(user, room_id)

# For encrypted messages we check that we know about the sending device,
# if we don't then we mark the device cache for that user as stale.
if event.type == EventTypes.Encrypted:
Expand Down Expand Up @@ -1550,11 +1524,6 @@ async def on_send_join_request(self, origin, pdu):
event.signatures,
)

if event.type == EventTypes.Member:
if event.content["membership"] == Membership.JOIN:
user = UserID.from_string(event.state_key)
await self.user_joined_room(user, event.room_id)

prev_state_ids = await context.get_prev_state_ids()

state_ids = list(prev_state_ids.values())
Expand Down Expand Up @@ -2984,16 +2953,6 @@ async def _clean_room_for_join(self, room_id: str) -> None:
else:
await self.store.clean_room_for_join(room_id)

async def user_joined_room(self, user: UserID, room_id: str) -> None:
"""Called when a new user has joined the room
"""
if self.config.worker_app:
await self._notify_user_membership_change(
room_id=room_id, user_id=user.to_string(), change="joined"
)
else:
user_joined_room(self.distributor, user, room_id)

async def get_room_complexity(
self, remote_room_hosts: List[str], room_id: str
) -> Optional[dict]:
Expand Down
42 changes: 4 additions & 38 deletions synapse/handlers/room_member.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
from synapse.storage.roommember import RoomsForUser
from synapse.types import JsonDict, Requester, RoomAlias, RoomID, StateMap, UserID
from synapse.util.async_helpers import Linearizer
from synapse.util.distributor import user_joined_room, user_left_room
from synapse.util.distributor import user_left_room

from ._base import BaseHandler

Expand Down Expand Up @@ -148,17 +148,6 @@ async def remote_reject_invite(
"""
raise NotImplementedError()

@abc.abstractmethod
async def _user_joined_room(self, target: UserID, room_id: str) -> None:
"""Notifies distributor on master process that the user has joined the
room.

Args:
target
room_id
"""
raise NotImplementedError()

@abc.abstractmethod
async def _user_left_room(self, target: UserID, room_id: str) -> None:
"""Notifies distributor on master process that the user has left the
Expand Down Expand Up @@ -221,7 +210,6 @@ async def _local_membership_update(

prev_member_event_id = prev_state_ids.get((EventTypes.Member, user_id), None)

newly_joined = False
if event.membership == Membership.JOIN:
newly_joined = True
if prev_member_event_id:
Expand All @@ -246,12 +234,7 @@ async def _local_membership_update(
requester, event, context, extra_users=[target], ratelimit=ratelimit,
)

if event.membership == Membership.JOIN and newly_joined:
# Only fire user_joined_room if the user has actually joined the
# room. Don't bother if the user is just changing their profile
# info.
await self._user_joined_room(target, room_id)
elif event.membership == Membership.LEAVE:
if event.membership == Membership.LEAVE:
if prev_member_event_id:
prev_member_event = await self.store.get_event(prev_member_event_id)
if prev_member_event.membership == Membership.JOIN:
Expand Down Expand Up @@ -726,17 +709,7 @@ async def send_membership_event(
(EventTypes.Member, event.state_key), None
)

if event.membership == Membership.JOIN:
# Only fire user_joined_room if the user has actually joined the
# room. Don't bother if the user is just changing their profile
# info.
newly_joined = True
if prev_member_event_id:
prev_member_event = await self.store.get_event(prev_member_event_id)
newly_joined = prev_member_event.membership != Membership.JOIN
if newly_joined:
await self._user_joined_room(target_user, room_id)
elif event.membership == Membership.LEAVE:
if event.membership == Membership.LEAVE:
if prev_member_event_id:
prev_member_event = await self.store.get_event(prev_member_event_id)
if prev_member_event.membership == Membership.JOIN:
Expand Down Expand Up @@ -1002,10 +975,9 @@ async def _is_server_notice_room(self, room_id: str) -> bool:

class RoomMemberMasterHandler(RoomMemberHandler):
def __init__(self, hs):
super(RoomMemberMasterHandler, self).__init__(hs)
super().__init__(hs)

self.distributor = hs.get_distributor()
self.distributor.declare("user_joined_room")
self.distributor.declare("user_left_room")

async def _is_remote_room_too_complex(
Expand Down Expand Up @@ -1085,7 +1057,6 @@ async def _remote_join(
event_id, stream_id = await self.federation_handler.do_invite_join(
remote_room_hosts, room_id, user.to_string(), content
)
await self._user_joined_room(user, room_id)

# Check the room we just joined wasn't too large, if we didn't fetch the
# complexity of it before.
Expand Down Expand Up @@ -1228,11 +1199,6 @@ async def _locally_reject_invite(
)
return event.event_id, stream_id

async def _user_joined_room(self, target: UserID, room_id: str) -> None:
"""Implements RoomMemberHandler._user_joined_room
"""
user_joined_room(self.distributor, target, room_id)

async def _user_left_room(self, target: UserID, room_id: str) -> None:
"""Implements RoomMemberHandler._user_left_room
"""
Expand Down
9 changes: 0 additions & 9 deletions synapse/handlers/room_member_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,6 @@ async def _remote_join(
content=content,
)

await self._user_joined_room(user, room_id)

return ret["event_id"], ret["stream_id"]

async def remote_reject_invite(
Expand All @@ -81,13 +79,6 @@ async def remote_reject_invite(
)
return ret["event_id"], ret["stream_id"]

async def _user_joined_room(self, target: UserID, room_id: str) -> None:
"""Implements RoomMemberHandler._user_joined_room
"""
await self._notify_change_client(
user_id=target.to_string(), room_id=room_id, change="joined"
)

async def _user_left_room(self, target: UserID, room_id: str) -> None:
"""Implements RoomMemberHandler._user_left_room
"""
Expand Down
10 changes: 4 additions & 6 deletions synapse/replication/http/membership.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from synapse.http.servlet import parse_json_object_from_request
from synapse.replication.http._base import ReplicationEndpoint
from synapse.types import JsonDict, Requester, UserID
from synapse.util.distributor import user_joined_room, user_left_room
from synapse.util.distributor import user_left_room

if TYPE_CHECKING:
from synapse.server import HomeServer
Expand Down Expand Up @@ -181,9 +181,9 @@ async def _serialize_payload(room_id, user_id, change):
Args:
room_id (str)
user_id (str)
change (str): Either "joined" or "left"
change (str): "left"
"""
assert change in ("joined", "left")
assert change == "left"
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make more sense to update ReplicationUserJoinedLeftRoomRestServlet to be ReplicationUserLeftRoomRestServlet?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shrugface. Maybe. Probably better to do some more complete refactoring to sort out the user_left_room case.


return {}

Expand All @@ -192,9 +192,7 @@ def _handle_request(self, request, room_id, user_id, change):

user = UserID.from_string(user_id)

if change == "joined":
user_joined_room(self.distributor, user, room_id)
elif change == "left":
if change == "left":
user_left_room(self.distributor, user, room_id)
else:
raise Exception("Unrecognized change: %r", change)
Expand Down
50 changes: 8 additions & 42 deletions synapse/util/distributor.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
import logging

from twisted.internet import defer
from twisted.internet.defer import Deferred, fail, succeed
from twisted.python import failure

from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.metrics.background_process_metrics import run_as_background_process
Expand All @@ -29,11 +27,6 @@ def user_left_room(distributor, user, room_id):
distributor.fire("user_left_room", user=user, room_id=room_id)


# XXX: this is no longer used. We should probably kill it.
def user_joined_room(distributor, user, room_id):
distributor.fire("user_joined_room", user=user, room_id=room_id)


class Distributor:
"""A central dispatch point for loosely-connected pieces of code to
register, observe, and fire signals.
Expand Down Expand Up @@ -81,28 +74,6 @@ def fire(self, name, *args, **kwargs):
run_as_background_process(name, self.signals[name].fire, *args, **kwargs)


def maybeAwaitableDeferred(f, *args, **kw):
"""
Invoke a function that may or may not return a Deferred or an Awaitable.

This is a modified version of twisted.internet.defer.maybeDeferred.
"""
try:
result = f(*args, **kw)
except Exception:
return fail(failure.Failure(captureVars=Deferred.debug))

if isinstance(result, Deferred):
return result
# Handle the additional case of an awaitable being returned.
elif inspect.isawaitable(result):
return defer.ensureDeferred(result)
elif isinstance(result, failure.Failure):
return fail(result)
else:
return succeed(result)


class Signal:
"""A Signal is a dispatch point that stores a list of callables as
observers of it.
Expand Down Expand Up @@ -132,22 +103,17 @@ def fire(self, *args, **kwargs):
Returns a Deferred that will complete when all the observers have
completed."""

def do(observer):
def eb(failure):
async def do(observer):
try:
result = observer(*args, **kwargs)
if inspect.isawaitable(result):
result = await result
return result
except Exception as e:
logger.warning(
"%s signal observer %s failed: %r",
self.name,
observer,
failure,
exc_info=(
failure.type,
failure.value,
failure.getTracebackObject(),
),
"%s signal observer %s failed: %r", self.name, observer, e,
)

return maybeAwaitableDeferred(observer, *args, **kwargs).addErrback(eb)

deferreds = [run_in_background(do, o) for o in self.observers]

return make_deferred_yieldable(
Expand Down