From fe586decf44a42e6dde1901e456e4d17d4bcc9a1 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Mon, 31 Aug 2020 14:47:06 -0400 Subject: [PATCH 1/6] Remove dead code. --- synapse/handlers/events.py | 9 --------- 1 file changed, 9 deletions(-) diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index 1924636c4d70..f1709dcf7b20 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -33,15 +33,6 @@ class EventStreamHandler(BaseHandler): def __init__(self, hs): super(EventStreamHandler, self).__init__(hs) - # Count of active streams per user - self._streams_per_user = {} - # Grace timers per user to delay the "stopped" signal - self._stop_timer_per_user = {} - - self.distributor = hs.get_distributor() - self.distributor.declare("started_user_eventstream") - self.distributor.declare("stopped_user_eventstream") - self.clock = hs.get_clock() self.notifier = hs.get_notifier() From 026a25f5e2292914fb765d14670fc52028a2b058 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Mon, 31 Aug 2020 14:59:06 -0400 Subject: [PATCH 2/6] Remove user_joined_room from distributor. --- synapse/handlers/federation.py | 43 +------------------------- synapse/handlers/room_member.py | 42 +++---------------------- synapse/handlers/room_member_worker.py | 9 ------ synapse/replication/http/membership.py | 10 +++--- synapse/util/distributor.py | 5 --- 5 files changed, 9 insertions(+), 100 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index bd8efbb768db..906a518185cd 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -69,7 +69,6 @@ ReplicationFederationSendEventsRestServlet, ReplicationStoreRoomOnInviteRestServlet, ) -from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRestServlet from synapse.state import StateResolutionStore, resolve_events_with_store from synapse.storage.databases.main.events_worker import EventRedactBehaviour from synapse.types import ( @@ -80,7 +79,6 @@ get_domain_from_id, ) from synapse.util.async_helpers import Linearizer, concurrently_execute -from synapse.util.distributor import user_joined_room from synapse.util.retryutils import NotRetryingDestination from synapse.util.stringutils import shortstr from synapse.visibility import filter_events_for_server @@ -141,9 +139,6 @@ def __init__(self, hs): self._replication = hs.get_replication_data_handler() self._send_events = ReplicationFederationSendEventsRestServlet.make_client(hs) - self._notify_user_membership_change = ReplicationUserJoinedLeftRoomRestServlet.make_client( - hs - ) self._clean_room_for_join_client = ReplicationCleanRoomRestServlet.make_client( hs ) @@ -704,31 +699,10 @@ async def _process_received_pdu( logger.debug("[%s %s] Processing event: %s", room_id, event_id, event) try: - context = await self._handle_new_event(origin, event, state=state) + await self._handle_new_event(origin, event, state=state) except AuthError as e: raise FederationError("ERROR", e.code, e.msg, affected=event.event_id) - if event.type == EventTypes.Member: - if event.membership == Membership.JOIN: - # Only fire user_joined_room if the user has acutally - # joined the room. Don't bother if the user is just - # changing their profile info. - newly_joined = True - - prev_state_ids = await context.get_prev_state_ids() - - prev_state_id = prev_state_ids.get((event.type, event.state_key)) - if prev_state_id: - prev_state = await self.store.get_event( - prev_state_id, allow_none=True - ) - if prev_state and prev_state.membership == Membership.JOIN: - newly_joined = False - - if newly_joined: - user = UserID.from_string(event.state_key) - await self.user_joined_room(user, room_id) - # For encrypted messages we check that we know about the sending device, # if we don't then we mark the device cache for that user as stale. if event.type == EventTypes.Encrypted: @@ -1551,11 +1525,6 @@ async def on_send_join_request(self, origin, pdu): event.signatures, ) - if event.type == EventTypes.Member: - if event.content["membership"] == Membership.JOIN: - user = UserID.from_string(event.state_key) - await self.user_joined_room(user, event.room_id) - prev_state_ids = await context.get_prev_state_ids() state_ids = list(prev_state_ids.values()) @@ -3000,16 +2969,6 @@ async def _clean_room_for_join(self, room_id: str) -> None: else: await self.store.clean_room_for_join(room_id) - async def user_joined_room(self, user: UserID, room_id: str) -> None: - """Called when a new user has joined the room - """ - if self.config.worker_app: - await self._notify_user_membership_change( - room_id=room_id, user_id=user.to_string(), change="joined" - ) - else: - user_joined_room(self.distributor, user, room_id) - async def get_room_complexity( self, remote_room_hosts: List[str], room_id: str ) -> Optional[dict]: diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index e5cb3c2d5c4e..6c83cbfb107b 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -40,7 +40,7 @@ from synapse.storage.roommember import RoomsForUser from synapse.types import JsonDict, Requester, RoomAlias, RoomID, StateMap, UserID from synapse.util.async_helpers import Linearizer -from synapse.util.distributor import user_joined_room, user_left_room +from synapse.util.distributor import user_left_room from ._base import BaseHandler @@ -141,17 +141,6 @@ async def remote_reject_invite( """ raise NotImplementedError() - @abc.abstractmethod - async def _user_joined_room(self, target: UserID, room_id: str) -> None: - """Notifies distributor on master process that the user has joined the - room. - - Args: - target - room_id - """ - raise NotImplementedError() - @abc.abstractmethod async def _user_left_room(self, target: UserID, room_id: str) -> None: """Notifies distributor on master process that the user has left the @@ -214,7 +203,6 @@ async def _local_membership_update( prev_member_event_id = prev_state_ids.get((EventTypes.Member, user_id), None) - newly_joined = False if event.membership == Membership.JOIN: newly_joined = True if prev_member_event_id: @@ -239,12 +227,7 @@ async def _local_membership_update( requester, event, context, extra_users=[target], ratelimit=ratelimit, ) - if event.membership == Membership.JOIN and newly_joined: - # Only fire user_joined_room if the user has actually joined the - # room. Don't bother if the user is just changing their profile - # info. - await self._user_joined_room(target, room_id) - elif event.membership == Membership.LEAVE: + if event.membership == Membership.LEAVE: if prev_member_event_id: prev_member_event = await self.store.get_event(prev_member_event_id) if prev_member_event.membership == Membership.JOIN: @@ -719,17 +702,7 @@ async def send_membership_event( (EventTypes.Member, event.state_key), None ) - if event.membership == Membership.JOIN: - # Only fire user_joined_room if the user has actually joined the - # room. Don't bother if the user is just changing their profile - # info. - newly_joined = True - if prev_member_event_id: - prev_member_event = await self.store.get_event(prev_member_event_id) - newly_joined = prev_member_event.membership != Membership.JOIN - if newly_joined: - await self._user_joined_room(target_user, room_id) - elif event.membership == Membership.LEAVE: + if event.membership == Membership.LEAVE: if prev_member_event_id: prev_member_event = await self.store.get_event(prev_member_event_id) if prev_member_event.membership == Membership.JOIN: @@ -995,10 +968,9 @@ async def _is_server_notice_room(self, room_id: str) -> bool: class RoomMemberMasterHandler(RoomMemberHandler): def __init__(self, hs): - super(RoomMemberMasterHandler, self).__init__(hs) + super().__init__(hs) self.distributor = hs.get_distributor() - self.distributor.declare("user_joined_room") self.distributor.declare("user_left_room") async def _is_remote_room_too_complex( @@ -1078,7 +1050,6 @@ async def _remote_join( event_id, stream_id = await self.federation_handler.do_invite_join( remote_room_hosts, room_id, user.to_string(), content ) - await self._user_joined_room(user, room_id) # Check the room we just joined wasn't too large, if we didn't fetch the # complexity of it before. @@ -1221,11 +1192,6 @@ async def _locally_reject_invite( ) return event.event_id, stream_id - async def _user_joined_room(self, target: UserID, room_id: str) -> None: - """Implements RoomMemberHandler._user_joined_room - """ - user_joined_room(self.distributor, target, room_id) - async def _user_left_room(self, target: UserID, room_id: str) -> None: """Implements RoomMemberHandler._user_left_room """ diff --git a/synapse/handlers/room_member_worker.py b/synapse/handlers/room_member_worker.py index 897338fd54e2..e7f34737c684 100644 --- a/synapse/handlers/room_member_worker.py +++ b/synapse/handlers/room_member_worker.py @@ -57,8 +57,6 @@ async def _remote_join( content=content, ) - await self._user_joined_room(user, room_id) - return ret["event_id"], ret["stream_id"] async def remote_reject_invite( @@ -81,13 +79,6 @@ async def remote_reject_invite( ) return ret["event_id"], ret["stream_id"] - async def _user_joined_room(self, target: UserID, room_id: str) -> None: - """Implements RoomMemberHandler._user_joined_room - """ - await self._notify_change_client( - user_id=target.to_string(), room_id=room_id, change="joined" - ) - async def _user_left_room(self, target: UserID, room_id: str) -> None: """Implements RoomMemberHandler._user_left_room """ diff --git a/synapse/replication/http/membership.py b/synapse/replication/http/membership.py index 741329ab5fe7..08095fdf7d2c 100644 --- a/synapse/replication/http/membership.py +++ b/synapse/replication/http/membership.py @@ -19,7 +19,7 @@ from synapse.http.servlet import parse_json_object_from_request from synapse.replication.http._base import ReplicationEndpoint from synapse.types import JsonDict, Requester, UserID -from synapse.util.distributor import user_joined_room, user_left_room +from synapse.util.distributor import user_left_room if TYPE_CHECKING: from synapse.server import HomeServer @@ -181,9 +181,9 @@ async def _serialize_payload(room_id, user_id, change): Args: room_id (str) user_id (str) - change (str): Either "joined" or "left" + change (str): "left" """ - assert change in ("joined", "left") + assert change == "left" return {} @@ -192,9 +192,7 @@ def _handle_request(self, request, room_id, user_id, change): user = UserID.from_string(user_id) - if change == "joined": - user_joined_room(self.distributor, user, room_id) - elif change == "left": + if change == "left": user_left_room(self.distributor, user, room_id) else: raise Exception("Unrecognized change: %r", change) diff --git a/synapse/util/distributor.py b/synapse/util/distributor.py index 22a857a30616..1b7cf728f1c6 100644 --- a/synapse/util/distributor.py +++ b/synapse/util/distributor.py @@ -29,11 +29,6 @@ def user_left_room(distributor, user, room_id): distributor.fire("user_left_room", user=user, room_id=room_id) -# XXX: this is no longer used. We should probably kill it. -def user_joined_room(distributor, user, room_id): - distributor.fire("user_joined_room", user=user, room_id=room_id) - - class Distributor(object): """A central dispatch point for loosely-connected pieces of code to register, observe, and fire signals. From 4179b3892feab41fd96ff3f69289460babf387e0 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Mon, 31 Aug 2020 15:11:48 -0400 Subject: [PATCH 3/6] Avoid converting to Deferreds for some code. --- synapse/util/distributor.py | 45 +++++++------------------------------ 1 file changed, 8 insertions(+), 37 deletions(-) diff --git a/synapse/util/distributor.py b/synapse/util/distributor.py index 1b7cf728f1c6..8e955eccf095 100644 --- a/synapse/util/distributor.py +++ b/synapse/util/distributor.py @@ -16,8 +16,6 @@ import logging from twisted.internet import defer -from twisted.internet.defer import Deferred, fail, succeed -from twisted.python import failure from synapse.logging.context import make_deferred_yieldable, run_in_background from synapse.metrics.background_process_metrics import run_as_background_process @@ -76,28 +74,6 @@ def fire(self, name, *args, **kwargs): run_as_background_process(name, self.signals[name].fire, *args, **kwargs) -def maybeAwaitableDeferred(f, *args, **kw): - """ - Invoke a function that may or may not return a Deferred or an Awaitable. - - This is a modified version of twisted.internet.defer.maybeDeferred. - """ - try: - result = f(*args, **kw) - except Exception: - return fail(failure.Failure(captureVars=Deferred.debug)) - - if isinstance(result, Deferred): - return result - # Handle the additional case of an awaitable being returned. - elif inspect.isawaitable(result): - return defer.ensureDeferred(result) - elif isinstance(result, failure.Failure): - return fail(result) - else: - return succeed(result) - - class Signal(object): """A Signal is a dispatch point that stores a list of callables as observers of it. @@ -127,22 +103,17 @@ def fire(self, *args, **kwargs): Returns a Deferred that will complete when all the observers have completed.""" - def do(observer): - def eb(failure): + async def do(observer): + try: + result = observer(*args, **kwargs) + if inspect.isawaitable(result): + result = await result + return result + except Exception as e: logger.warning( - "%s signal observer %s failed: %r", - self.name, - observer, - failure, - exc_info=( - failure.type, - failure.value, - failure.getTracebackObject(), - ), + "%s signal observer %s failed: %r", self.name, observer, e, ) - return maybeAwaitableDeferred(observer, *args, **kwargs).addErrback(eb) - deferreds = [run_in_background(do, o) for o in self.observers] return make_deferred_yieldable( From 6c27891edc0b2a5747b3501cf88303ae262da79a Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Mon, 31 Aug 2020 15:12:52 -0400 Subject: [PATCH 4/6] Do not return a value since nothing uses it. --- synapse/util/distributor.py | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) diff --git a/synapse/util/distributor.py b/synapse/util/distributor.py index 8e955eccf095..028f629abd30 100644 --- a/synapse/util/distributor.py +++ b/synapse/util/distributor.py @@ -15,9 +15,7 @@ import inspect import logging -from twisted.internet import defer - -from synapse.logging.context import make_deferred_yieldable, run_in_background +from synapse.logging.context import run_in_background from synapse.metrics.background_process_metrics import run_as_background_process logger = logging.getLogger(__name__) @@ -99,9 +97,7 @@ def fire(self, *args, **kwargs): """Invokes every callable in the observer list, passing in the args and kwargs. Exceptions thrown by observers are logged but ignored. It is not an error to fire a signal with no observers. - - Returns a Deferred that will complete when all the observers have - completed.""" + """ async def do(observer): try: @@ -114,11 +110,8 @@ async def do(observer): "%s signal observer %s failed: %r", self.name, observer, e, ) - deferreds = [run_in_background(do, o) for o in self.observers] - - return make_deferred_yieldable( - defer.gatherResults(deferreds, consumeErrors=True) - ) + for observer in self.observers: + run_in_background(do, observer) def __repr__(self): return "" % (self.name,) From 5cdd2e7661f391456a96b110b6091e794bd50a74 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Mon, 31 Aug 2020 15:29:04 -0400 Subject: [PATCH 5/6] Add changelog. --- changelog.d/8216.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/8216.misc diff --git a/changelog.d/8216.misc b/changelog.d/8216.misc new file mode 100644 index 000000000000..b38911b0e582 --- /dev/null +++ b/changelog.d/8216.misc @@ -0,0 +1 @@ +Simplify the distributor code to avoid unnecessary work. From 41ced4c7cf4b67d0ccd05c271be170bc9fc51253 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 9 Sep 2020 11:35:51 -0400 Subject: [PATCH 6/6] Revert "Do not return a value since nothing uses it." This reverts commit 6c27891edc0b2a5747b3501cf88303ae262da79a. Turns out the return value is used by Distributor.fire. --- synapse/util/distributor.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/synapse/util/distributor.py b/synapse/util/distributor.py index 872d2d358da4..f73e95393cbe 100644 --- a/synapse/util/distributor.py +++ b/synapse/util/distributor.py @@ -15,7 +15,9 @@ import inspect import logging -from synapse.logging.context import run_in_background +from twisted.internet import defer + +from synapse.logging.context import make_deferred_yieldable, run_in_background from synapse.metrics.background_process_metrics import run_as_background_process logger = logging.getLogger(__name__) @@ -97,7 +99,9 @@ def fire(self, *args, **kwargs): """Invokes every callable in the observer list, passing in the args and kwargs. Exceptions thrown by observers are logged but ignored. It is not an error to fire a signal with no observers. - """ + + Returns a Deferred that will complete when all the observers have + completed.""" async def do(observer): try: @@ -110,8 +114,11 @@ async def do(observer): "%s signal observer %s failed: %r", self.name, observer, e, ) - for observer in self.observers: - run_in_background(do, observer) + deferreds = [run_in_background(do, o) for o in self.observers] + + return make_deferred_yieldable( + defer.gatherResults(deferreds, consumeErrors=True) + ) def __repr__(self): return "" % (self.name,)