Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix serialization errors when rotating notifications #13118

Merged
merged 14 commits into from Jun 28, 2022
135 changes: 94 additions & 41 deletions synapse/storage/databases/main/event_push_actions.py
Expand Up @@ -233,14 +233,25 @@ def _get_unread_counts_by_pos_txn(

counts = NotifCounts()

# First we pull the counts from the summary table
# First we pull the counts from the summary table.
#
# We check that `last_receipt_stream_ordering` matches the stream
# ordering given, if it doesn't then a new read receipt hasn't been
# handled yet and so we the data in the table is stale.
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
#
# If `last_receipt_stream_ordering` is null then that means its up to
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
# date.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why does null mean "up to date" ?

txn.execute(
"""
SELECT stream_ordering, notif_count, COALESCE(unread_count, 0)
FROM event_push_summary
WHERE room_id = ? AND user_id = ? AND stream_ordering > ?
WHERE room_id = ? AND user_id = ?
AND (
(last_receipt_stream_ordering IS NULL AND stream_ordering > ?)
OR last_receipt_stream_ordering = ?
)
""",
(room_id, user_id, stream_ordering),
(room_id, user_id, stream_ordering, stream_ordering),
)
row = txn.fetchone()

Expand Down Expand Up @@ -800,6 +811,18 @@ async def _rotate_notifs(self) -> None:
self._doing_notif_rotation = True

try:
# First we handle any new receipts that have happened.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what does "handle" mean here? have happened since when?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Expanded

while True:
logger.info("Handling new receipts")
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved

caught_up = await self.db_pool.runInteraction(
"_handle_new_receipts_for_notifs_txn",
self._handle_new_receipts_for_notifs_txn,
)
if caught_up:
break

# Then we update the event push summaries for any new events
while True:
logger.info("Rotating notifications")

Expand All @@ -810,10 +833,78 @@ async def _rotate_notifs(self) -> None:
break
await self.hs.get_clock().sleep(self._rotate_delay)

# Finally we clear out old event push actions.
await self._remove_old_push_actions_that_have_rotated()
finally:
self._doing_notif_rotation = False

def _handle_new_receipts_for_notifs_txn(self, txn: LoggingTransaction) -> bool:
"""Check for new read receipts and delete from event push actions."""
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved

limit = 100

sql = """
SELECT r.stream_id, r.room_id, r.user_id, e.stream_ordering
FROM receipts_linearized AS r, event_push_summary_last_receipt_stream_id AS eps, events AS e
WHERE r.stream_id > eps.stream_id AND r.event_id = e.event_id
ORDER BY r.stream_id ASC
LIMIT ?
"""

txn.execute(sql, (limit,))
rows = txn.fetchall()

if not rows:
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
return True

# For each new read receipt we delete push actions from before it and
# recalculate the summary.
for _, room_id, user_id, stream_ordering in rows:
txn.execute(
"""
DELETE FROM event_push_actions
WHERE room_id = ?
AND user_id = ?
AND stream_ordering <= ?
AND highlight = 0
""",
(room_id, user_id, stream_ordering),
)

old_rotate_stream_ordering = self.db_pool.simple_select_one_onecol_txn(
txn,
table="event_push_summary_stream_ordering",
keyvalues={},
retcol="stream_ordering",
)

notif_count, unread_count = self._get_notif_unread_count_for_user_room(
txn, room_id, user_id, stream_ordering, old_rotate_stream_ordering
)

self.db_pool.simple_upsert_txn(
txn,
table="event_push_summary",
keyvalues={"room_id": room_id, "user_id": user_id},
values={
"notif_count": notif_count,
"unread_count": unread_count,
"stream_ordering": old_rotate_stream_ordering,
"last_receipt_stream_ordering": stream_ordering,
},
)

last_stream_id = rows[-1][0]

self.db_pool.simple_update_one_txn(
txn,
table="event_push_summary_last_receipt_stream_id",
keyvalues={},
updatevalues={"stream_id": last_stream_id},
)

return len(rows) < limit

def _rotate_notifs_txn(self, txn: LoggingTransaction) -> bool:
"""Archives older notifications into event_push_summary. Returns whether
the archiving process has caught up or not.
Expand Down Expand Up @@ -1055,44 +1146,6 @@ def _remove_old_push_actions_before_txn(
(room_id, user_id),
)

# We need to join on the events table to get the received_ts for
# event_push_actions and sqlite won't let us use a join in a delete so
# we can't just delete where received_ts < x. Furthermore we can
# only identify event_push_actions by a tuple of room_id, event_id
# we we can't use a subquery.
# Instead, we look up the stream ordering for the last event in that
# room received before the threshold time and delete event_push_actions
# in the room with a stream_odering before that.
txn.execute(
"DELETE FROM event_push_actions "
" WHERE user_id = ? AND room_id = ? AND "
" stream_ordering <= ?"
" AND ((stream_ordering < ? AND highlight = 1) or highlight = 0)",
(user_id, room_id, stream_ordering, self.stream_ordering_month_ago),
)

old_rotate_stream_ordering = self.db_pool.simple_select_one_onecol_txn(
txn,
table="event_push_summary_stream_ordering",
keyvalues={},
retcol="stream_ordering",
)

notif_count, unread_count = self._get_notif_unread_count_for_user_room(
txn, room_id, user_id, stream_ordering, old_rotate_stream_ordering
)

self.db_pool.simple_upsert_txn(
txn,
table="event_push_summary",
keyvalues={"room_id": room_id, "user_id": user_id},
values={
"notif_count": notif_count,
"unread_count": unread_count,
"stream_ordering": old_rotate_stream_ordering,
},
)

erikjohnston marked this conversation as resolved.
Show resolved Hide resolved

class EventPushActionsStore(EventPushActionsWorkerStore):
EPA_HIGHLIGHT_INDEX = "epa_highlight_index"
Expand Down
13 changes: 1 addition & 12 deletions synapse/storage/databases/main/receipts.py
Expand Up @@ -26,7 +26,7 @@
cast,
)

from synapse.api.constants import EduTypes, ReceiptTypes
from synapse.api.constants import EduTypes
from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
from synapse.replication.tcp.streams import ReceiptsStream
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
Expand Down Expand Up @@ -682,17 +682,6 @@ def _insert_linearized_receipt_txn(
lock=False,
)

# When updating a local users read receipt, remove any push actions
# which resulted from the receipt's event and all earlier events.
if (
self.hs.is_mine_id(user_id)
and receipt_type in (ReceiptTypes.READ, ReceiptTypes.READ_PRIVATE)
and stream_ordering is not None
):
self._remove_old_push_actions_before_txn( # type: ignore[attr-defined]
txn, room_id=room_id, user_id=user_id, stream_ordering=stream_ordering
)

return rx_ts

def _graph_to_linear(
Expand Down
@@ -0,0 +1,32 @@
/* Copyright 2022 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

-- Add a column that records the position of the read receipt for the user at
-- the time we summarised the push actions. This is used to check if the counts
-- are up to date after a new read receipt has been sent. Null means that we can
-- skip that check.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why does null mean that?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have expanded

ALTER TABLE event_push_summary ADD COLUMN last_receipt_stream_ordering BIGINT;


-- Tracks which new receipts we've handled
CREATE TABLE event_push_summary_last_receipt_stream_id (
Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, -- Makes sure this table only has one row.
stream_id BIGINT NOT NULL,
CHECK (Lock='X')
);

INSERT INTO event_push_summary_last_receipt_stream_id (stream_id)
SELECT COALESCE(MAX(stream_id), 0)
FROM receipts_linearized;
33 changes: 29 additions & 4 deletions tests/storage/test_event_push_actions.py
Expand Up @@ -81,11 +81,26 @@ def _assert_counts(noitf_count: int, highlight_count: int) -> None:
def _inject_actions(stream: int, action: list) -> None:
event = Mock()
event.room_id = room_id
event.event_id = "$test:example.com"
event.event_id = f"$test{stream}:example.com"
event.internal_metadata.stream_ordering = stream
event.internal_metadata.is_outlier.return_value = False
event.depth = stream

self.get_success(
self.store.db_pool.simple_insert(
table="events",
values={
"stream_ordering": stream,
"topological_ordering": stream,
"type": "m.room.message",
"room_id": room_id,
"processed": True,
"outlier": False,
"event_id": event.event_id,
},
)
)

self.get_success(
self.store.add_push_actions_to_staging(
event.event_id,
Expand All @@ -105,18 +120,28 @@ def _inject_actions(stream: int, action: list) -> None:
def _rotate(stream: int) -> None:
self.get_success(
self.store.db_pool.runInteraction(
"", self.store._rotate_notifs_before_txn, stream
"rotate-receipts", self.store._handle_new_receipts_for_notifs_txn
)
)

self.get_success(
self.store.db_pool.runInteraction(
"rotate-notifs", self.store._rotate_notifs_before_txn, stream
)
)

def _mark_read(stream: int, depth: int) -> None:
last_read_stream_ordering[0] = stream

self.get_success(
self.store.db_pool.runInteraction(
"",
self.store._remove_old_push_actions_before_txn,
self.store._insert_linearized_receipt_txn,
room_id,
"m.read",
user_id,
f"$test{stream}:example.com",
{},
stream,
)
)
Expand Down Expand Up @@ -150,7 +175,7 @@ def _mark_read(stream: int, depth: int) -> None:

_assert_counts(1, 0)

_mark_read(7, 7)
_mark_read(6, 6)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a change in behavior then?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, this is just making sure we're inject a read receipt at an actual event (which we have to), rather than at some arbitrary place.

(Side note: we need to rewrite this test to not poke things directly into the DB, and instead create real rooms, events and receipts)

_assert_counts(0, 0)

_inject_actions(8, HIGHLIGHT)
Expand Down