From f0c73d84d746bf03f79127b77cbf010fb21a60ee Mon Sep 17 00:00:00 2001 From: Tulir Asokan Date: Sat, 30 Oct 2021 15:38:37 +0300 Subject: [PATCH 1/4] Fix using MSC2716 batch sending with event persistence workers Signed-off-by: Tulir Asokan --- changelog.d/11220.bugfix | 1 + synapse/app/generic_worker.py | 2 ++ 2 files changed, 3 insertions(+) create mode 100644 changelog.d/11220.bugfix diff --git a/changelog.d/11220.bugfix b/changelog.d/11220.bugfix new file mode 100644 index 000000000000..8baae28d5bfe --- /dev/null +++ b/changelog.d/11220.bugfix @@ -0,0 +1 @@ +Fix using MSC2716 batch sending in combination with event persistence workers. Contributed by @tulir at Beeper. diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 51eadf122dba..1a8a5e8f4039 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -112,6 +112,7 @@ ) from synapse.storage.databases.main.presence import PresenceStore from synapse.storage.databases.main.room import RoomWorkerStore +from synapse.storage.databases.main.room_batch import RoomBatchStore from synapse.storage.databases.main.search import SearchStore from synapse.storage.databases.main.session import SessionStore from synapse.storage.databases.main.stats import StatsStore @@ -237,6 +238,7 @@ class GenericWorkerSlavedStore( SlavedEventStore, SlavedKeyStore, RoomWorkerStore, + RoomBatchStore, DirectoryStore, SlavedApplicationServiceStore, SlavedRegistrationStore, From deaf3d8934709caa507b2b6beeafb850a2e10660 Mon Sep 17 00:00:00 2001 From: Tulir Asokan Date: Tue, 2 Nov 2021 12:18:39 +0200 Subject: [PATCH 2/4] Fix creating batch sent member events on worker Signed-off-by: Tulir Asokan --- synapse/handlers/message.py | 28 +++++++++++++++------------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 4a0fccfcc6ac..0676d05b0156 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -1382,13 +1382,14 @@ async def persist_and_notify_client_event( if event.type == EventTypes.Member: if event.content["membership"] == Membership.INVITE: - event.unsigned[ - "invite_room_state" - ] = await self.store.get_stripped_room_state_from_event_context( - context, - self.room_prejoin_state_types, - membership_user_id=event.sender, - ) + if not event.internal_metadata.outlier: + event.unsigned[ + "invite_room_state" + ] = await self.store.get_stripped_room_state_from_event_context( + context, + self.room_prejoin_state_types, + membership_user_id=event.sender, + ) invitee = UserID.from_string(event.state_key) if not self.hs.is_mine(invitee): @@ -1405,12 +1406,13 @@ async def persist_and_notify_client_event( event.signatures.update(returned_invite.signatures) if event.content["membership"] == Membership.KNOCK: - event.unsigned[ - "knock_room_state" - ] = await self.store.get_stripped_room_state_from_event_context( - context, - self.room_prejoin_state_types, - ) + if not event.internal_metadata.outlier: + event.unsigned[ + "knock_room_state" + ] = await self.store.get_stripped_room_state_from_event_context( + context, + self.room_prejoin_state_types, + ) if event.type == EventTypes.Redaction: original_event = await self.store.get_event( From 4153c7be2cdecc4546f5c5cd8645b3ae9e159662 Mon Sep 17 00:00:00 2001 From: Tulir Asokan Date: Thu, 11 Nov 2021 01:17:39 +0200 Subject: [PATCH 3/4] Preserve outlier metadata when serializing EventContext Also revert deaf3d8934709caa507b2b6beeafb850a2e10660 because it's no longer needed Signed-off-by: Tulir Asokan --- synapse/events/snapshot.py | 8 ++++++++ synapse/handlers/message.py | 28 +++++++++++++--------------- 2 files changed, 21 insertions(+), 15 deletions(-) diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index d7527008c443..aa06ac60cae5 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -112,6 +112,7 @@ class EventContext: _current_state_ids = attr.ib(default=None, type=Optional[StateMap[str]]) _prev_state_ids = attr.ib(default=None, type=Optional[StateMap[str]]) + _outlier = attr.ib(default=False, type=bool) @staticmethod def with_state( @@ -137,6 +138,7 @@ def for_outlier() -> "EventContext": return EventContext( current_state_ids={}, prev_state_ids={}, + outlier=True, ) async def serialize(self, event: EventBase, store: "DataStore") -> JsonDict: @@ -170,6 +172,7 @@ async def serialize(self, event: EventBase, store: "DataStore") -> JsonDict: "prev_group": self.prev_group, "delta_ids": _encode_state_dict(self.delta_ids), "app_service_id": self.app_service.id if self.app_service else None, + "outlier": self._outlier, } @staticmethod @@ -198,6 +201,11 @@ def deserialize(storage: "Storage", input: JsonDict) -> "EventContext": rejected=input["rejected"], ) + if input["outlier"]: + context._prev_state_ids = {} + context._current_state_ids = {} + context._outlier = True + app_service_id = input["app_service_id"] if app_service_id: context.app_service = storage.main.get_app_service_by_id(app_service_id) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 0676d05b0156..4a0fccfcc6ac 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -1382,14 +1382,13 @@ async def persist_and_notify_client_event( if event.type == EventTypes.Member: if event.content["membership"] == Membership.INVITE: - if not event.internal_metadata.outlier: - event.unsigned[ - "invite_room_state" - ] = await self.store.get_stripped_room_state_from_event_context( - context, - self.room_prejoin_state_types, - membership_user_id=event.sender, - ) + event.unsigned[ + "invite_room_state" + ] = await self.store.get_stripped_room_state_from_event_context( + context, + self.room_prejoin_state_types, + membership_user_id=event.sender, + ) invitee = UserID.from_string(event.state_key) if not self.hs.is_mine(invitee): @@ -1406,13 +1405,12 @@ async def persist_and_notify_client_event( event.signatures.update(returned_invite.signatures) if event.content["membership"] == Membership.KNOCK: - if not event.internal_metadata.outlier: - event.unsigned[ - "knock_room_state" - ] = await self.store.get_stripped_room_state_from_event_context( - context, - self.room_prejoin_state_types, - ) + event.unsigned[ + "knock_room_state" + ] = await self.store.get_stripped_room_state_from_event_context( + context, + self.room_prejoin_state_types, + ) if event.type == EventTypes.Redaction: original_event = await self.store.get_event( From c0a0add0dab20eaf45431d003c789bdc0dc9d8ef Mon Sep 17 00:00:00 2001 From: Tulir Asokan Date: Sun, 28 Nov 2021 14:26:06 +0200 Subject: [PATCH 4/4] Set empty dicts in _fill_out_state instead of having separate outlier flag Signed-off-by: Tulir Asokan --- synapse/events/snapshot.py | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index aa06ac60cae5..f251402ed8f2 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -112,7 +112,6 @@ class EventContext: _current_state_ids = attr.ib(default=None, type=Optional[StateMap[str]]) _prev_state_ids = attr.ib(default=None, type=Optional[StateMap[str]]) - _outlier = attr.ib(default=False, type=bool) @staticmethod def with_state( @@ -138,7 +137,6 @@ def for_outlier() -> "EventContext": return EventContext( current_state_ids={}, prev_state_ids={}, - outlier=True, ) async def serialize(self, event: EventBase, store: "DataStore") -> JsonDict: @@ -172,7 +170,6 @@ async def serialize(self, event: EventBase, store: "DataStore") -> JsonDict: "prev_group": self.prev_group, "delta_ids": _encode_state_dict(self.delta_ids), "app_service_id": self.app_service.id if self.app_service else None, - "outlier": self._outlier, } @staticmethod @@ -201,11 +198,6 @@ def deserialize(storage: "Storage", input: JsonDict) -> "EventContext": rejected=input["rejected"], ) - if input["outlier"]: - context._prev_state_ids = {} - context._current_state_ids = {} - context._outlier = True - app_service_id = input["app_service_id"] if app_service_id: context.app_service = storage.main.get_app_service_by_id(app_service_id) @@ -330,6 +322,11 @@ async def _fill_out_state(self) -> None: attributes by loading from the database. """ if self.state_group is None: + # No state group means the event is an outlier. Usually the state_ids dicts are also + # pre-set to empty dicts, but they get reset when the context is serialized, so set + # them to empty dicts again here. + self._current_state_ids = {} + self._prev_state_ids = {} return current_state_ids = await self._storage.state.get_state_ids_for_group(