Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Attempt to delete more duplicate rows in receipts_linearized table. #14915

Merged
merged 8 commits into from
Feb 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/14915.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a bug introduced in Synapse 1.70.0 where the background updates to add non-thread unique indexes on receipts could fail when upgrading from 1.67.0 or earlier.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stolen from #14453.

34 changes: 26 additions & 8 deletions synapse/storage/databases/main/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -941,10 +941,14 @@ async def _background_receipts_linearized_unique_index(
receipts."""

def _remote_duplicate_receipts_txn(txn: LoggingTransaction) -> None:
if isinstance(self.database_engine, PostgresEngine):
ROW_ID_NAME = "ctid"
else:
ROW_ID_NAME = "rowid"

# Identify any duplicate receipts arising from
# https://github.com/matrix-org/synapse/issues/14406.
# We expect the following query to use the per-thread receipt index and take
# less than a minute.
# The following query takes less than a minute on matrix.org.
sql = """
SELECT MAX(stream_id), room_id, receipt_type, user_id
FROM receipts_linearized
Expand All @@ -956,19 +960,33 @@ def _remote_duplicate_receipts_txn(txn: LoggingTransaction) -> None:
duplicate_keys = cast(List[Tuple[int, str, str, str]], list(txn))

# Then remove duplicate receipts, keeping the one with the highest
# `stream_id`. There should only be a single receipt with any given
# `stream_id`.
for max_stream_id, room_id, receipt_type, user_id in duplicate_keys:
sql = """
# `stream_id`. Since there might be duplicate rows with the same
# `stream_id`, we delete by the ctid instead.
for stream_id, room_id, receipt_type, user_id in duplicate_keys:
sql = f"""
SELECT {ROW_ID_NAME}
FROM receipts_linearized
WHERE
room_id = ? AND
receipt_type = ? AND
user_id = ? AND
thread_id IS NULL AND
stream_id = ?
LIMIT 1
"""
txn.execute(sql, (room_id, receipt_type, user_id, stream_id))
row_id = cast(Tuple[str], txn.fetchone())[0]

sql = f"""
DELETE FROM receipts_linearized
WHERE
room_id = ? AND
receipt_type = ? AND
user_id = ? AND
thread_id IS NULL AND
stream_id < ?
{ROW_ID_NAME} != ?
clokep marked this conversation as resolved.
Show resolved Hide resolved
"""
txn.execute(sql, (room_id, receipt_type, user_id, max_stream_id))
txn.execute(sql, (room_id, receipt_type, user_id, row_id))

await self.db_pool.runInteraction(
self.RECEIPTS_LINEARIZED_UNIQUE_INDEX_UPDATE_NAME,
Expand Down
4 changes: 3 additions & 1 deletion tests/storage/databases/main/test_receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,9 @@ def test_background_receipts_linearized_unique_index(self) -> None:
{"stream_id": 6, "event_id": "$some_event"},
],
(self.other_room_id, "m.read", self.user_id): [
{"stream_id": 7, "event_id": "$some_event"}
# It is possible for stream IDs to be duplicated.
{"stream_id": 7, "event_id": "$some_event"},
{"stream_id": 7, "event_id": "$some_event"},
],
},
expected_unique_receipts={
Expand Down