From 16ef562e06f8618ec455e1323d52009649ae1b31 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 8 Aug 2023 15:04:58 -0400 Subject: [PATCH 1/6] Use the device ID when de-duplicating persisted events. --- synapse/storage/databases/main/events_worker.py | 16 ++++++++-------- tests/handlers/test_message.py | 15 ++++----------- 2 files changed, 12 insertions(+), 19 deletions(-) diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 7e7648c95112..f6b7187833c5 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -2072,29 +2072,29 @@ async def get_already_persisted_events( """ mapping = {} - txn_id_to_event: Dict[Tuple[str, int, str], str] = {} + txn_id_to_event: Dict[Tuple[str, str, str], str] = {} for event in events: - token_id = getattr(event.internal_metadata, "token_id", None) + device_id = getattr(event.internal_metadata, "device_id", None) txn_id = getattr(event.internal_metadata, "txn_id", None) - if token_id and txn_id: + if device_id and txn_id: # Check if this is a duplicate of an event in the given events. - existing = txn_id_to_event.get((event.room_id, token_id, txn_id)) + existing = txn_id_to_event.get((event.room_id, device_id, txn_id)) if existing: mapping[event.event_id] = existing continue # Check if this is a duplicate of an event we've already # persisted. - existing = await self.get_event_id_from_transaction_id_and_token_id( - event.room_id, event.sender, token_id, txn_id + existing = await self.get_event_id_from_transaction_id_and_device_id( + event.room_id, event.sender, device_id, txn_id ) if existing: mapping[event.event_id] = existing - txn_id_to_event[(event.room_id, token_id, txn_id)] = existing + txn_id_to_event[(event.room_id, device_id, txn_id)] = existing else: - txn_id_to_event[(event.room_id, token_id, txn_id)] = event.event_id + txn_id_to_event[(event.room_id, device_id, txn_id)] = event.event_id return mapping diff --git a/tests/handlers/test_message.py b/tests/handlers/test_message.py index 9691d66b48a0..1c5897c84e49 100644 --- a/tests/handlers/test_message.py +++ b/tests/handlers/test_message.py @@ -46,18 +46,11 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self._persist_event_storage_controller = persistence self.user_id = self.register_user("tester", "foobar") - self.access_token = self.login("tester", "foobar") - self.room_id = self.helper.create_room_as(self.user_id, tok=self.access_token) - - info = self.get_success( - self.hs.get_datastores().main.get_user_by_access_token( - self.access_token, - ) - ) - assert info is not None - self.token_id = info.token_id + device_id = "dev-1" + access_token = self.login("tester", "foobar", device_id=device_id) + self.room_id = self.helper.create_room_as(self.user_id, tok=access_token) - self.requester = create_requester(self.user_id, access_token_id=self.token_id) + self.requester = create_requester(self.user_id, device_id=device_id) def _create_and_persist_member_event(self) -> Tuple[EventBase, EventContext]: # Create a member event we can use as an auth_event From dde36cd70479453c2e5d2001ea45b5cb949b69e6 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 8 Aug 2023 15:06:08 -0400 Subject: [PATCH 2/6] Stop reading and writing to event_txn_id. --- synapse/handlers/message.py | 13 ------- synapse/storage/databases/main/events.py | 35 +------------------ .../storage/databases/main/events_worker.py | 19 ---------- 3 files changed, 1 insertion(+), 66 deletions(-) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index a74db1dccffa..2b2496128083 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -908,19 +908,6 @@ async def get_event_id_from_transaction( if existing_event_id: return existing_event_id - # Some requsters don't have device IDs (appservice, guests, and access - # tokens minted with the admin API), fallback to checking the access token - # ID, which should be close enough. - if requester.access_token_id: - existing_event_id = ( - await self.store.get_event_id_from_transaction_id_and_token_id( - room_id, - requester.user.to_string(), - requester.access_token_id, - txn_id, - ) - ) - return existing_event_id async def get_event_from_transaction( diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index c1353b18c1cd..f7a370d5b5ed 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -978,26 +978,12 @@ def _persist_transaction_ids_txn( """Persist the mapping from transaction IDs to event IDs (if defined).""" inserted_ts = self._clock.time_msec() - to_insert_token_id: List[Tuple[str, str, str, int, str, int]] = [] to_insert_device_id: List[Tuple[str, str, str, str, str, int]] = [] for event, _ in events_and_contexts: txn_id = getattr(event.internal_metadata, "txn_id", None) - token_id = getattr(event.internal_metadata, "token_id", None) device_id = getattr(event.internal_metadata, "device_id", None) if txn_id is not None: - if token_id is not None: - to_insert_token_id.append( - ( - event.event_id, - event.room_id, - event.sender, - token_id, - txn_id, - inserted_ts, - ) - ) - if device_id is not None: to_insert_device_id.append( ( @@ -1010,26 +996,7 @@ def _persist_transaction_ids_txn( ) ) - # Synapse usually relies on the device_id to scope transactions for events, - # except for users without device IDs (appservice, guests, and access - # tokens minted with the admin API) which use the access token ID instead. - # - # TODO https://github.com/matrix-org/synapse/issues/16042 - if to_insert_token_id: - self.db_pool.simple_insert_many_txn( - txn, - table="event_txn_id", - keys=( - "event_id", - "room_id", - "user_id", - "token_id", - "txn_id", - "inserted_ts", - ), - values=to_insert_token_id, - ) - + # Synapse relies on the device_id to scope transactions for events.. if to_insert_device_id: self.db_pool.simple_insert_many_txn( txn, diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index f6b7187833c5..21576b18efe3 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -2022,25 +2022,6 @@ def get_next_event_to_expire_txn( desc="get_next_event_to_expire", func=get_next_event_to_expire_txn ) - async def get_event_id_from_transaction_id_and_token_id( - self, room_id: str, user_id: str, token_id: int, txn_id: str - ) -> Optional[str]: - """Look up if we have already persisted an event for the transaction ID, - returning the event ID if so. - """ - return await self.db_pool.simple_select_one_onecol( - table="event_txn_id", - keyvalues={ - "room_id": room_id, - "user_id": user_id, - "token_id": token_id, - "txn_id": txn_id, - }, - retcol="event_id", - allow_none=True, - desc="get_event_id_from_transaction_id_and_token_id", - ) - async def get_event_id_from_transaction_id_and_device_id( self, room_id: str, user_id: str, device_id: str, txn_id: str ) -> Optional[str]: From b6eaca059bf8c0ae1418aeef50feccb5d38a2ccc Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 23 Aug 2023 12:29:43 -0400 Subject: [PATCH 3/6] Bump schema ver & minimum schema ver. --- synapse/storage/schema/__init__.py | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py index 649d3c8e9f96..c43fe6b7a7c7 100644 --- a/synapse/storage/schema/__init__.py +++ b/synapse/storage/schema/__init__.py @@ -114,19 +114,15 @@ Changes in SCHEMA_VERSION = 80 - The event_txn_id_device_id is always written to for new events. - Add tables for the task scheduler. + +Changes in SCHEMA_VERSION = 81 + - The event_txn_id is no longer written to for new events. """ SCHEMA_COMPAT_VERSION = ( - # Queries against `event_stream_ordering` columns in membership tables must - # be disambiguated. - # - # The threads_id column must written to with non-null values for the - # event_push_actions, event_push_actions_staging, and event_push_summary tables. - # - # insertions to the column `full_user_id` of tables profiles and user_filters can no - # longer be null - 76 + # The `event_txn_id_device_id` must be written to for new events. + 80 ) """Limit on how far the synapse codebase can be rolled back without breaking db compat From 451bbc4972097352df0c22269b537f37afa10587 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 23 Aug 2023 12:36:47 -0400 Subject: [PATCH 4/6] Newsfragment --- changelog.d/16175.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/16175.misc diff --git a/changelog.d/16175.misc b/changelog.d/16175.misc new file mode 100644 index 000000000000..308fbc225923 --- /dev/null +++ b/changelog.d/16175.misc @@ -0,0 +1 @@ +Stop using the `event_txn_id` table. From 76d9b96b92ee3c59e409877bc2d6c25135fb7e66 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 29 Aug 2023 14:25:20 -0400 Subject: [PATCH 5/6] Include user_id in txn ID -> event map. --- synapse/storage/databases/main/events_worker.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 21576b18efe3..1eb313040ed9 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -2053,7 +2053,7 @@ async def get_already_persisted_events( """ mapping = {} - txn_id_to_event: Dict[Tuple[str, str, str], str] = {} + txn_id_to_event: Dict[Tuple[str, str, str, str], str] = {} for event in events: device_id = getattr(event.internal_metadata, "device_id", None) @@ -2061,7 +2061,9 @@ async def get_already_persisted_events( if device_id and txn_id: # Check if this is a duplicate of an event in the given events. - existing = txn_id_to_event.get((event.room_id, device_id, txn_id)) + existing = txn_id_to_event.get( + (event.room_id, event.sender, device_id, txn_id) + ) if existing: mapping[event.event_id] = existing continue @@ -2073,9 +2075,13 @@ async def get_already_persisted_events( ) if existing: mapping[event.event_id] = existing - txn_id_to_event[(event.room_id, device_id, txn_id)] = existing + txn_id_to_event[ + (event.room_id, event.sender, device_id, txn_id) + ] = existing else: - txn_id_to_event[(event.room_id, device_id, txn_id)] = event.event_id + txn_id_to_event[ + (event.room_id, event.sender, device_id, txn_id) + ] = event.event_id return mapping From bf16dcbe68b18d9236310304a71d4c4f64ddff0a Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 29 Aug 2023 14:25:52 -0400 Subject: [PATCH 6/6] Bump schema version. --- synapse/storage/schema/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py index c43fe6b7a7c7..422f11f59e9e 100644 --- a/synapse/storage/schema/__init__.py +++ b/synapse/storage/schema/__init__.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -SCHEMA_VERSION = 80 # remember to update the list below when updating +SCHEMA_VERSION = 81 # remember to update the list below when updating """Represents the expectations made by the codebase about the database schema This should be incremented whenever the codebase changes its requirements on the