-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Add config option to forget rooms automatically when users leave them #15224
Changes from 6 commits
d7d29f2
601fe80
b4926d9
923d523
dea5247
f3badbb
5dfada4
a7b6777
767347f
95e81e3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Add `forget_rooms_on_leave` config option to automatically forget rooms when users leave them or are removed from them. | ||
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,7 +16,7 @@ | |
import logging | ||
import random | ||
from http import HTTPStatus | ||
from typing import TYPE_CHECKING, Iterable, List, Optional, Set, Tuple | ||
from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Set, Tuple | ||
|
||
from synapse import types | ||
from synapse.api.constants import ( | ||
|
@@ -38,7 +38,10 @@ | |
from synapse.events import EventBase | ||
from synapse.events.snapshot import EventContext | ||
from synapse.handlers.profile import MAX_AVATAR_URL_LEN, MAX_DISPLAYNAME_LEN | ||
from synapse.handlers.state_deltas import MatchChange, StateDeltasHandler | ||
from synapse.logging import opentracing | ||
from synapse.metrics import event_processing_positions | ||
from synapse.metrics.background_process_metrics import run_as_background_process | ||
from synapse.module_api import NOT_SPAM | ||
from synapse.types import ( | ||
JsonDict, | ||
|
@@ -278,9 +281,25 @@ async def _user_left_room(self, target: UserID, room_id: str) -> None: | |
""" | ||
raise NotImplementedError() | ||
|
||
@abc.abstractmethod | ||
async def forget(self, user: UserID, room_id: str) -> None: | ||
raise NotImplementedError() | ||
user_id = user.to_string() | ||
|
||
member = await self._storage_controllers.state.get_current_state_event( | ||
room_id=room_id, event_type=EventTypes.Member, state_key=user_id | ||
) | ||
membership = member.membership if member else None | ||
|
||
if membership is not None and membership not in [ | ||
Membership.LEAVE, | ||
Membership.BAN, | ||
]: | ||
raise SynapseError(400, "User %s in room %s" % (user_id, room_id)) | ||
|
||
# In normal case this call is only required if `membership` is not `None`. | ||
# But: After the last member had left the room, the background update | ||
# `_background_remove_left_rooms` is deleting rows related to this room from | ||
# the table `current_state_events` and `get_current_state_events` is `None`. | ||
await self.store.forget(user_id, room_id) | ||
Comment on lines
286
to
+304
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I couldn't see why forgetting rooms couldn't be done on any worker, so I made the action available to all workers. |
||
|
||
async def ratelimit_multiple_invites( | ||
self, | ||
|
@@ -2018,25 +2037,139 @@ async def _user_left_room(self, target: UserID, room_id: str) -> None: | |
"""Implements RoomMemberHandler._user_left_room""" | ||
user_left_room(self.distributor, target, room_id) | ||
|
||
async def forget(self, user: UserID, room_id: str) -> None: | ||
user_id = user.to_string() | ||
|
||
member = await self._storage_controllers.state.get_current_state_event( | ||
room_id=room_id, event_type=EventTypes.Member, state_key=user_id | ||
) | ||
membership = member.membership if member else None | ||
class RoomForgetterHandler(StateDeltasHandler): | ||
"""Forgets rooms when they are left, when enabled in the homeserver config. | ||
|
||
if membership is not None and membership not in [ | ||
Membership.LEAVE, | ||
Membership.BAN, | ||
]: | ||
raise SynapseError(400, "User %s in room %s" % (user_id, room_id)) | ||
For the purposes of this feature, kicks, bans and "leaves" via state resolution | ||
weirdness are all considered to be leaves. | ||
|
||
# In normal case this call is only required if `membership` is not `None`. | ||
# But: After the last member had left the room, the background update | ||
# `_background_remove_left_rooms` is deleting rows related to this room from | ||
# the table `current_state_events` and `get_current_state_events` is `None`. | ||
await self.store.forget(user_id, room_id) | ||
Derived from `StatsHandler` and `UserDirectoryHandler`. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wonder if it's worth factoring out the duplicated structure. There is certainly a lot of faff involved in adding another one of these state delta-consumers. |
||
""" | ||
|
||
def __init__(self, hs: "HomeServer"): | ||
super().__init__(hs) | ||
|
||
self._hs = hs | ||
self._store = hs.get_datastores().main | ||
self._storage_controllers = hs.get_storage_controllers() | ||
self._clock = hs.get_clock() | ||
self._notifier = hs.get_notifier() | ||
self._room_member_handler = hs.get_room_member_handler() | ||
|
||
# The current position in the current_state_delta stream | ||
self.pos: Optional[int] = None | ||
|
||
# Guard to ensure we only process deltas one at a time | ||
self._is_processing = False | ||
|
||
if hs.config.worker.run_background_tasks: | ||
self._notifier.add_replication_callback(self.notify_new_event) | ||
|
||
# We kick this off to pick up outstanding work from before the last restart. | ||
self._clock.call_later(0, self.notify_new_event) | ||
|
||
def notify_new_event(self) -> None: | ||
"""Called when there may be more deltas to process""" | ||
if self._is_processing: | ||
return | ||
|
||
self._is_processing = True | ||
|
||
async def process() -> None: | ||
try: | ||
await self._unsafe_process() | ||
finally: | ||
self._is_processing = False | ||
|
||
run_as_background_process("room_forgetter.notify_new_event", process) | ||
|
||
async def _unsafe_process(self) -> None: | ||
# If self.pos is None then means we haven't fetched it from DB | ||
if self.pos is None: | ||
self.pos = await self._store.get_room_forgetter_stream_pos() | ||
room_max_stream_ordering = self._store.get_room_max_stream_ordering() | ||
if self.pos > room_max_stream_ordering: | ||
# apparently, we've processed more events than exist in the database! | ||
# this can happen if events are removed with history purge or similar. | ||
logger.warning( | ||
"Event stream ordering appears to have gone backwards (%i -> %i): " | ||
"rewinding room forgetter processor", | ||
self.pos, | ||
room_max_stream_ordering, | ||
) | ||
self.pos = room_max_stream_ordering | ||
|
||
# Loop round handling deltas until we're up to date | ||
|
||
while True: | ||
# Be sure to read the max stream_ordering *before* checking if there are any outstanding | ||
# deltas, since there is otherwise a chance that we could miss updates which arrive | ||
# after we check the deltas. | ||
room_max_stream_ordering = self._store.get_room_max_stream_ordering() | ||
if self.pos == room_max_stream_ordering: | ||
break | ||
|
||
logger.debug( | ||
"Processing room forgetting %s->%s", self.pos, room_max_stream_ordering | ||
) | ||
if self._hs.config.room.forget_on_leave: | ||
( | ||
max_pos, | ||
deltas, | ||
) = await self._storage_controllers.state.get_current_state_deltas( | ||
self.pos, room_max_stream_ordering | ||
) | ||
|
||
logger.debug("Handling %d state deltas", len(deltas)) | ||
await self._handle_deltas(deltas) | ||
else: | ||
# Update the processing position, so that if the server admin turns the | ||
# feature on at a later date, we don't decide to forget every room that | ||
# has ever been left in the past. | ||
pass | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we lift this out of the loop and instead have a top level: if not self._hs.config.room.forget_on_leave:
await self._store.update_room_forgetter_stream_pos(max_pos)
return I think that's clearer? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also, There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
|
||
self.pos = max_pos | ||
|
||
# Expose current event processing position to prometheus | ||
event_processing_positions.labels("room_forgetter").set(max_pos) | ||
|
||
await self._store.update_room_forgetter_stream_pos(max_pos) | ||
|
||
async def _handle_deltas(self, deltas: List[Dict[str, Any]]) -> None: | ||
"""Called with the state deltas to process""" | ||
for delta in deltas: | ||
typ = delta["type"] | ||
state_key = delta["state_key"] | ||
room_id = delta["room_id"] | ||
event_id = delta["event_id"] | ||
prev_event_id = delta["prev_event_id"] | ||
|
||
if typ != EventTypes.Member: | ||
continue | ||
|
||
if not self._hs.is_mine_id(state_key): | ||
continue | ||
|
||
change = await self._get_key_change( | ||
prev_event_id, | ||
event_id, | ||
key_name="membership", | ||
public_value=Membership.JOIN, | ||
) | ||
is_leave = change is MatchChange.now_false | ||
|
||
if is_leave: | ||
try: | ||
await self._room_member_handler.forget( | ||
UserID.from_string(state_key), room_id | ||
) | ||
except SynapseError as e: | ||
if e.code == 400: | ||
# The user is back in the room. | ||
pass | ||
else: | ||
raise | ||
|
||
|
||
def get_users_which_can_issue_invite(auth_events: StateMap[EventBase]) -> List[str]: | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -82,7 +82,7 @@ class EventIdMembership: | |
membership: str | ||
|
||
|
||
class RoomMemberWorkerStore(EventsWorkerStore): | ||
class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore): | ||
def __init__( | ||
self, | ||
database: DatabasePool, | ||
|
@@ -1368,6 +1368,55 @@ def _is_local_host_in_room_ignoring_users_txn( | |
_is_local_host_in_room_ignoring_users_txn, | ||
) | ||
|
||
async def forget(self, user_id: str, room_id: str) -> None: | ||
"""Indicate that user_id wishes to discard history for room_id.""" | ||
|
||
def f(txn: LoggingTransaction) -> None: | ||
sql = ( | ||
"UPDATE" | ||
" room_memberships" | ||
" SET" | ||
" forgotten = 1" | ||
" WHERE" | ||
" user_id = ?" | ||
" AND" | ||
" room_id = ?" | ||
) | ||
txn.execute(sql, (user_id, room_id)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. While we're here can we change this to a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
|
||
self._invalidate_cache_and_stream(txn, self.did_forget, (user_id, room_id)) | ||
self._invalidate_cache_and_stream( | ||
txn, self.get_forgotten_rooms_for_user, (user_id,) | ||
) | ||
|
||
await self.db_pool.runInteraction("forget_membership", f) | ||
|
||
async def get_room_forgetter_stream_pos(self) -> int: | ||
"""Get the stream position of the background process to forget rooms when left | ||
by users. | ||
""" | ||
return await self.db_pool.simple_select_one_onecol( | ||
table="room_forgetter_stream_pos", | ||
keyvalues={}, | ||
retcol="stream_id", | ||
desc="room_forgetter_stream_pos", | ||
) | ||
|
||
async def update_room_forgetter_stream_pos(self, stream_id: int) -> None: | ||
"""Update the stream position of the background process to forget rooms when | ||
left by users. | ||
|
||
Must only be used by the worker running the background process. | ||
""" | ||
assert self.hs.config.worker.run_background_tasks | ||
|
||
await self.db_pool.simple_update_one( | ||
table="room_forgetter_stream_pos", | ||
keyvalues={}, | ||
updatevalues={"stream_id": stream_id}, | ||
desc="room_forgetter_stream_pos", | ||
) | ||
|
||
|
||
class RoomMemberBackgroundUpdateStore(SQLBaseStore): | ||
def __init__( | ||
|
@@ -1543,29 +1592,6 @@ def __init__( | |
): | ||
super().__init__(database, db_conn, hs) | ||
|
||
async def forget(self, user_id: str, room_id: str) -> None: | ||
"""Indicate that user_id wishes to discard history for room_id.""" | ||
|
||
def f(txn: LoggingTransaction) -> None: | ||
sql = ( | ||
"UPDATE" | ||
" room_memberships" | ||
" SET" | ||
" forgotten = 1" | ||
" WHERE" | ||
" user_id = ?" | ||
" AND" | ||
" room_id = ?" | ||
) | ||
txn.execute(sql, (user_id, room_id)) | ||
|
||
self._invalidate_cache_and_stream(txn, self.did_forget, (user_id, room_id)) | ||
self._invalidate_cache_and_stream( | ||
txn, self.get_forgotten_rooms_for_user, (user_id,) | ||
) | ||
|
||
await self.db_pool.runInteraction("forget_membership", f) | ||
|
||
|
||
def extract_heroes_from_room_summary( | ||
details: Mapping[str, MemberSummary], me: str | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
/* Copyright 2023 The Matrix.org Foundation C.I.C | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
CREATE TABLE room_forgetter_stream_pos ( | ||
Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, -- Makes sure this table only has one row. | ||
stream_id BIGINT NOT NULL, | ||
CHECK (Lock='X') | ||
); | ||
|
||
INSERT INTO room_forgetter_stream_pos ( | ||
stream_id | ||
) SELECT COALESCE(MAX(stream_ordering), 0) from events; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this fix #4720 ?
We should update the issue description in that case or "Part of ..."
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, this does not purge rooms from the database at all.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this part of #4720? Seems like a prerequisite step
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This PR is adjacent, but I see it as more optional than anything. #4720 sounds like it's scoped to the case where all users explicitly choose to forget a room themselves. This PR forces users to forget rooms, making the proposed GC in #4720 trigger more often.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🤷 Feels like a nice step to make the GC automatic (sane default)