Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Speed up deleting to-device messages task #16318

Merged
merged 2 commits into from Sep 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/16318.misc
@@ -0,0 +1 @@
Speed up task to delete to-device messages.
27 changes: 14 additions & 13 deletions synapse/handlers/device.py
Expand Up @@ -388,7 +388,8 @@ async def handle_room_un_partial_stated(self, room_id: str) -> None:
"Trying handling device list state for partial join: not supported on workers."
)

DEVICE_MSGS_DELETE_BATCH_LIMIT = 100
DEVICE_MSGS_DELETE_BATCH_LIMIT = 1000
DEVICE_MSGS_DELETE_SLEEP_MS = 1000

async def _delete_device_messages(
self,
Expand All @@ -400,19 +401,19 @@ async def _delete_device_messages(
device_id = task.params["device_id"]
up_to_stream_id = task.params["up_to_stream_id"]

res = await self.store.delete_messages_for_device(
user_id=user_id,
device_id=device_id,
up_to_stream_id=up_to_stream_id,
limit=DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT,
)
# Delete the messages in batches to avoid too much DB load.
while True:
res = await self.store.delete_messages_for_device(
user_id=user_id,
device_id=device_id,
up_to_stream_id=up_to_stream_id,
limit=DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT,
)

if res < DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT:
return TaskStatus.COMPLETE, None, None
else:
# There is probably still device messages to be deleted, let's keep the task active and it will be run
# again in a subsequent scheduler loop run (probably the next one, if not too many tasks are running).
return TaskStatus.ACTIVE, None, None
if res < DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT:
return TaskStatus.COMPLETE, None, None

await self.clock.sleep(DeviceHandler.DEVICE_MSGS_DELETE_SLEEP_MS / 1000.0)


class DeviceHandler(DeviceWorkerHandler):
Expand Down