From 6ea256c075cdbffc6b89f42132e27edbfffd0748 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 20 Mar 2020 15:08:56 +0000 Subject: [PATCH 1/3] Fix processing of `groups` stream, and use symbolic names for streams `groups` != `receipts` Introduced in #6964 --- changelog.d/7117.bugfix | 1 + synapse/app/generic_worker.py | 35 ++++++++----- synapse/replication/tcp/streams/__init__.py | 55 ++++++++++++++------- 3 files changed, 61 insertions(+), 30 deletions(-) create mode 100644 changelog.d/7117.bugfix diff --git a/changelog.d/7117.bugfix b/changelog.d/7117.bugfix new file mode 100644 index 000000000000..d3b48abdb783 --- /dev/null +++ b/changelog.d/7117.bugfix @@ -0,0 +1 @@ +Fix a bug introduced in Synapse 1.12.0 which meant that groups updates were not correctly replicated between workers. diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index cdc078cf1106..136babe6cefd 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -65,12 +65,23 @@ from synapse.replication.slave.storage.room import RoomStore from synapse.replication.slave.storage.transactions import SlavedTransactionStore from synapse.replication.tcp.client import ReplicationClientHandler -from synapse.replication.tcp.streams._base import ( +from synapse.replication.tcp.streams import ( + AccountDataStream, DeviceListsStream, + GroupServerStream, + PresenceStream, + PushersStream, + PushRulesStream, ReceiptsStream, + TagAccountDataStream, ToDeviceStream, + TypingStream, +) +from synapse.replication.tcp.streams.events import ( + EventsStream, + EventsStreamEventRow, + EventsStreamRow, ) -from synapse.replication.tcp.streams.events import EventsStreamEventRow, EventsStreamRow from synapse.rest.admin import register_servlets_for_media_repo from synapse.rest.client.v1 import events from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet @@ -626,7 +637,7 @@ async def process_and_notify(self, stream_name, token, rows): if self.send_handler: self.send_handler.process_replication_rows(stream_name, token, rows) - if stream_name == "events": + if stream_name == EventsStream.NAME: # We shouldn't get multiple rows per token for events stream, so # we don't need to optimise this for multiple rows. for row in rows: @@ -649,44 +660,44 @@ async def process_and_notify(self, stream_name, token, rows): ) await self.pusher_pool.on_new_notifications(token, token) - elif stream_name == "push_rules": + elif stream_name == PushRulesStream.NAME: self.notifier.on_new_event( "push_rules_key", token, users=[row.user_id for row in rows] ) - elif stream_name in ("account_data", "tag_account_data"): + elif stream_name in (AccountDataStream.NAME, TagAccountDataStream.NAME): self.notifier.on_new_event( "account_data_key", token, users=[row.user_id for row in rows] ) - elif stream_name == "receipts": + elif stream_name == ReceiptsStream.NAME: self.notifier.on_new_event( "receipt_key", token, rooms=[row.room_id for row in rows] ) await self.pusher_pool.on_new_receipts( token, token, {row.room_id for row in rows} ) - elif stream_name == "typing": + elif stream_name == TypingStream.NAME: self.typing_handler.process_replication_rows(token, rows) self.notifier.on_new_event( "typing_key", token, rooms=[row.room_id for row in rows] ) - elif stream_name == "to_device": + elif stream_name == ToDeviceStream.NAME: entities = [row.entity for row in rows if row.entity.startswith("@")] if entities: self.notifier.on_new_event("to_device_key", token, users=entities) - elif stream_name == "device_lists": + elif stream_name == DeviceListsStream.NAME: all_room_ids = set() for row in rows: if row.entity.startswith("@"): room_ids = await self.store.get_rooms_for_user(row.entity) all_room_ids.update(room_ids) self.notifier.on_new_event("device_list_key", token, rooms=all_room_ids) - elif stream_name == "presence": + elif stream_name == PresenceStream.NAME: await self.presence_handler.process_replication_rows(token, rows) - elif stream_name == "receipts": + elif stream_name == GroupServerStream.NAME: self.notifier.on_new_event( "groups_key", token, users=[row.user_id for row in rows] ) - elif stream_name == "pushers": + elif stream_name == PushersStream.NAME: for row in rows: if row.deleted: self.stop_pusher(row.user_id, row.app_id, row.pushkey) diff --git a/synapse/replication/tcp/streams/__init__.py b/synapse/replication/tcp/streams/__init__.py index 5f52264e8432..2b5415ca507f 100644 --- a/synapse/replication/tcp/streams/__init__.py +++ b/synapse/replication/tcp/streams/__init__.py @@ -24,27 +24,46 @@ current_token: The function that returns the current token for the stream update_function: The function that returns a list of updates between two tokens """ - -from . import _base, events, federation +from synapse.replication.tcp.streams._base import * +from synapse.replication.tcp.streams.events import EventsStream +from synapse.replication.tcp.streams.federation import FederationStream STREAMS_MAP = { stream.NAME: stream for stream in ( - events.EventsStream, - _base.BackfillStream, - _base.PresenceStream, - _base.TypingStream, - _base.ReceiptsStream, - _base.PushRulesStream, - _base.PushersStream, - _base.CachesStream, - _base.PublicRoomsStream, - _base.DeviceListsStream, - _base.ToDeviceStream, - federation.FederationStream, - _base.TagAccountDataStream, - _base.AccountDataStream, - _base.GroupServerStream, - _base.UserSignatureStream, + EventsStream, + BackfillStream, + PresenceStream, + TypingStream, + ReceiptsStream, + PushRulesStream, + PushersStream, + CachesStream, + PublicRoomsStream, + DeviceListsStream, + ToDeviceStream, + FederationStream, + TagAccountDataStream, + AccountDataStream, + GroupServerStream, + UserSignatureStream, ) } + +__all__ = [ + "STREAMS_MAP", + "BackfillStream", + "PresenceStream", + "TypingStream", + "ReceiptsStream", + "PushRulesStream", + "PushersStream", + "CachesStream", + "PublicRoomsStream", + "DeviceListsStream", + "ToDeviceStream", + "TagAccountDataStream", + "AccountDataStream", + "GroupServerStream", + "UserSignatureStream", +] From b9dc6e73c95464c4faa7e8e9b0d2ef761bebe090 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 20 Mar 2020 15:17:59 +0000 Subject: [PATCH 2/3] fix changelog --- changelog.d/7117.bugfix | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changelog.d/7117.bugfix b/changelog.d/7117.bugfix index d3b48abdb783..1896d7ad4986 100644 --- a/changelog.d/7117.bugfix +++ b/changelog.d/7117.bugfix @@ -1 +1 @@ -Fix a bug introduced in Synapse 1.12.0 which meant that groups updates were not correctly replicated between workers. +Fix a bug which meant that groups updates were not correctly replicated between workers. From c8583883731451bae1962a84b3b5fa49970a6179 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 20 Mar 2020 15:20:15 +0000 Subject: [PATCH 3/3] pep8? --- synapse/replication/tcp/streams/__init__.py | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/synapse/replication/tcp/streams/__init__.py b/synapse/replication/tcp/streams/__init__.py index 2b5415ca507f..29199f5b466b 100644 --- a/synapse/replication/tcp/streams/__init__.py +++ b/synapse/replication/tcp/streams/__init__.py @@ -24,7 +24,22 @@ current_token: The function that returns the current token for the stream update_function: The function that returns a list of updates between two tokens """ -from synapse.replication.tcp.streams._base import * +from synapse.replication.tcp.streams._base import ( + AccountDataStream, + BackfillStream, + CachesStream, + DeviceListsStream, + GroupServerStream, + PresenceStream, + PublicRoomsStream, + PushersStream, + PushRulesStream, + ReceiptsStream, + TagAccountDataStream, + ToDeviceStream, + TypingStream, + UserSignatureStream, +) from synapse.replication.tcp.streams.events import EventsStream from synapse.replication.tcp.streams.federation import FederationStream