Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Stop writing to the event_txn_id table #16175

Merged
merged 7 commits into from Aug 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/16175.misc
@@ -0,0 +1 @@
Stop using the `event_txn_id` table.
13 changes: 0 additions & 13 deletions synapse/handlers/message.py
Expand Up @@ -908,19 +908,6 @@ async def get_event_id_from_transaction(
if existing_event_id:
return existing_event_id

# Some requesters don't have device IDs (appservice, guests, and access
# tokens minted with the admin API), fall back to checking the access token
# ID, which should be close enough.
if requester.access_token_id:
existing_event_id = (
await self.store.get_event_id_from_transaction_id_and_token_id(
room_id,
requester.user.to_string(),
requester.access_token_id,
txn_id,
)
)

return existing_event_id

async def get_event_from_transaction(
Expand Down
35 changes: 1 addition & 34 deletions synapse/storage/databases/main/events.py
Expand Up @@ -978,26 +978,12 @@ def _persist_transaction_ids_txn(
"""Persist the mapping from transaction IDs to event IDs (if defined)."""

inserted_ts = self._clock.time_msec()
to_insert_token_id: List[Tuple[str, str, str, int, str, int]] = []
to_insert_device_id: List[Tuple[str, str, str, str, str, int]] = []
for event, _ in events_and_contexts:
txn_id = getattr(event.internal_metadata, "txn_id", None)
token_id = getattr(event.internal_metadata, "token_id", None)
device_id = getattr(event.internal_metadata, "device_id", None)

if txn_id is not None:
if token_id is not None:
to_insert_token_id.append(
(
event.event_id,
event.room_id,
event.sender,
token_id,
txn_id,
inserted_ts,
)
)

if device_id is not None:
to_insert_device_id.append(
(
Expand All @@ -1010,26 +996,7 @@ def _persist_transaction_ids_txn(
)
)

# Synapse usually relies on the device_id to scope transactions for events,
# except for users without device IDs (appservice, guests, and access
# tokens minted with the admin API) which use the access token ID instead.
#
# TODO https://github.com/matrix-org/synapse/issues/16042
if to_insert_token_id:
self.db_pool.simple_insert_many_txn(
txn,
table="event_txn_id",
keys=(
"event_id",
"room_id",
"user_id",
"token_id",
"txn_id",
"inserted_ts",
),
values=to_insert_token_id,
)

# Synapse relies on the device_id to scope transactions for events.
if to_insert_device_id:
self.db_pool.simple_insert_many_txn(
txn,
Expand Down
41 changes: 14 additions & 27 deletions synapse/storage/databases/main/events_worker.py
Expand Up @@ -2022,25 +2022,6 @@ def get_next_event_to_expire_txn(
desc="get_next_event_to_expire", func=get_next_event_to_expire_txn
)

async def get_event_id_from_transaction_id_and_token_id(
self, room_id: str, user_id: str, token_id: int, txn_id: str
) -> Optional[str]:
"""Look up if we have already persisted an event for the transaction ID,
returning the event ID if so.
"""
return await self.db_pool.simple_select_one_onecol(
table="event_txn_id",
keyvalues={
"room_id": room_id,
"user_id": user_id,
"token_id": token_id,
"txn_id": txn_id,
},
retcol="event_id",
allow_none=True,
desc="get_event_id_from_transaction_id_and_token_id",
)

async def get_event_id_from_transaction_id_and_device_id(
self, room_id: str, user_id: str, device_id: str, txn_id: str
) -> Optional[str]:
Expand Down Expand Up @@ -2072,29 +2053,35 @@ async def get_already_persisted_events(
"""

mapping = {}
txn_id_to_event: Dict[Tuple[str, int, str], str] = {}
txn_id_to_event: Dict[Tuple[str, str, str, str], str] = {}

for event in events:
token_id = getattr(event.internal_metadata, "token_id", None)
device_id = getattr(event.internal_metadata, "device_id", None)
clokep marked this conversation as resolved.
Show resolved Hide resolved
txn_id = getattr(event.internal_metadata, "txn_id", None)

if token_id and txn_id:
if device_id and txn_id:
# Check if this is a duplicate of an event in the given events.
existing = txn_id_to_event.get((event.room_id, token_id, txn_id))
existing = txn_id_to_event.get(
(event.room_id, event.sender, device_id, txn_id)
)
if existing:
mapping[event.event_id] = existing
continue

# Check if this is a duplicate of an event we've already
# persisted.
existing = await self.get_event_id_from_transaction_id_and_token_id(
event.room_id, event.sender, token_id, txn_id
existing = await self.get_event_id_from_transaction_id_and_device_id(
event.room_id, event.sender, device_id, txn_id
)
if existing:
mapping[event.event_id] = existing
txn_id_to_event[(event.room_id, token_id, txn_id)] = existing
txn_id_to_event[
(event.room_id, event.sender, device_id, txn_id)
] = existing
else:
txn_id_to_event[(event.room_id, token_id, txn_id)] = event.event_id
txn_id_to_event[
(event.room_id, event.sender, device_id, txn_id)
] = event.event_id

return mapping

Expand Down
16 changes: 6 additions & 10 deletions synapse/storage/schema/__init__.py
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

SCHEMA_VERSION = 80 # remember to update the list below when updating
SCHEMA_VERSION = 81 # remember to update the list below when updating
"""Represents the expectations made by the codebase about the database schema

This should be incremented whenever the codebase changes its requirements on the
Expand Down Expand Up @@ -114,19 +114,15 @@
Changes in SCHEMA_VERSION = 80
- The event_txn_id_device_id is always written to for new events.
- Add tables for the task scheduler.

Changes in SCHEMA_VERSION = 81
- The event_txn_id is no longer written to for new events.
"""


SCHEMA_COMPAT_VERSION = (
# Queries against `event_stream_ordering` columns in membership tables must
# be disambiguated.
#
# The threads_id column must be written to with non-null values for the
# event_push_actions, event_push_actions_staging, and event_push_summary tables.
#
# Insertions to the column `full_user_id` of the tables `profiles` and
# `user_filters` can no longer be null.
76
# The `event_txn_id_device_id` must be written to for new events.
80
clokep marked this conversation as resolved.
Show resolved Hide resolved
)
"""Limit on how far the synapse codebase can be rolled back without breaking db compat

Expand Down
15 changes: 4 additions & 11 deletions tests/handlers/test_message.py
Expand Up @@ -46,18 +46,11 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self._persist_event_storage_controller = persistence

self.user_id = self.register_user("tester", "foobar")
self.access_token = self.login("tester", "foobar")
self.room_id = self.helper.create_room_as(self.user_id, tok=self.access_token)

info = self.get_success(
self.hs.get_datastores().main.get_user_by_access_token(
self.access_token,
)
)
assert info is not None
self.token_id = info.token_id
device_id = "dev-1"
access_token = self.login("tester", "foobar", device_id=device_id)
self.room_id = self.helper.create_room_as(self.user_id, tok=access_token)

self.requester = create_requester(self.user_id, access_token_id=self.token_id)
self.requester = create_requester(self.user_id, device_id=device_id)

def _create_and_persist_member_event(self) -> Tuple[EventBase, EventContext]:
# Create a member event we can use as an auth_event
Expand Down