
Attempt to delete more duplicate rows in receipts_linearized table. #14915

Merged · 8 commits · Feb 1, 2023
Changes from 5 commits
1 change: 1 addition & 0 deletions changelog.d/14915.bugfix
@@ -0,0 +1 @@
+Fix a bug introduced in Synapse 1.70.0 where the background updates to add non-thread unique indexes on receipts could fail when upgrading from 1.67.0 or earlier.
Contributor (author) commented:

Stolen from #14453.

23 changes: 14 additions & 9 deletions synapse/storage/databases/main/receipts.py
@@ -941,34 +941,39 @@ async def _background_receipts_linearized_unique_index(
         receipts."""
 
         def _remote_duplicate_receipts_txn(txn: LoggingTransaction) -> None:
+            if isinstance(self.database_engine, PostgresEngine):
+                ROW_ID_NAME = "ctid"
+            else:
+                ROW_ID_NAME = "rowid"
+
             # Identify any duplicate receipts arising from
             # https://github.com/matrix-org/synapse/issues/14406.
             # We expect the following query to use the per-thread receipt index and take
             # less than a minute.
-            sql = """
-                SELECT MAX(stream_id), room_id, receipt_type, user_id
+            sql = f"""
+                SELECT MAX(stream_id), MAX({ROW_ID_NAME}), room_id, receipt_type, user_id
                 FROM receipts_linearized
                 WHERE thread_id IS NULL
                 GROUP BY room_id, receipt_type, user_id
                 HAVING COUNT(*) > 1
             """
             txn.execute(sql)
-            duplicate_keys = cast(List[Tuple[int, str, str, str]], list(txn))
+            duplicate_keys = cast(List[Tuple[int, int, str, str, str]], list(txn))
 
             # Then remove duplicate receipts, keeping the one with the highest
-            # `stream_id`. There should only be a single receipt with any given
-            # `stream_id`.
-            for max_stream_id, room_id, receipt_type, user_id in duplicate_keys:
-                sql = """
+            # `stream_id`. Since there might be duplicate rows with the same
+            # `stream_id`, we delete by the rowid instead.
+            for _, row_id, room_id, receipt_type, user_id in duplicate_keys:
+                sql = f"""
                     DELETE FROM receipts_linearized
                     WHERE
                         room_id = ? AND
                         receipt_type = ? AND
                         user_id = ? AND
                         thread_id IS NULL AND
-                        stream_id < ?
+                        {ROW_ID_NAME} != ?
                 """
-                txn.execute(sql, (room_id, receipt_type, user_id, max_stream_id))
+                txn.execute(sql, (room_id, receipt_type, user_id, row_id))
 
         await self.db_pool.runInteraction(
             self.RECEIPTS_LINEARIZED_UNIQUE_INDEX_UPDATE_NAME,
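Why the change works: the previous query kept the receipt with the highest stream_id and deleted rows with a lower one, so rows duplicated at the same stream_id were never matched by `stream_id < ?`. Deleting by physical row identity (SQLite's rowid, Postgres's ctid) instead removes every row for the key except one chosen survivor. Below is a minimal, self-contained sketch of that pattern against SQLite; the simplified schema and sample values are illustrative assumptions, not Synapse code.

import sqlite3

# Minimal sketch of the rowid-based dedup pattern (illustrative only).
conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE receipts_linearized (
        stream_id INTEGER,
        room_id TEXT,
        receipt_type TEXT,
        user_id TEXT,
        thread_id TEXT
    )
    """
)
conn.executemany(
    "INSERT INTO receipts_linearized VALUES (?, ?, ?, ?, ?)",
    [
        (6, "!a:x", "m.read", "@u:x", None),
        # Two duplicates sharing the maximum stream_id for their key;
        # the old predicate `stream_id < 7` matches neither of them.
        (7, "!b:x", "m.read", "@u:x", None),
        (7, "!b:x", "m.read", "@u:x", None),
    ],
)

# Pick one surviving row id per duplicated key, as in the PR's SELECT.
duplicates = conn.execute(
    """
    SELECT MAX(stream_id), MAX(rowid), room_id, receipt_type, user_id
    FROM receipts_linearized
    WHERE thread_id IS NULL
    GROUP BY room_id, receipt_type, user_id
    HAVING COUNT(*) > 1
    """
).fetchall()

# Delete every other row for that key, keyed on rowid rather than stream_id.
for _max_stream_id, row_id, room_id, receipt_type, user_id in duplicates:
    conn.execute(
        """
        DELETE FROM receipts_linearized
        WHERE room_id = ? AND receipt_type = ? AND user_id = ?
              AND thread_id IS NULL AND rowid != ?
        """,
        (room_id, receipt_type, user_id, row_id),
    )

# Exactly one receipt per key remains.
assert conn.execute("SELECT COUNT(*) FROM receipts_linearized").fetchone()[0] == 2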
4 changes: 3 additions & 1 deletion tests/storage/databases/main/test_receipts.py
@@ -168,7 +168,9 @@ def test_background_receipts_linearized_unique_index(self) -> None:
                     {"stream_id": 6, "event_id": "$some_event"},
                 ],
                 (self.other_room_id, "m.read", self.user_id): [
-                    {"stream_id": 7, "event_id": "$some_event"}
+                    # It is possible for stream IDs to be duplicated.
+                    {"stream_id": 7, "event_id": "$some_event"},
+                    {"stream_id": 7, "event_id": "$some_event"},
                 ],
             },
             expected_unique_receipts={
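The updated fixture feeds the background update two receipts with identical stream IDs, which is exactly the shape of row the old delete missed. As a hedged sketch of the invariant that expected_unique_receipts encodes (simplified, reusing conn from the sketch above rather than the Synapse test harness):

# After deduplication, no unthreaded (room_id, receipt_type, user_id)
# key should appear more than once. Simplified stand-in for the test's
# expected_unique_receipts check; not the actual Synapse test harness.
leftover = conn.execute(
    """
    SELECT room_id, receipt_type, user_id, COUNT(*)
    FROM receipts_linearized
    WHERE thread_id IS NULL
    GROUP BY room_id, receipt_type, user_id
    HAVING COUNT(*) > 1
    """
).fetchall()
assert leftover == [], "duplicate receipts remain"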