Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Improve performance of `remove_{hidden,deleted}_devices_from_device_i…
Browse files Browse the repository at this point in the history
…nbox` (#11421)

Co-authored-by: Richard van der Hoff <1389908+richvdh@users.noreply.github.com>
  • Loading branch information
babolivier and richvdh committed Nov 25, 2021
1 parent 7f9841b commit 0d88c4f
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 148 deletions.
1 change: 1 addition & 0 deletions changelog.d/11421.bugfix
@@ -0,0 +1 @@
Improve performance of various background database schema updates.
209 changes: 63 additions & 146 deletions synapse/storage/databases/main/deviceinbox.py
Expand Up @@ -599,6 +599,7 @@ class DeviceInboxBackgroundUpdateStore(SQLBaseStore):
DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop"
REMOVE_DELETED_DEVICES = "remove_deleted_devices_from_device_inbox"
REMOVE_HIDDEN_DEVICES = "remove_hidden_devices_from_device_inbox"
REMOVE_DEAD_DEVICES_FROM_INBOX = "remove_dead_devices_from_device_inbox"

def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"):
super().__init__(database, db_conn, hs)
Expand All @@ -614,14 +615,18 @@ def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"):
self.DEVICE_INBOX_STREAM_ID, self._background_drop_index_device_inbox
)

self.db_pool.updates.register_background_update_handler(
self.REMOVE_DELETED_DEVICES,
self._remove_deleted_devices_from_device_inbox,
# Used to be a background update that deletes all device_inboxes for deleted
# devices.
self.db_pool.updates.register_noop_background_update(
self.REMOVE_DELETED_DEVICES
)
# Used to be a background update that deletes all device_inboxes for hidden
# devices.
self.db_pool.updates.register_noop_background_update(self.REMOVE_HIDDEN_DEVICES)

self.db_pool.updates.register_background_update_handler(
self.REMOVE_HIDDEN_DEVICES,
self._remove_hidden_devices_from_device_inbox,
self.REMOVE_DEAD_DEVICES_FROM_INBOX,
self._remove_dead_devices_from_device_inbox,
)

async def _background_drop_index_device_inbox(self, progress, batch_size):
Expand All @@ -636,171 +641,83 @@ def reindex_txn(conn):

return 1

async def _remove_deleted_devices_from_device_inbox(
self, progress: JsonDict, batch_size: int
async def _remove_dead_devices_from_device_inbox(
self,
progress: JsonDict,
batch_size: int,
) -> int:
"""A background update that deletes all device_inboxes for deleted devices.
This should only need to be run once (when users upgrade to v1.47.0)
"""A background update to remove devices that were either deleted or hidden from
the device_inbox table.
Args:
progress: JsonDict used to store progress of this background update
batch_size: the maximum number of rows to retrieve in a single select query
progress: The update's progress dict.
batch_size: The batch size for this update.
Returns:
The number of deleted rows
The number of rows deleted.
"""

def _remove_deleted_devices_from_device_inbox_txn(
def _remove_dead_devices_from_device_inbox_txn(
txn: LoggingTransaction,
) -> int:
"""stream_id is not unique
we need to use an inclusive `stream_id >= ?` clause,
since we might not have deleted all dead device messages for the stream_id
returned from the previous query
) -> Tuple[int, bool]:

Then delete only rows matching the `(user_id, device_id, stream_id)` tuple,
to avoid problems of deleting a large number of rows all at once
due to a single device having lots of device messages.
"""
if "max_stream_id" in progress:
max_stream_id = progress["max_stream_id"]
else:
txn.execute("SELECT max(stream_id) FROM device_inbox")
# There's a type mismatch here between how we want to type the row and
# what fetchone says it returns, but we silence it because we know that
# res can't be None.
res: Tuple[Optional[int]] = txn.fetchone() # type: ignore[assignment]
if res[0] is None:
# this can only happen if the `device_inbox` table is empty, in which
# case we have no work to do.
return 0, True
else:
max_stream_id = res[0]

last_stream_id = progress.get("stream_id", 0)
start = progress.get("stream_id", 0)
stop = start + batch_size

# delete rows in `device_inbox` which do *not* correspond to a known,
# unhidden device.
sql = """
SELECT device_id, user_id, stream_id
FROM device_inbox
DELETE FROM device_inbox
WHERE
stream_id >= ?
AND (device_id, user_id) NOT IN (
SELECT device_id, user_id FROM devices
stream_id >= ? AND stream_id < ?
AND NOT EXISTS (
SELECT * FROM devices d
WHERE
d.device_id=device_inbox.device_id
AND d.user_id=device_inbox.user_id
AND NOT hidden
)
ORDER BY stream_id
LIMIT ?
"""

txn.execute(sql, (last_stream_id, batch_size))
rows = txn.fetchall()
"""

num_deleted = 0
for row in rows:
num_deleted += self.db_pool.simple_delete_txn(
txn,
"device_inbox",
{"device_id": row[0], "user_id": row[1], "stream_id": row[2]},
)
txn.execute(sql, (start, stop))

if rows:
# send more than stream_id to progress
# otherwise it can happen in large deployments that
# no change of status is visible in the log file
# it may be that the stream_id does not change in several runs
self.db_pool.updates._background_update_progress_txn(
txn,
self.REMOVE_DELETED_DEVICES,
{
"device_id": rows[-1][0],
"user_id": rows[-1][1],
"stream_id": rows[-1][2],
},
)

return num_deleted

number_deleted = await self.db_pool.runInteraction(
"_remove_deleted_devices_from_device_inbox",
_remove_deleted_devices_from_device_inbox_txn,
)

# The task is finished when no more lines are deleted.
if not number_deleted:
await self.db_pool.updates._end_background_update(
self.REMOVE_DELETED_DEVICES
self.db_pool.updates._background_update_progress_txn(
txn,
self.REMOVE_DEAD_DEVICES_FROM_INBOX,
{
"stream_id": stop,
"max_stream_id": max_stream_id,
},
)

return number_deleted

async def _remove_hidden_devices_from_device_inbox(
self, progress: JsonDict, batch_size: int
) -> int:
"""A background update that deletes all device_inboxes for hidden devices.
This should only need to be run once (when users upgrade to v1.47.0)
Args:
progress: JsonDict used to store progress of this background update
batch_size: the maximum number of rows to retrieve in a single select query
Returns:
The number of deleted rows
"""

def _remove_hidden_devices_from_device_inbox_txn(
txn: LoggingTransaction,
) -> int:
"""stream_id is not unique
we need to use an inclusive `stream_id >= ?` clause,
since we might not have deleted all hidden device messages for the stream_id
returned from the previous query
Then delete only rows matching the `(user_id, device_id, stream_id)` tuple,
to avoid problems of deleting a large number of rows all at once
due to a single device having lots of device messages.
"""

last_stream_id = progress.get("stream_id", 0)

sql = """
SELECT device_id, user_id, stream_id
FROM device_inbox
WHERE
stream_id >= ?
AND (device_id, user_id) IN (
SELECT device_id, user_id FROM devices WHERE hidden = ?
)
ORDER BY stream_id
LIMIT ?
"""

txn.execute(sql, (last_stream_id, True, batch_size))
rows = txn.fetchall()

num_deleted = 0
for row in rows:
num_deleted += self.db_pool.simple_delete_txn(
txn,
"device_inbox",
{"device_id": row[0], "user_id": row[1], "stream_id": row[2]},
)

if rows:
# We don't just save the `stream_id` in progress as
# otherwise it can happen in large deployments that
# no change of status is visible in the log file, as
# it may be that the stream_id does not change in several runs
self.db_pool.updates._background_update_progress_txn(
txn,
self.REMOVE_HIDDEN_DEVICES,
{
"device_id": rows[-1][0],
"user_id": rows[-1][1],
"stream_id": rows[-1][2],
},
)

return num_deleted
return stop > max_stream_id

number_deleted = await self.db_pool.runInteraction(
"_remove_hidden_devices_from_device_inbox",
_remove_hidden_devices_from_device_inbox_txn,
finished = await self.db_pool.runInteraction(
"_remove_devices_from_device_inbox_txn",
_remove_dead_devices_from_device_inbox_txn,
)

# The task is finished when no more lines are deleted.
if not number_deleted:
if finished:
await self.db_pool.updates._end_background_update(
self.REMOVE_HIDDEN_DEVICES
self.REMOVE_DEAD_DEVICES_FROM_INBOX,
)

return number_deleted
return batch_size


class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore):
Expand Down
@@ -0,0 +1,18 @@
/* Copyright 2021 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

-- Background update to clear the inboxes of hidden and deleted devices.
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
(6508, 'remove_dead_devices_from_device_inbox', '{}');
4 changes: 2 additions & 2 deletions tests/storage/databases/main/test_deviceinbox.py
Expand Up @@ -66,7 +66,7 @@ def test_background_remove_deleted_devices_from_device_inbox(self):
self.store.db_pool.simple_insert(
"background_updates",
{
"update_name": "remove_deleted_devices_from_device_inbox",
"update_name": "remove_dead_devices_from_device_inbox",
"progress_json": "{}",
},
)
Expand Down Expand Up @@ -140,7 +140,7 @@ def test_background_remove_hidden_devices_from_device_inbox(self):
self.store.db_pool.simple_insert(
"background_updates",
{
"update_name": "remove_hidden_devices_from_device_inbox",
"update_name": "remove_dead_devices_from_device_inbox",
"progress_json": "{}",
},
)
Expand Down

0 comments on commit 0d88c4f

Please sign in to comment.