Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Delete stale non-e2e devices for users, take 3 (#15183)
Browse files Browse the repository at this point in the history
This should help reduce the number of devices e.g. simple bots the repeatedly login rack up.

We only delete non-e2e devices as they should be safe to delete, whereas if we delete e2e devices for a user we may accidentally break their ability to receive e2e keys for a message.
  • Loading branch information
erikjohnston committed Mar 29, 2023
1 parent d0541e3 commit 78cdb72
Show file tree
Hide file tree
Showing 7 changed files with 134 additions and 7 deletions.
1 change: 1 addition & 0 deletions changelog.d/15183.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Prune user's old devices on login if they have too many.
2 changes: 1 addition & 1 deletion synapse/handlers/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,7 @@ async def delete_all_devices_for_user(
device_ids = [d for d in device_ids if d != except_device_id]
await self.delete_devices(user_id, device_ids)

async def delete_devices(self, user_id: str, device_ids: List[str]) -> None:
async def delete_devices(self, user_id: str, device_ids: StrCollection) -> None:
"""Delete several devices
Args:
Expand Down
50 changes: 48 additions & 2 deletions synapse/handlers/register.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
"""Contains functions for registering clients."""

import logging
from typing import TYPE_CHECKING, Iterable, List, Optional, Tuple
from typing import TYPE_CHECKING, Iterable, List, Optional, Set, Tuple

from prometheus_client import Counter
from typing_extensions import TypedDict
Expand All @@ -40,6 +40,7 @@
from synapse.config.server import is_threepid_reserved
from synapse.handlers.device import DeviceHandler
from synapse.http.servlet import assert_params_in_dict
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.http.login import RegisterDeviceReplicationServlet
from synapse.replication.http.register import (
ReplicationPostRegisterActionsServlet,
Expand All @@ -48,6 +49,7 @@
from synapse.spam_checker_api import RegistrationBehaviour
from synapse.types import RoomAlias, UserID, create_requester
from synapse.types.state import StateFilter
from synapse.util.iterutils import batch_iter

if TYPE_CHECKING:
from synapse.server import HomeServer
Expand Down Expand Up @@ -110,6 +112,10 @@ def __init__(self, hs: "HomeServer"):
self._server_notices_mxid = hs.config.servernotices.server_notices_mxid
self._server_name = hs.hostname

# The set of users that we're currently pruning devices for. Ensures
# that we don't have two such jobs for the same user running at once.
self._currently_pruning_devices_for_users: Set[str] = set()

self.spam_checker = hs.get_spam_checker()

if hs.config.worker.worker_app:
Expand All @@ -121,7 +127,10 @@ def __init__(self, hs: "HomeServer"):
ReplicationPostRegisterActionsServlet.make_client(hs)
)
else:
self.device_handler = hs.get_device_handler()
device_handler = hs.get_device_handler()
assert isinstance(device_handler, DeviceHandler)
self.device_handler = device_handler

self._register_device_client = self.register_device_inner
self.pusher_pool = hs.get_pusherpool()

Expand Down Expand Up @@ -851,6 +860,9 @@ class and RegisterDeviceReplicationServlet.
# This can only run on the main process.
assert isinstance(self.device_handler, DeviceHandler)

# Prune the user's device list if they already have a lot of devices.
await self._maybe_prune_too_many_devices(user_id)

registered_device_id = await self.device_handler.check_device_registered(
user_id,
device_id,
Expand Down Expand Up @@ -919,6 +931,40 @@ class and RegisterDeviceReplicationServlet.
"refresh_token": refresh_token,
}

async def _maybe_prune_too_many_devices(self, user_id: str) -> None:
"""Delete any excess old devices this user may have."""

if user_id in self._currently_pruning_devices_for_users:
return

# We also cap the number of users whose devices we prune at the same
# time, to avoid performance problems.
if len(self._currently_pruning_devices_for_users) > 5:
return

device_ids = await self.store.check_too_many_devices_for_user(user_id)
if not device_ids:
return

# Now spawn a background loop that deletes said devices.
async def _prune_too_many_devices_loop() -> None:
if user_id in self._currently_pruning_devices_for_users:
return

self._currently_pruning_devices_for_users.add(user_id)

try:
for batch in batch_iter(device_ids, 10):
await self.device_handler.delete_devices(user_id, batch)

await self.clock.sleep(60)
finally:
self._currently_pruning_devices_for_users.discard(user_id)

run_as_background_process(
"_prune_too_many_devices_loop", _prune_too_many_devices_loop
)

async def post_registration_actions(
self, user_id: str, auth_result: dict, access_token: Optional[str]
) -> None:
Expand Down
80 changes: 79 additions & 1 deletion synapse/storage/databases/main/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -1599,6 +1599,73 @@ def _txn(txn: LoggingTransaction) -> int:

return rows

async def check_too_many_devices_for_user(self, user_id: str) -> List[str]:
"""Check if the user has a lot of devices, and if so return the set of
devices we can prune.
This does *not* return hidden devices or devices with E2E keys.
"""

num_devices = await self.db_pool.simple_select_one_onecol(
table="devices",
keyvalues={"user_id": user_id, "hidden": False},
retcol="COALESCE(COUNT(*), 0)",
desc="count_devices",
)

# We let users have up to ten devices without pruning.
if num_devices <= 10:
return []

# We always prune devices not seen in the last 14 days...
max_last_seen = self._clock.time_msec() - 14 * 24 * 60 * 60 * 1000

# ... but we also cap the maximum number of devices the user can have to
# 50.
if num_devices > 50:
# Choose a last seen that ensures we keep at most 50 devices.
sql = """
SELECT last_seen FROM devices
LEFT JOIN e2e_device_keys_json USING (user_id, device_id)
WHERE
user_id = ?
AND NOT hidden
AND last_seen IS NOT NULL
AND key_json IS NULL
ORDER BY last_seen DESC
LIMIT 1
OFFSET 50
"""

rows = await self.db_pool.execute(
"check_too_many_devices_for_user_last_seen", None, sql, (user_id,)
)
if rows:
max_last_seen = max(rows[0][0], max_last_seen)

# Fetch the devices to delete.
sql = """
SELECT DISTINCT device_id FROM devices
LEFT JOIN e2e_device_keys_json USING (user_id, device_id)
WHERE
user_id = ?
AND NOT hidden
AND last_seen < ?
AND key_json IS NULL
ORDER BY last_seen
"""

def check_too_many_devices_for_user_txn(
txn: LoggingTransaction,
) -> List[str]:
txn.execute(sql, (user_id, max_last_seen))
return [device_id for device_id, in txn]

return await self.db_pool.runInteraction(
"check_too_many_devices_for_user",
check_too_many_devices_for_user_txn,
)


class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
# Because we have write access, this will be a StreamIdGenerator
Expand Down Expand Up @@ -1657,6 +1724,7 @@ async def store_device(
values={},
insertion_values={
"display_name": initial_device_display_name,
"last_seen": self._clock.time_msec(),
"hidden": False,
},
desc="store_device",
Expand Down Expand Up @@ -1702,7 +1770,15 @@ async def store_device(
)
raise StoreError(500, "Problem storing device.")

async def delete_devices(self, user_id: str, device_ids: List[str]) -> None:
@cached(max_entries=0)
async def delete_device(self, user_id: str, device_id: str) -> None:
raise NotImplementedError()

# Note: sometimes deleting rows out of `device_inbox` can take a long time,
# so we use a cache so that we deduplicate in flight requests to delete
# devices.
@cachedList(cached_method_name="delete_device", list_name="device_ids")
async def delete_devices(self, user_id: str, device_ids: Collection[str]) -> dict:
"""Deletes several devices.
Args:
Expand Down Expand Up @@ -1739,6 +1815,8 @@ def _delete_devices_txn(txn: LoggingTransaction) -> None:
for device_id in device_ids:
self.device_id_exists_cache.invalidate((user_id, device_id))

return {}

async def update_device(
self, user_id: str, device_id: str, new_display_name: Optional[str] = None
) -> None:
Expand Down
2 changes: 1 addition & 1 deletion tests/handlers/test_admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ def test_devices(self) -> None:
self.assertIn("device_id", args[0][0])
self.assertIsNone(args[0][0]["display_name"])
self.assertIsNone(args[0][0]["last_seen_user_agent"])
self.assertIsNone(args[0][0]["last_seen_ts"])
self.assertEqual(args[0][0]["last_seen_ts"], 600)
self.assertIsNone(args[0][0]["last_seen_ip"])

def test_connections(self) -> None:
Expand Down
2 changes: 1 addition & 1 deletion tests/handlers/test_device.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ def test_get_devices_by_user(self) -> None:
"device_id": "xyz",
"display_name": "display 0",
"last_seen_ip": None,
"last_seen_ts": None,
"last_seen_ts": 1000000,
},
device_map["xyz"],
)
Expand Down
4 changes: 3 additions & 1 deletion tests/storage/test_client_ips.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,8 @@ def test_get_last_client_ip_by_device(self, after_persisting: bool) -> None:
)
)

last_seen = self.clock.time_msec()

if after_persisting:
# Trigger the storage loop
self.reactor.advance(10)
Expand All @@ -190,7 +192,7 @@ def test_get_last_client_ip_by_device(self, after_persisting: bool) -> None:
"device_id": device_id,
"ip": None,
"user_agent": None,
"last_seen": None,
"last_seen": last_seen,
},
],
)
Expand Down

0 comments on commit 78cdb72

Please sign in to comment.