Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix serialization errors when rotating notifications #13118

Merged
merged 14 commits into from Jun 28, 2022
1 change: 1 addition & 0 deletions changelog.d/13118.misc
@@ -0,0 +1 @@
Reduce DB usage of `/sync` when a large number of unread messages have recently been sent in a room.
200 changes: 134 additions & 66 deletions synapse/storage/databases/main/event_push_actions.py
Expand Up @@ -233,14 +233,29 @@ def _get_unread_counts_by_pos_txn(

counts = NotifCounts()

# First we pull the counts from the summary table
# First we pull the counts from the summary table.
#
# We check that `last_receipt_stream_ordering` matches the stream
# ordering given, if it doesn't then a new read receipt has arrived and
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
# we haven't yet updated the counts in `event_push_summary` to reflect
# that. In the latter case we simply ignore `event_push_summary` counts
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
# and do a manual count of rows in `event_push_actions` table below.
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I still don't get why this is an ok thing to do. Haven't we deleted rows from event_push_actions, so relying on it (rather than including event_push_summary) will give numbers that are too low?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We delete rows at the same time as we update the event_push_summary. After a user sends a receipt in the room there are two cases:

  1. The receipt has been processed in the _handle_new_receipts_for_notifs_txn background task, and so we have deleted the push actions but updated the event_push_summary.
  2. The receipt has not been processed, so event_push_summary is stale but we haven't yet deleted the push actions so we can correctly calculate the count from event_push_actions.

Maybe:

and do a manual count of rows in event_push_actions table below.
This gives the correct result as we won't have deleted the push actions yet.

?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

right, but suppose:

  • a user has 5 unread messages in a room (say E1...E5)
  • A RR arrives for E2. We process it, update event_push_summary to say there are 3 remaining unread messages, and delete all the entries from event_push_actions.
  • A RR arrives for E4, but before we process it fully, we do this query. We ignore event_push_summary and there is nothing left in event_push_actions. So we return 0 unread messages instead of 1?

Or am I fundamentally misunderstanding how this works?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We delete all entries from event_push_actions from before E2, so there will still be the three push actions for the three unread messages in there. So in the final step we'd still have push actions E3, E4 and E5 in the event_push_actions table

#
# If `last_receipt_stream_ordering` is null then that means its up to
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
# date (as the row was written by an older version of Synapse that
# updated `event_push_summary` synchronously when persisting a new read
# receipt).
txn.execute(
"""
SELECT stream_ordering, notif_count, COALESCE(unread_count, 0)
FROM event_push_summary
WHERE room_id = ? AND user_id = ? AND stream_ordering > ?
WHERE room_id = ? AND user_id = ?
AND (
(last_receipt_stream_ordering IS NULL AND stream_ordering > ?)
OR last_receipt_stream_ordering = ?
)
""",
(room_id, user_id, stream_ordering),
(room_id, user_id, stream_ordering, stream_ordering),
)
row = txn.fetchone()

Expand All @@ -263,9 +278,9 @@ def _get_unread_counts_by_pos_txn(
if row:
counts.highlight_count += row[0]

# Finally we need to count push actions that haven't been summarized
# yet.
# We only want to pull out push actions that we haven't summarized yet.
# Finally we need to count push actions that aren't included in the
# summary returned above, e.g. recent events that haven't been
# summarized yet, or the summary is empty due to a recent read receipt.
stream_ordering = max(stream_ordering, summary_stream_ordering)
notify_count, unread_count = self._get_notif_unread_count_for_user_room(
txn, room_id, user_id, stream_ordering
Expand Down Expand Up @@ -800,6 +815,19 @@ async def _rotate_notifs(self) -> None:
self._doing_notif_rotation = True

try:
# First we recalculate push summaries and delete stale push actions
# for rooms/users with new receipts.
while True:
logger.debug("Handling new receipts")

caught_up = await self.db_pool.runInteraction(
"_handle_new_receipts_for_notifs_txn",
self._handle_new_receipts_for_notifs_txn,
)
if caught_up:
break

# Then we update the event push summaries for any new events
while True:
logger.info("Rotating notifications")

Expand All @@ -810,10 +838,110 @@ async def _rotate_notifs(self) -> None:
break
await self.hs.get_clock().sleep(self._rotate_delay)

# Finally we clear out old event push actions.
await self._remove_old_push_actions_that_have_rotated()
finally:
self._doing_notif_rotation = False

def _handle_new_receipts_for_notifs_txn(self, txn: LoggingTransaction) -> bool:
"""Check for new read receipts and delete from event push actions.

Any push actions which predate the user's most recent read receipt are
now redundant, so we can remove them from `event_push_actions` and
update `event_push_summary`.
"""

limit = 100

min_stream_id = self.db_pool.simple_select_one_onecol_txn(
txn,
table="event_push_summary_last_receipt_stream_id",
keyvalues={},
retcol="stream_id",
)

sql = """
SELECT r.stream_id, r.room_id, r.user_id, e.stream_ordering
FROM receipts_linearized AS r
INNER JOIN events AS e USING (event_id)
WHERE r.stream_id > ? AND user_id LIKE ?
ORDER BY r.stream_id ASC
LIMIT ?
"""

# We only want local users, so we add a dodgy filter to the above query
# and recheck it below.
user_filter = "%:" + self.hs.hostname

txn.execute(
sql,
(
min_stream_id,
user_filter,
limit,
),
)
rows = txn.fetchall()

# For each new read receipt we delete push actions from before it and
# recalculate the summary.
for _, room_id, user_id, stream_ordering in rows:
# Only handle our own read receipts.
if not self.hs.is_mine_id(user_id):
continue

txn.execute(
"""
DELETE FROM event_push_actions
WHERE room_id = ?
AND user_id = ?
AND stream_ordering <= ?
AND highlight = 0
""",
(room_id, user_id, stream_ordering),
)

old_rotate_stream_ordering = self.db_pool.simple_select_one_onecol_txn(
txn,
table="event_push_summary_stream_ordering",
keyvalues={},
retcol="stream_ordering",
)

notif_count, unread_count = self._get_notif_unread_count_for_user_room(
txn, room_id, user_id, stream_ordering, old_rotate_stream_ordering
)

self.db_pool.simple_upsert_txn(
txn,
table="event_push_summary",
keyvalues={"room_id": room_id, "user_id": user_id},
values={
"notif_count": notif_count,
"unread_count": unread_count,
"stream_ordering": old_rotate_stream_ordering,
"last_receipt_stream_ordering": stream_ordering,
},
)

# We always update `event_push_summary_last_receipt_stream_id` to
# ensure that we don't rescan the same receipts for remote users.
#
# This requires repeatable read to be safe, as we need the
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it'd be quite nice to do an assertion that we have the right isolation level, but that looks a bit fiddly and probably a thing for another time.

# `MAX(stream_id)` to not include any new rows that have been committed
# since the start of the transaction (since those rows won't have been
# returned by the query above). Alternatively we could query the max
# stream ID at the start of the transaction and bound everything by
# that.
txn.execute(
"""
UPDATE event_push_summary_last_receipt_stream_id
SET stream_id = (SELECT COALESCE(MAX(stream_id), 0) FROM receipts_linearized)
"""
)

return len(rows) < limit

def _rotate_notifs_txn(self, txn: LoggingTransaction) -> bool:
"""Archives older notifications into event_push_summary. Returns whether
the archiving process has caught up or not.
Expand Down Expand Up @@ -1033,66 +1161,6 @@ def remove_old_push_actions_that_have_rotated_txn(
if done:
break

def _remove_old_push_actions_before_txn(
self, txn: LoggingTransaction, room_id: str, user_id: str, stream_ordering: int
) -> None:
"""
Purges old push actions for a user and room before a given
stream_ordering.

We however keep a months worth of highlighted notifications, so that
users can still get a list of recent highlights.

Args:
txn: The transaction
room_id: Room ID to delete from
user_id: user ID to delete for
stream_ordering: The lowest stream ordering which will
not be deleted.
"""
txn.call_after(
self.get_unread_event_push_actions_by_room_for_user.invalidate,
(room_id, user_id),
)

# We need to join on the events table to get the received_ts for
# event_push_actions and sqlite won't let us use a join in a delete so
# we can't just delete where received_ts < x. Furthermore we can
# only identify event_push_actions by a tuple of room_id, event_id
# we we can't use a subquery.
# Instead, we look up the stream ordering for the last event in that
# room received before the threshold time and delete event_push_actions
# in the room with a stream_odering before that.
txn.execute(
"DELETE FROM event_push_actions "
" WHERE user_id = ? AND room_id = ? AND "
" stream_ordering <= ?"
" AND ((stream_ordering < ? AND highlight = 1) or highlight = 0)",
(user_id, room_id, stream_ordering, self.stream_ordering_month_ago),
)

old_rotate_stream_ordering = self.db_pool.simple_select_one_onecol_txn(
txn,
table="event_push_summary_stream_ordering",
keyvalues={},
retcol="stream_ordering",
)

notif_count, unread_count = self._get_notif_unread_count_for_user_room(
txn, room_id, user_id, stream_ordering, old_rotate_stream_ordering
)

self.db_pool.simple_upsert_txn(
txn,
table="event_push_summary",
keyvalues={"room_id": room_id, "user_id": user_id},
values={
"notif_count": notif_count,
"unread_count": unread_count,
"stream_ordering": old_rotate_stream_ordering,
},
)

erikjohnston marked this conversation as resolved.
Show resolved Hide resolved

class EventPushActionsStore(EventPushActionsWorkerStore):
EPA_HIGHLIGHT_INDEX = "epa_highlight_index"
Expand Down
13 changes: 1 addition & 12 deletions synapse/storage/databases/main/receipts.py
Expand Up @@ -26,7 +26,7 @@
cast,
)

from synapse.api.constants import EduTypes, ReceiptTypes
from synapse.api.constants import EduTypes
from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
from synapse.replication.tcp.streams import ReceiptsStream
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
Expand Down Expand Up @@ -682,17 +682,6 @@ def _insert_linearized_receipt_txn(
lock=False,
)

# When updating a local users read receipt, remove any push actions
# which resulted from the receipt's event and all earlier events.
if (
self.hs.is_mine_id(user_id)
and receipt_type in (ReceiptTypes.READ, ReceiptTypes.READ_PRIVATE)
and stream_ordering is not None
):
self._remove_old_push_actions_before_txn( # type: ignore[attr-defined]
txn, room_id=room_id, user_id=user_id, stream_ordering=stream_ordering
)

return rx_ts

def _graph_to_linear(
Expand Down
@@ -0,0 +1,35 @@
/* Copyright 2022 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

-- Add a column that records the position of the read receipt for the user at
-- the time we summarised the push actions. This is used to check if the counts
-- are up to date after a new read receipt has been sent.
--
-- Null means that we can skip that check, as the row was written by an older
-- version of Synapse that updated `event_push_summary` synchronously when
-- persisting a new read receipt
ALTER TABLE event_push_summary ADD COLUMN last_receipt_stream_ordering BIGINT;


-- Tracks which new receipts we've handled
CREATE TABLE event_push_summary_last_receipt_stream_id (
Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, -- Makes sure this table only has one row.
stream_id BIGINT NOT NULL,
CHECK (Lock='X')
);

INSERT INTO event_push_summary_last_receipt_stream_id (stream_id)
SELECT COALESCE(MAX(stream_id), 0)
FROM receipts_linearized;
35 changes: 30 additions & 5 deletions tests/storage/test_event_push_actions.py
Expand Up @@ -55,7 +55,7 @@ def test_get_unread_push_actions_for_user_in_range_for_email(self) -> None:

def test_count_aggregation(self) -> None:
room_id = "!foo:example.com"
user_id = "@user1235:example.com"
user_id = "@user1235:test"

last_read_stream_ordering = [0]

Expand All @@ -81,11 +81,26 @@ def _assert_counts(noitf_count: int, highlight_count: int) -> None:
def _inject_actions(stream: int, action: list) -> None:
event = Mock()
event.room_id = room_id
event.event_id = "$test:example.com"
event.event_id = f"$test{stream}:example.com"
event.internal_metadata.stream_ordering = stream
event.internal_metadata.is_outlier.return_value = False
event.depth = stream

self.get_success(
self.store.db_pool.simple_insert(
table="events",
values={
"stream_ordering": stream,
"topological_ordering": stream,
"type": "m.room.message",
"room_id": room_id,
"processed": True,
"outlier": False,
"event_id": event.event_id,
},
)
)

self.get_success(
self.store.add_push_actions_to_staging(
event.event_id,
Expand All @@ -105,18 +120,28 @@ def _inject_actions(stream: int, action: list) -> None:
def _rotate(stream: int) -> None:
self.get_success(
self.store.db_pool.runInteraction(
"", self.store._rotate_notifs_before_txn, stream
"rotate-receipts", self.store._handle_new_receipts_for_notifs_txn
)
)

self.get_success(
self.store.db_pool.runInteraction(
"rotate-notifs", self.store._rotate_notifs_before_txn, stream
)
)

def _mark_read(stream: int, depth: int) -> None:
last_read_stream_ordering[0] = stream

self.get_success(
self.store.db_pool.runInteraction(
"",
self.store._remove_old_push_actions_before_txn,
self.store._insert_linearized_receipt_txn,
room_id,
"m.read",
user_id,
f"$test{stream}:example.com",
{},
stream,
)
)
Expand Down Expand Up @@ -150,7 +175,7 @@ def _mark_read(stream: int, depth: int) -> None:

_assert_counts(1, 0)

_mark_read(7, 7)
_mark_read(6, 6)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a change in behavior then?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, this is just making sure we're inject a read receipt at an actual event (which we have to), rather than at some arbitrary place.

(Side note: we need to rewrite this test to not poke things directly into the DB, and instead create real rooms, events and receipts)

_assert_counts(0, 0)

_inject_actions(8, HIGHLIGHT)
Expand Down