Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Speed up deleting to-device messages task (#16318)
Browse files Browse the repository at this point in the history
  • Loading branch information
erikjohnston committed Sep 14, 2023
1 parent 39dc5de commit e9e2904
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 13 deletions.
1 change: 1 addition & 0 deletions changelog.d/16318.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Speed up task to delete to-device messages.
27 changes: 14 additions & 13 deletions synapse/handlers/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,8 @@ async def handle_room_un_partial_stated(self, room_id: str) -> None:
"Trying handling device list state for partial join: not supported on workers."
)

DEVICE_MSGS_DELETE_BATCH_LIMIT = 100
DEVICE_MSGS_DELETE_BATCH_LIMIT = 1000
DEVICE_MSGS_DELETE_SLEEP_MS = 1000

async def _delete_device_messages(
self,
Expand All @@ -400,19 +401,19 @@ async def _delete_device_messages(
device_id = task.params["device_id"]
up_to_stream_id = task.params["up_to_stream_id"]

res = await self.store.delete_messages_for_device(
user_id=user_id,
device_id=device_id,
up_to_stream_id=up_to_stream_id,
limit=DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT,
)
# Delete the messages in batches to avoid too much DB load.
while True:
res = await self.store.delete_messages_for_device(
user_id=user_id,
device_id=device_id,
up_to_stream_id=up_to_stream_id,
limit=DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT,
)

if res < DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT:
return TaskStatus.COMPLETE, None, None
else:
# There is probably still device messages to be deleted, let's keep the task active and it will be run
# again in a subsequent scheduler loop run (probably the next one, if not too many tasks are running).
return TaskStatus.ACTIVE, None, None
if res < DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT:
return TaskStatus.COMPLETE, None, None

await self.clock.sleep(DeviceHandler.DEVICE_MSGS_DELETE_SLEEP_MS / 1000.0)


class DeviceHandler(DeviceWorkerHandler):
Expand Down

0 comments on commit e9e2904

Please sign in to comment.