From c9823807beb567b95766bd47f85db894308ef7d4 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Mon, 25 Nov 2019 11:43:28 +0000 Subject: [PATCH 01/45] Add database support for ephemeral messages --- synapse/storage/data_stores/main/events.py | 26 +++++++++++++++++++ .../main/schema/delta/56/event_expiry.sql | 19 ++++++++++++++ 2 files changed, 45 insertions(+) create mode 100644 synapse/storage/data_stores/main/schema/delta/56/event_expiry.sql diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py index 878f7568a63d..ab2b9d9d3cde 100644 --- a/synapse/storage/data_stores/main/events.py +++ b/synapse/storage/data_stores/main/events.py @@ -1956,6 +1956,32 @@ def insert_labels_for_event_txn( ], ) + def insert_event_expiry(self, event_id, expiry_ts): + return self._simple_insert( + table="event_expiry", + values={ + "event_id": event_id, + "expiry_ts": expiry_ts, + }, + desc="insert_event_expiry", + ) + + def delete_event_expiry(self, event_id): + return self._simple_delete( + table="event_expiry", + keyvalues={"event_id": event_id}, + desc="delete_event_expiry", + ) + + def get_events_to_expire(self): + return self._simple_select_list( + table="event_expiry", + keyvalues=None, + retcols=["event_id", "expiry_ts"], + desc="get_events_to_expire", + ) + + AllNewEventsResult = namedtuple( "AllNewEventsResult", diff --git a/synapse/storage/data_stores/main/schema/delta/56/event_expiry.sql b/synapse/storage/data_stores/main/schema/delta/56/event_expiry.sql new file mode 100644 index 000000000000..5b2fa7767ca5 --- /dev/null +++ b/synapse/storage/data_stores/main/schema/delta/56/event_expiry.sql @@ -0,0 +1,19 @@ +/* Copyright 2019 The Matrix.org Foundation C.I.C. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE IF NOT EXISTS event_expiry ( + event_id TEXT PRIMARY KEY, + expiry_ts BIGINT NOT NULL +); \ No newline at end of file From 6ad5e92e7e30a6b8b703bc915a9611b7e0048de4 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Mon, 25 Nov 2019 11:44:17 +0000 Subject: [PATCH 02/45] Add helper functions to redact messages on expiry The actual redaction isn't there yet though --- synapse/handlers/message.py | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index d682dc2b7a80..fd94348582b2 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -63,6 +63,10 @@ def __init__(self, hs): self.state_store = self.storage.state self._event_serializer = hs.get_event_client_serializer() + run_as_background_process( + "_schedule_redactions_from_db", self._schedule_redactions_from_db + ) + @defer.inlineCallbacks def get_room_data( self, user_id=None, room_id=None, event_type=None, state_key="", is_guest=False @@ -225,6 +229,36 @@ def get_joined_members(self, requester, room_id): for user_id, profile in iteritems(users_with_profile) } + @defer.inlineCallbacks + def schedule_redaction(self, event_id, redaction_ts): + yield self.store.insert_event_expiry(event_id, redaction_ts) + + now_ms = self.clock.time_msec() + + if redaction_ts <= now_ms: + # If the event should have already been redacted, redact it now. + yield self._generate_and_send_synthetic_redaction(event_id) + else: + # Otherwise, figure out how many seconds we need to wait before redacting the + # event. + delay = (redaction_ts - now_ms) / 1000 + self.clock.call_later( + delay, self._generate_and_send_synthetic_redaction, event_id + ) + + @defer.inlineCallbacks + def _schedule_redactions_from_db(self): + events_to_expire = yield self.store.get_events_to_expire() + + for event in events_to_expire: + yield self.schedule_redaction(event["event_id"], event["expiry_ts"]) + + @defer.inlineCallbacks + def _generate_and_send_synthetic_redaction(self, event_id): + # TODO: actually generate and send the redaction. + + yield self.store.delete_event_expiry(event_id) + # The duration (in ms) after which rooms should be removed # `_rooms_to_exclude_from_dummy_event_insertion` (with the effect that we will try From 2628e19f247d05c52cb7e389c1b66c97532b2480 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Mon, 25 Nov 2019 14:23:17 +0000 Subject: [PATCH 03/45] Plug redaction scheduling in the right places --- synapse/handlers/federation.py | 7 +++++++ synapse/handlers/message.py | 7 +++++++ 2 files changed, 14 insertions(+) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 0e904f2da0cb..86266745605d 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -141,6 +141,8 @@ def __init__(self, hs): self.third_party_event_rules = hs.get_third_party_event_rules() + self._message_handler = hs.get_message_handler() + @defer.inlineCallbacks def on_receive_pdu(self, origin, pdu, sent_to_us_directly=False): """ Process a PDU received via a federation /send/ transaction, or @@ -1707,6 +1709,11 @@ def _handle_new_event( self.store.remove_push_actions_from_staging, event.event_id ) + # If there's an expiry timestamp, schedule the redaction of the event. + expiry_ts = event.content.get("m.self_destruct_after") + if isinstance(expiry_ts, int): + yield self._message_handler.schedule_redaction(event.event_id, expiry_ts) + return context @defer.inlineCallbacks diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index fd94348582b2..89e7c4d799d1 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -329,6 +329,8 @@ def __init__(self, hs): 5 * 60 * 1000, ) + self._message_handler = hs.get_message_handler() + @defer.inlineCallbacks def create_event( self, @@ -770,6 +772,11 @@ def handle_new_client_event( self.store.remove_push_actions_from_staging, event.event_id ) + # If there's an expiry timestamp, schedule the redaction of the event. + expiry_ts = event.content.get("m.self_destruct_after") + if isinstance(expiry_ts, int): + yield self._message_handler.schedule_redaction(event.event_id, expiry_ts) + @defer.inlineCallbacks def persist_and_notify_client_event( self, requester, event, context, ratelimit=True, extra_users=[] From 8255b9eb9c279cec3fa523c4dde3bc06c97dd6e5 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Mon, 25 Nov 2019 14:25:21 +0000 Subject: [PATCH 04/45] Changelog --- changelog.d/6409.feature | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/6409.feature diff --git a/changelog.d/6409.feature b/changelog.d/6409.feature new file mode 100644 index 000000000000..653ff5a5ad6a --- /dev/null +++ b/changelog.d/6409.feature @@ -0,0 +1 @@ +Add ephemeral messages support by partially implementing [MSC2228](https://github.com/matrix-org/matrix-doc/pull/2228). From 8563e2bef683766947b8f62d78dce132de2c174b Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Mon, 25 Nov 2019 14:27:36 +0000 Subject: [PATCH 05/45] Lint --- synapse/storage/data_stores/main/events.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py index ab2b9d9d3cde..b16c8476b783 100644 --- a/synapse/storage/data_stores/main/events.py +++ b/synapse/storage/data_stores/main/events.py @@ -1959,10 +1959,7 @@ def insert_labels_for_event_txn( def insert_event_expiry(self, event_id, expiry_ts): return self._simple_insert( table="event_expiry", - values={ - "event_id": event_id, - "expiry_ts": expiry_ts, - }, + values={"event_id": event_id, "expiry_ts": expiry_ts}, desc="insert_event_expiry", ) @@ -1982,7 +1979,6 @@ def get_events_to_expire(self): ) - AllNewEventsResult = namedtuple( "AllNewEventsResult", [ From f2c60180ab07720757be813053108ba9cf01a93e Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Tue, 26 Nov 2019 15:21:57 +0000 Subject: [PATCH 06/45] Actually delete events on expiry --- synapse/handlers/message.py | 14 +++++++---- synapse/storage/data_stores/main/events.py | 28 +++++++++++++++++----- 2 files changed, 31 insertions(+), 11 deletions(-) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 89e7c4d799d1..3ba4ea71ee5c 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -237,13 +237,13 @@ def schedule_redaction(self, event_id, redaction_ts): if redaction_ts <= now_ms: # If the event should have already been redacted, redact it now. - yield self._generate_and_send_synthetic_redaction(event_id) + yield self._delete_expired_event(event_id) else: # Otherwise, figure out how many seconds we need to wait before redacting the # event. delay = (redaction_ts - now_ms) / 1000 self.clock.call_later( - delay, self._generate_and_send_synthetic_redaction, event_id + delay, self._delete_expired_event, event_id ) @defer.inlineCallbacks @@ -254,10 +254,14 @@ def _schedule_redactions_from_db(self): yield self.schedule_redaction(event["event_id"], event["expiry_ts"]) @defer.inlineCallbacks - def _generate_and_send_synthetic_redaction(self, event_id): - # TODO: actually generate and send the redaction. + def _delete_expired_event(self, event_id): + event = yield self.store.get_event(event_id) - yield self.store.delete_event_expiry(event_id) + if not event: + logger.warning("Can't delete event %s because we don't have it." % event_id) + return + + yield self.store.delete_expired_event(event) # The duration (in ms) after which rooms should be removed diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py index b16c8476b783..ced883c13f57 100644 --- a/synapse/storage/data_stores/main/events.py +++ b/synapse/storage/data_stores/main/events.py @@ -1963,12 +1963,28 @@ def insert_event_expiry(self, event_id, expiry_ts): desc="insert_event_expiry", ) - def delete_event_expiry(self, event_id): - return self._simple_delete( - table="event_expiry", - keyvalues={"event_id": event_id}, - desc="delete_event_expiry", - ) + def delete_expired_event(self, event): + """ + Args: + event (events.EventBase): + """ + pruned_json = encode_json(prune_event_dict(event.get_dict())) + + def delete_expired_event_txn(txn): + self._simple_update_one_txn( + txn, + table="event_json", + keyvalues={"event_id": event.event_id}, + updatevalues={"json": pruned_json}, + ) + + self._simple_delete_txn( + txn, + table="event_expiry", + keyvalues={"event_id": event.event_id}, + ) + + yield self.runInteraction("delete_expired_event", delete_expired_event_txn) def get_events_to_expire(self): return self._simple_select_list( From 01d187675254ab0c588008a5f10895a46568ce61 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Tue, 26 Nov 2019 16:40:36 +0000 Subject: [PATCH 07/45] Rename functions + doc --- synapse/handlers/federation.py | 4 ++- synapse/handlers/message.py | 40 +++++++++++++++++++--- synapse/storage/data_stores/main/events.py | 36 +++++++++++++++++-- 3 files changed, 72 insertions(+), 8 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 86266745605d..4cfb28132dc5 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1712,7 +1712,9 @@ def _handle_new_event( # If there's an expiry timestamp, schedule the redaction of the event. expiry_ts = event.content.get("m.self_destruct_after") if isinstance(expiry_ts, int): - yield self._message_handler.schedule_redaction(event.event_id, expiry_ts) + yield self._message_handler.schedule_deletion_expired( + event.event_id, expiry_ts + ) return context diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 3ba4ea71ee5c..dc7b54e29624 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -64,7 +64,8 @@ def __init__(self, hs): self._event_serializer = hs.get_event_client_serializer() run_as_background_process( - "_schedule_redactions_from_db", self._schedule_redactions_from_db + "_schedule_deletions_expired_from_db", + self._schedule_deletions_expired_from_db, ) @defer.inlineCallbacks @@ -230,7 +231,16 @@ def get_joined_members(self, requester, room_id): } @defer.inlineCallbacks - def schedule_redaction(self, event_id, redaction_ts): + def schedule_deletion_expired(self, event_id, redaction_ts): + """Schedule the deletion of an expired event, or if that event should have + expired then delete the event immediately. + + Args: + event_id (str): The ID of the event to schedule the deletion of. + redaction_ts (int): The timestamp to delete the event at. + """ + # Save the timestamp at which the event expires so that we can reschedule its + # deletion on startup if the server is stopped before the event is deleted. yield self.store.insert_event_expiry(event_id, redaction_ts) now_ms = self.clock.time_msec() @@ -247,20 +257,38 @@ def schedule_redaction(self, event_id, redaction_ts): ) @defer.inlineCallbacks - def _schedule_redactions_from_db(self): + def _schedule_deletions_expired_from_db(self): + """Load the IDs of the events that have an expiry date from the database (and + their expiry timestamp) and either delete them (if the expiry date is now or in + the past) or schedule their deletion on the timestamp. + """ events_to_expire = yield self.store.get_events_to_expire() for event in events_to_expire: - yield self.schedule_redaction(event["event_id"], event["expiry_ts"]) + yield self.schedule_deletion_expired(event["event_id"], event["expiry_ts"]) @defer.inlineCallbacks def _delete_expired_event(self, event_id): + """Retrieve and delete an expired event from the database. + + If we don't have the event in the database, log it and delete the expiry date + from the database (so that we don't try to delete it again). + + Args: + event_id (str): The ID of the event to retrieve and delete. + """ + # Try to retrieve the event from the database. event = yield self.store.get_event(event_id) if not event: + # If we can't find the event, log a warning and delete the expiry date from + # the database so that we don't try to delete it again in the future. logger.warning("Can't delete event %s because we don't have it." % event_id) + yield self.store.delete_event_expiry(event) return + # Delete the event. This function also deletes the expiry date from the database + # in the same database transaction. yield self.store.delete_expired_event(event) @@ -779,7 +807,9 @@ def handle_new_client_event( # If there's an expiry timestamp, schedule the redaction of the event. expiry_ts = event.content.get("m.self_destruct_after") if isinstance(expiry_ts, int): - yield self._message_handler.schedule_redaction(event.event_id, expiry_ts) + yield self._message_handler.schedule_deletion_expired( + event.event_id, expiry_ts + ) @defer.inlineCallbacks def persist_and_notify_client_event( diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py index ced883c13f57..b384c2177d46 100644 --- a/synapse/storage/data_stores/main/events.py +++ b/synapse/storage/data_stores/main/events.py @@ -1957,6 +1957,12 @@ def insert_labels_for_event_txn( ) def insert_event_expiry(self, event_id, expiry_ts): + """Save the expiry timestamp associated with a given event ID. + + Args: + event_id (str): The event ID the expiry timestamp is associated with. + expiry_ts (int): The timestamp at which to expire (delete) the event. + """ return self._simple_insert( table="event_expiry", values={"event_id": event_id, "expiry_ts": expiry_ts}, @@ -1964,13 +1970,19 @@ def insert_event_expiry(self, event_id, expiry_ts): ) def delete_expired_event(self, event): - """ + """Delete an event that has expired by replacing its entry in event_json with a + pruned version of its JSON representation, and delete its associated expiry + timestamp. + Args: - event (events.EventBase): + event (events.EventBase): The event to delete. """ + # Prune the event's dict then convert it to a JSON string. pruned_json = encode_json(prune_event_dict(event.get_dict())) def delete_expired_event_txn(txn): + # Update the event_json table to replace the event's JSON with the pruned + # JSON. self._simple_update_one_txn( txn, table="event_json", @@ -1978,6 +1990,7 @@ def delete_expired_event_txn(txn): updatevalues={"json": pruned_json}, ) + # Delete the expiry timestamp associated with this event from the database. self._simple_delete_txn( txn, table="event_expiry", @@ -1986,7 +1999,26 @@ def delete_expired_event_txn(txn): yield self.runInteraction("delete_expired_event", delete_expired_event_txn) + def delete_event_expiry(self, event_id): + """Delete the expiry timestamp associated with an event ID without deleting the + actual event. + + Args: + event_id (str): The event ID to delete the associated expiry timestamp of. + """ + return self._simple_delete( + table="event_expiry", + keyvalues={"event_id": event_id}, + desc="delete_event_expiry", + ) + def get_events_to_expire(self): + """Retrieve the IDs of the events we have an expiry timestamp for, along with + said timestamp. + + Returns: + A list of dicts, each containing an event_id and an expiry_ts. + """ return self._simple_select_list( table="event_expiry", keyvalues=None, From 274612a4c290cb7b17c4dcdd71ff611196b81494 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Tue, 26 Nov 2019 16:53:32 +0000 Subject: [PATCH 08/45] Lint --- synapse/handlers/message.py | 4 +--- synapse/storage/data_stores/main/events.py | 4 +--- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index dc7b54e29624..33109537de56 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -252,9 +252,7 @@ def schedule_deletion_expired(self, event_id, redaction_ts): # Otherwise, figure out how many seconds we need to wait before redacting the # event. delay = (redaction_ts - now_ms) / 1000 - self.clock.call_later( - delay, self._delete_expired_event, event_id - ) + self.clock.call_later(delay, self._delete_expired_event, event_id) @defer.inlineCallbacks def _schedule_deletions_expired_from_db(self): diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py index b384c2177d46..98ff99e890f2 100644 --- a/synapse/storage/data_stores/main/events.py +++ b/synapse/storage/data_stores/main/events.py @@ -1992,9 +1992,7 @@ def delete_expired_event_txn(txn): # Delete the expiry timestamp associated with this event from the database. self._simple_delete_txn( - txn, - table="event_expiry", - keyvalues={"event_id": event.event_id}, + txn, table="event_expiry", keyvalues={"event_id": event.event_id} ) yield self.runInteraction("delete_expired_event", delete_expired_event_txn) From d4245490b6c16752e64c4e0ebb22e00e9871e0e5 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Tue, 26 Nov 2019 17:08:40 +0000 Subject: [PATCH 09/45] Move the field name to a constant --- synapse/api/constants.py | 3 +++ synapse/handlers/federation.py | 9 +++++++-- synapse/handlers/message.py | 10 ++++++++-- 3 files changed, 18 insertions(+), 4 deletions(-) diff --git a/synapse/api/constants.py b/synapse/api/constants.py index 49c4b8505446..42d43128b422 100644 --- a/synapse/api/constants.py +++ b/synapse/api/constants.py @@ -145,3 +145,6 @@ class EventContentFields(object): # Labels for the event, cf https://github.com/matrix-org/matrix-doc/pull/2326 LABELS = "org.matrix.labels" + # Timestamp to delete the event after + # cf https://github.com/matrix-org/matrix-doc/pull/2228 + SELF_DESTRUCT_AFTER = "m.self_destruct_after" diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 4cfb28132dc5..24d1c2259e75 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -31,7 +31,12 @@ from twisted.internet import defer from synapse import event_auth -from synapse.api.constants import EventTypes, Membership, RejectedReason +from synapse.api.constants import ( + EventContentFields, + EventTypes, + Membership, + RejectedReason, +) from synapse.api.errors import ( AuthError, CodeMessageException, @@ -1710,7 +1715,7 @@ def _handle_new_event( ) # If there's an expiry timestamp, schedule the redaction of the event. - expiry_ts = event.content.get("m.self_destruct_after") + expiry_ts = event.content.get(EventContentFields.SELF_DESTRUCT_AFTER) if isinstance(expiry_ts, int): yield self._message_handler.schedule_deletion_expired( event.event_id, expiry_ts diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 33109537de56..345b5484deff 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -24,7 +24,13 @@ from twisted.internet.defer import succeed from synapse import event_auth -from synapse.api.constants import EventTypes, Membership, RelationTypes, UserTypes +from synapse.api.constants import ( + EventContentFields, + EventTypes, + Membership, + RelationTypes, + UserTypes, +) from synapse.api.errors import ( AuthError, Codes, @@ -803,7 +809,7 @@ def handle_new_client_event( ) # If there's an expiry timestamp, schedule the redaction of the event. - expiry_ts = event.content.get("m.self_destruct_after") + expiry_ts = event.content.get(EventContentFields.SELF_DESTRUCT_AFTER) if isinstance(expiry_ts, int): yield self._message_handler.schedule_deletion_expired( event.event_id, expiry_ts From 052294e2037baf0dc5e69c85d99d0440ce24176b Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Tue, 26 Nov 2019 17:23:47 +0000 Subject: [PATCH 10/45] Hide the feature behind a configuration flag --- synapse/config/server.py | 2 ++ synapse/handlers/message.py | 10 ++++++++++ 2 files changed, 12 insertions(+) diff --git a/synapse/config/server.py b/synapse/config/server.py index 00d01c43af8a..85528daf49de 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -372,6 +372,8 @@ class LimitRemoteRoomsConfig(object): "cleanup_extremities_with_dummy_events", True ) + self.enable_ephemeral_messages = config.get("enable_ephemeral_messages", False) + def has_tls_listener(self) -> bool: return any(l["tls"] for l in self.listeners) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 345b5484deff..1cc53b5f5071 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -68,6 +68,7 @@ def __init__(self, hs): self.storage = hs.get_storage() self.state_store = self.storage.state self._event_serializer = hs.get_event_client_serializer() + self._ephemeral_events_enabled = hs.config.enable_ephemeral_messages run_as_background_process( "_schedule_deletions_expired_from_db", @@ -245,6 +246,9 @@ def schedule_deletion_expired(self, event_id, redaction_ts): event_id (str): The ID of the event to schedule the deletion of. redaction_ts (int): The timestamp to delete the event at. """ + if not self._ephemeral_events_enabled: + return + # Save the timestamp at which the event expires so that we can reschedule its # deletion on startup if the server is stopped before the event is deleted. yield self.store.insert_event_expiry(event_id, redaction_ts) @@ -266,6 +270,9 @@ def _schedule_deletions_expired_from_db(self): their expiry timestamp) and either delete them (if the expiry date is now or in the past) or schedule their deletion on the timestamp. """ + if not self._ephemeral_events_enabled: + return + events_to_expire = yield self.store.get_events_to_expire() for event in events_to_expire: @@ -281,6 +288,9 @@ def _delete_expired_event(self, event_id): Args: event_id (str): The ID of the event to retrieve and delete. """ + if not self._ephemeral_events_enabled: + return + # Try to retrieve the event from the database. event = yield self.store.get_event(event_id) From dee234f22679c53d5cca37ecdc4706b684c07ebb Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Tue, 26 Nov 2019 17:26:11 +0000 Subject: [PATCH 11/45] Don't expire state events --- synapse/handlers/federation.py | 2 +- synapse/handlers/message.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 24d1c2259e75..64a0d480c5d3 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1716,7 +1716,7 @@ def _handle_new_event( # If there's an expiry timestamp, schedule the redaction of the event. expiry_ts = event.content.get(EventContentFields.SELF_DESTRUCT_AFTER) - if isinstance(expiry_ts, int): + if isinstance(expiry_ts, int) and not event.is_state(): yield self._message_handler.schedule_deletion_expired( event.event_id, expiry_ts ) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 1cc53b5f5071..dc638209a375 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -820,7 +820,7 @@ def handle_new_client_event( # If there's an expiry timestamp, schedule the redaction of the event. expiry_ts = event.content.get(EventContentFields.SELF_DESTRUCT_AFTER) - if isinstance(expiry_ts, int): + if isinstance(expiry_ts, int) and not event.is_state(): yield self._message_handler.schedule_deletion_expired( event.event_id, expiry_ts ) From 36230a588434e51dd9ae68dd1e875882ceecbc93 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Wed, 27 Nov 2019 11:20:44 +0000 Subject: [PATCH 12/45] Test case and various fixes --- synapse/handlers/message.py | 14 +-- synapse/storage/data_stores/main/events.py | 8 +- tests/rest/client/test_ephemeral_message.py | 101 ++++++++++++++++++++ 3 files changed, 115 insertions(+), 8 deletions(-) create mode 100644 tests/rest/client/test_ephemeral_message.py diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index dc638209a375..e0eacc038263 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -254,15 +254,15 @@ def schedule_deletion_expired(self, event_id, redaction_ts): yield self.store.insert_event_expiry(event_id, redaction_ts) now_ms = self.clock.time_msec() + delay = (redaction_ts - now_ms) / 1000 - if redaction_ts <= now_ms: + if delay > 0: + # Figure out how many seconds we need to wait before redacting the event. + logger.info("Scheduling deletion of event %s in %.3fs", event_id, delay) + self.clock.call_later(delay, self._delete_expired_event, event_id) + else: # If the event should have already been redacted, redact it now. yield self._delete_expired_event(event_id) - else: - # Otherwise, figure out how many seconds we need to wait before redacting the - # event. - delay = (redaction_ts - now_ms) / 1000 - self.clock.call_later(delay, self._delete_expired_event, event_id) @defer.inlineCallbacks def _schedule_deletions_expired_from_db(self): @@ -291,6 +291,8 @@ def _delete_expired_event(self, event_id): if not self._ephemeral_events_enabled: return + logger.info("Deleting expired event %s", event_id) + # Try to retrieve the event from the database. event = yield self.store.get_event(event_id) diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py index e2d1b2e406bd..f501c055e317 100644 --- a/synapse/storage/data_stores/main/events.py +++ b/synapse/storage/data_stores/main/events.py @@ -1975,7 +1975,7 @@ def delete_expired_event(self, event): Args: event (events.EventBase): The event to delete. """ - # Prune the event's dict then convert it to a JSON string. + # Prune the event's dict then convert it to JSON. pruned_json = encode_json(prune_event_dict(event.get_dict())) def delete_expired_event_txn(txn): @@ -1993,7 +1993,11 @@ def delete_expired_event_txn(txn): txn, table="event_expiry", keyvalues={"event_id": event.event_id} ) - yield self.runInteraction("delete_expired_event", delete_expired_event_txn) + # We need to invalidate the event cache entry for this event because we + # changed its content in the database. + self._get_event_cache.invalidate((event.event_id,)) + + return self.runInteraction("delete_expired_event", delete_expired_event_txn) def delete_event_expiry(self, event_id): """Delete the expiry timestamp associated with an event ID without deleting the diff --git a/tests/rest/client/test_ephemeral_message.py b/tests/rest/client/test_ephemeral_message.py new file mode 100644 index 000000000000..8d9fe4bf36db --- /dev/null +++ b/tests/rest/client/test_ephemeral_message.py @@ -0,0 +1,101 @@ +# -*- coding: utf-8 -*- +# Copyright 2019 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from synapse.api.constants import EventContentFields, EventTypes +from synapse.rest import admin +from synapse.rest.client.v1 import room + +from tests import unittest + + +class EphemeralMessageTestCase(unittest.HomeserverTestCase): + + user_id = "@user:test" + + servlets = [ + admin.register_servlets, + room.register_servlets, + ] + + def make_homeserver(self, reactor, clock): + config = self.default_config() + + config["enable_ephemeral_messages"] = True + + self.hs = self.setup_test_homeserver(config=config) + return self.hs + + def prepare(self, reactor, clock, homeserver): + self.room_id = self.helper.create_room_as(self.user_id) + + def test_message_expiry_no_delay(self): + """Tests that sending a message sent with a m.self_destruct_after field set to the + past results in that event being deleted right away. + """ + # Send a message in the room that has expired. From here, the reactor clock is + # at 200ms, so 0 is in the past, and even if that wasn't the case and the clock + # is at 0ms the code path is the same if the event's expiry timestamp is the + # current timestamp. + res = self.helper.send_event( + room_id=self.room_id, + type=EventTypes.Message, + content={ + "msgtype": "m.text", + "body": "hello", + EventContentFields.SELF_DESTRUCT_AFTER: 0, + } + ) + event_id = res["event_id"] + + # Check that we can't retrieve the content of the event. + event_content = self.get_event(self.room_id, event_id)["content"] + self.assertFalse(bool(event_content), event_content) + + def test_message_expiry_delay(self): + """Tests that sending a message with a m.self_destruct_after field set to the + future results in that event not being deleted right away, but advancing the + clock to after that expiry timestamp causes the event to be deleted. + """ + # Send a message in the room that'll expire in 1s. + res = self.helper.send_event( + room_id=self.room_id, + type=EventTypes.Message, + content={ + "msgtype": "m.text", + "body": "hello", + EventContentFields.SELF_DESTRUCT_AFTER: self.clock.time_msec() + 1000, + } + ) + event_id = res["event_id"] + + # Check that we can retrieve the content of the event before it has expired. + event_content = self.get_event(self.room_id, event_id)["content"] + self.assertTrue(bool(event_content), event_content) + + # Advance the clock to after the deletion. + self.reactor.advance(1) + + # Check that we can't retrieve the content of the event anymore. + event_content = self.get_event(self.room_id, event_id)["content"] + self.assertFalse(bool(event_content), event_content) + + def get_event(self, room_id, event_id, expected_code=200): + url = "/_matrix/client/r0/rooms/%s/event/%s" % (room_id, event_id) + + request, channel = self.make_request("GET", url) + self.render(request) + + self.assertEqual(channel.code, expected_code, channel.result) + + return channel.json_body From 89765f416ac201148010fae59f8a0ba0ba2ef146 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Wed, 27 Nov 2019 11:43:19 +0000 Subject: [PATCH 13/45] Lint --- tests/rest/client/test_ephemeral_message.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/rest/client/test_ephemeral_message.py b/tests/rest/client/test_ephemeral_message.py index 8d9fe4bf36db..5e9c07ebf3ff 100644 --- a/tests/rest/client/test_ephemeral_message.py +++ b/tests/rest/client/test_ephemeral_message.py @@ -54,7 +54,7 @@ def test_message_expiry_no_delay(self): "msgtype": "m.text", "body": "hello", EventContentFields.SELF_DESTRUCT_AFTER: 0, - } + }, ) event_id = res["event_id"] @@ -75,7 +75,7 @@ def test_message_expiry_delay(self): "msgtype": "m.text", "body": "hello", EventContentFields.SELF_DESTRUCT_AFTER: self.clock.time_msec() + 1000, - } + }, ) event_id = res["event_id"] From ceab0a80c932e88812ec12978629a7be2f32e2c3 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Wed, 27 Nov 2019 12:28:57 +0000 Subject: [PATCH 14/45] Add background update This is a slightly modified copy of the event_store_labels background update, adapted to extract and store expiry timestamps for events. --- .../data_stores/main/events_bg_updates.py | 67 +++++++++++++++++++ .../main/schema/delta/56/event_expiry.sql | 5 +- 2 files changed, 71 insertions(+), 1 deletion(-) diff --git a/synapse/storage/data_stores/main/events_bg_updates.py b/synapse/storage/data_stores/main/events_bg_updates.py index aa87f9abc538..1508945eaa75 100644 --- a/synapse/storage/data_stores/main/events_bg_updates.py +++ b/synapse/storage/data_stores/main/events_bg_updates.py @@ -90,6 +90,10 @@ def __init__(self, db_conn, hs): "event_store_labels", self._event_store_labels ) + self.register_background_update_handler( + "event_store_expiry", self._event_store_expiry + ) + @defer.inlineCallbacks def _background_reindex_fields_sender(self, progress, batch_size): target_min_stream_id = progress["target_min_stream_id_inclusive"] @@ -573,3 +577,66 @@ def _event_store_labels_txn(txn): yield self._end_background_update("event_store_labels") return num_rows + + @defer.inlineCallbacks + def _event_store_expiry(self, progress, batch_size): + """Background update handler which will store expiry timestamps for existing + events. + """ + last_event_id = progress.get("last_event_id", "") + + def _event_store_labels_txn(txn): + txn.execute( + """ + SELECT event_id, json FROM event_json + LEFT JOIN event_expiry USING (event_id) + WHERE event_id > ? AND expiry_ts IS NULL + ORDER BY event_id LIMIT ? + """, + (last_event_id, batch_size), + ) + + results = list(txn) + + nbrows = 0 + last_row_event_id = "" + for (event_id, event_json_raw) in results: + try: + event_json = json.loads(event_json_raw) + + expiry_ts = event_json.get(EventContentFields.SELF_DESTRUCT_AFTER) + + if isinstance(expiry_ts, int): + self._simple_insert_txn( + txn=txn, + table="event_expiry", + values={ + "event_id": event_id, + "expiry_ts": expiry_ts + } + ) + except Exception as e: + logger.warning( + "Unable to load event %s (no expiry timestamp will be imported):" + " %s", + event_id, + e, + ) + + nbrows += 1 + last_row_event_id = event_id + + self._background_update_progress_txn( + txn, "event_store_expiry", {"last_event_id": last_row_event_id} + ) + + return nbrows + + num_rows = yield self.runInteraction( + desc="event_store_expiry", func=_event_store_labels_txn + ) + + if not num_rows: + yield self._end_background_update("event_store_expiry") + + return num_rows diff --git a/synapse/storage/data_stores/main/schema/delta/56/event_expiry.sql b/synapse/storage/data_stores/main/schema/delta/56/event_expiry.sql index 5b2fa7767ca5..49dc86a68653 100644 --- a/synapse/storage/data_stores/main/schema/delta/56/event_expiry.sql +++ b/synapse/storage/data_stores/main/schema/delta/56/event_expiry.sql @@ -16,4 +16,7 @@ CREATE TABLE IF NOT EXISTS event_expiry ( event_id TEXT PRIMARY KEY, expiry_ts BIGINT NOT NULL -); \ No newline at end of file +); + +INSERT INTO background_updates (update_name, progress_json) VALUES + ('event_store_expiry', '{}'); From 26dec61e6bac8f4e755c19fcf0547292b9055993 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Wed, 27 Nov 2019 14:32:27 +0000 Subject: [PATCH 15/45] Lint --- synapse/storage/data_stores/main/events_bg_updates.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/synapse/storage/data_stores/main/events_bg_updates.py b/synapse/storage/data_stores/main/events_bg_updates.py index 1508945eaa75..d6f8e6e24f1d 100644 --- a/synapse/storage/data_stores/main/events_bg_updates.py +++ b/synapse/storage/data_stores/main/events_bg_updates.py @@ -610,10 +610,7 @@ def _event_store_labels_txn(txn): self._simple_insert_txn( txn=txn, table="event_expiry", - values={ - "event_id": event_id, - "expiry_ts": expiry_ts - } + values={"event_id": event_id, "expiry_ts": expiry_ts}, ) except Exception as e: logger.warning( From 2ac7869e47c934178f4f11baa5b48b9c9d062884 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Wed, 27 Nov 2019 16:43:16 +0000 Subject: [PATCH 16/45] Update synapse/api/constants.py Co-Authored-By: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> --- synapse/api/constants.py | 1 + 1 file changed, 1 insertion(+) diff --git a/synapse/api/constants.py b/synapse/api/constants.py index 42d43128b422..6fbfcaf66a29 100644 --- a/synapse/api/constants.py +++ b/synapse/api/constants.py @@ -145,6 +145,7 @@ class EventContentFields(object): # Labels for the event, cf https://github.com/matrix-org/matrix-doc/pull/2326 LABELS = "org.matrix.labels" + # Timestamp to delete the event after # cf https://github.com/matrix-org/matrix-doc/pull/2228 SELF_DESTRUCT_AFTER = "m.self_destruct_after" From a4307c692c5a6145486b1caa06bc18a53b9cadbe Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Wed, 27 Nov 2019 17:29:36 +0000 Subject: [PATCH 17/45] Incorporate part of the review --- synapse/api/constants.py | 2 +- synapse/handlers/federation.py | 15 +++--- synapse/handlers/message.py | 62 ++++++++++++---------- synapse/storage/data_stores/main/events.py | 6 +-- 4 files changed, 44 insertions(+), 41 deletions(-) diff --git a/synapse/api/constants.py b/synapse/api/constants.py index 6fbfcaf66a29..a66a7f279720 100644 --- a/synapse/api/constants.py +++ b/synapse/api/constants.py @@ -148,4 +148,4 @@ class EventContentFields(object): # Timestamp to delete the event after # cf https://github.com/matrix-org/matrix-doc/pull/2228 - SELF_DESTRUCT_AFTER = "m.self_destruct_after" + SELF_DESTRUCT_AFTER = "org.matrix.self_destruct_after" diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 64a0d480c5d3..89d035a894d8 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1714,13 +1714,6 @@ def _handle_new_event( self.store.remove_push_actions_from_staging, event.event_id ) - # If there's an expiry timestamp, schedule the redaction of the event. - expiry_ts = event.content.get(EventContentFields.SELF_DESTRUCT_AFTER) - if isinstance(expiry_ts, int) and not event.is_state(): - yield self._message_handler.schedule_deletion_expired( - event.event_id, expiry_ts - ) - return context @defer.inlineCallbacks @@ -2726,6 +2719,14 @@ def persist_events_and_notify(self, event_and_contexts, backfilled=False): for event, _ in event_and_contexts: yield self._notify_persisted_event(event, max_stream_id) + for (event, context) in event_and_contexts: + # If there's an expiry timestamp, schedule the redaction of the event. + expiry_ts = event.content.get(EventContentFields.SELF_DESTRUCT_AFTER) + if isinstance(expiry_ts, int) and not event.is_state(): + yield self._message_handler.schedule_event_expiry( + event.event_id, expiry_ts + ) + def _notify_persisted_event(self, event, max_stream_id): """Checks to see if notifier/pushers should be notified about the event or not. diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index e0eacc038263..3d9703cb3aa3 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -71,8 +71,8 @@ def __init__(self, hs): self._ephemeral_events_enabled = hs.config.enable_ephemeral_messages run_as_background_process( - "_schedule_deletions_expired_from_db", - self._schedule_deletions_expired_from_db, + "_schedule_event_expiries_from_db", + self._schedule_event_expiries_from_db, ) @defer.inlineCallbacks @@ -238,37 +238,37 @@ def get_joined_members(self, requester, room_id): } @defer.inlineCallbacks - def schedule_deletion_expired(self, event_id, redaction_ts): + def schedule_event_expiry(self, event_id, redaction_ts, store_expiry_ts=True): """Schedule the deletion of an expired event, or if that event should have expired then delete the event immediately. Args: - event_id (str): The ID of the event to schedule the deletion of. - redaction_ts (int): The timestamp to delete the event at. + event_id (str): The ID of the event to schedule the expiry of. + redaction_ts (int): The timestamp to expire the event at. + store_expiry_ts (bool): Whether we need to store the expiry timestamp in the + database. """ if not self._ephemeral_events_enabled: return - # Save the timestamp at which the event expires so that we can reschedule its - # deletion on startup if the server is stopped before the event is deleted. - yield self.store.insert_event_expiry(event_id, redaction_ts) + if store_expiry_ts: + # Save the timestamp at which the event expires so that we can reschedule + # its deletion on startup if the server is stopped before the event is + # deleted. + yield self.store.insert_event_expiry(event_id, redaction_ts) + # Figure out how many seconds we need to wait before expiring the event. now_ms = self.clock.time_msec() delay = (redaction_ts - now_ms) / 1000 - if delay > 0: - # Figure out how many seconds we need to wait before redacting the event. - logger.info("Scheduling deletion of event %s in %.3fs", event_id, delay) - self.clock.call_later(delay, self._delete_expired_event, event_id) - else: - # If the event should have already been redacted, redact it now. - yield self._delete_expired_event(event_id) + logger.info("Scheduling expiry of event %s in %.3fs", event_id, delay) + self.clock.call_later(delay, self._expire_event, event_id) @defer.inlineCallbacks - def _schedule_deletions_expired_from_db(self): + def _schedule_event_expiries_from_db(self): """Load the IDs of the events that have an expiry date from the database (and - their expiry timestamp) and either delete them (if the expiry date is now or in - the past) or schedule their deletion on the timestamp. + their expiry timestamp) and either expire them (if the expiry date is now or in + the past) or schedule their expiry on the timestamp. """ if not self._ephemeral_events_enabled: return @@ -276,36 +276,40 @@ def _schedule_deletions_expired_from_db(self): events_to_expire = yield self.store.get_events_to_expire() for event in events_to_expire: - yield self.schedule_deletion_expired(event["event_id"], event["expiry_ts"]) + # Schedule the expiry of the event but don't try to insert the expiry + # timestamp in the database again. + yield self.schedule_event_expiry( + event["event_id"], event["expiry_ts"], store_expiry_ts=False + ) @defer.inlineCallbacks - def _delete_expired_event(self, event_id): - """Retrieve and delete an expired event from the database. + def _expire_event(self, event_id): + """Retrieve and expires an event that needs to be expired from the database. If we don't have the event in the database, log it and delete the expiry date - from the database (so that we don't try to delete it again). + from the database (so that we don't try to expire it again). Args: - event_id (str): The ID of the event to retrieve and delete. + event_id (str): The ID of the event to retrieve and expire. """ if not self._ephemeral_events_enabled: return - logger.info("Deleting expired event %s", event_id) + logger.info("Expiring event %s", event_id) # Try to retrieve the event from the database. event = yield self.store.get_event(event_id) if not event: # If we can't find the event, log a warning and delete the expiry date from - # the database so that we don't try to delete it again in the future. - logger.warning("Can't delete event %s because we don't have it." % event_id) + # the database so that we don't try to expire it again in the future. + logger.warning("Can't expire event %s because we don't have it." % event_id) yield self.store.delete_event_expiry(event) return - # Delete the event. This function also deletes the expiry date from the database + # Expire the event. This function also deletes the expiry date from the database # in the same database transaction. - yield self.store.delete_expired_event(event) + yield self.store.expire_event(event) # The duration (in ms) after which rooms should be removed @@ -823,7 +827,7 @@ def handle_new_client_event( # If there's an expiry timestamp, schedule the redaction of the event. expiry_ts = event.content.get(EventContentFields.SELF_DESTRUCT_AFTER) if isinstance(expiry_ts, int) and not event.is_state(): - yield self._message_handler.schedule_deletion_expired( + yield self._message_handler.schedule_event_expiry( event.event_id, expiry_ts ) diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py index f501c055e317..d4806bba1b69 100644 --- a/synapse/storage/data_stores/main/events.py +++ b/synapse/storage/data_stores/main/events.py @@ -1967,10 +1967,8 @@ def insert_event_expiry(self, event_id, expiry_ts): desc="insert_event_expiry", ) - def delete_expired_event(self, event): - """Delete an event that has expired by replacing its entry in event_json with a - pruned version of its JSON representation, and delete its associated expiry - timestamp. + def expire_event(self, event): + """Redact an event that has expired, and delete its associated expiry timestamp. Args: event (events.EventBase): The event to delete. From a6461c058e2b65dde63d3f288cb623e6b3f80112 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Wed, 27 Nov 2019 19:07:22 +0000 Subject: [PATCH 18/45] Change the scheduling flow --- synapse/handlers/federation.py | 4 +- synapse/handlers/message.py | 97 ++++++++++++---------- synapse/storage/data_stores/main/events.py | 18 ++++ 3 files changed, 70 insertions(+), 49 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 89d035a894d8..82f368fbc64c 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -2723,9 +2723,7 @@ def persist_events_and_notify(self, event_and_contexts, backfilled=False): # If there's an expiry timestamp, schedule the redaction of the event. expiry_ts = event.content.get(EventContentFields.SELF_DESTRUCT_AFTER) if isinstance(expiry_ts, int) and not event.is_state(): - yield self._message_handler.schedule_event_expiry( - event.event_id, expiry_ts - ) + yield self._message_handler.save_expiry_ts(event.event_id, expiry_ts) def _notify_persisted_event(self, event, max_stream_id): """Checks to see if notifier/pushers should be notified about the diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 3d9703cb3aa3..0ca834bf8e22 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -70,10 +70,9 @@ def __init__(self, hs): self._event_serializer = hs.get_event_client_serializer() self._ephemeral_events_enabled = hs.config.enable_ephemeral_messages - run_as_background_process( - "_schedule_event_expiries_from_db", - self._schedule_event_expiries_from_db, - ) + self._expiry_scheduled = False + + run_as_background_process("_schedule_next_expiry", self._schedule_next_expiry) @defer.inlineCallbacks def get_room_data( @@ -238,63 +237,39 @@ def get_joined_members(self, requester, room_id): } @defer.inlineCallbacks - def schedule_event_expiry(self, event_id, redaction_ts, store_expiry_ts=True): - """Schedule the deletion of an expired event, or if that event should have - expired then delete the event immediately. + def save_expiry_ts(self, event_id, redaction_ts): + """Save the expiry timestamp of an event, and schedule an expiry task for it if + there isn't one already scheduled. Args: event_id (str): The ID of the event to schedule the expiry of. redaction_ts (int): The timestamp to expire the event at. - store_expiry_ts (bool): Whether we need to store the expiry timestamp in the - database. - """ - if not self._ephemeral_events_enabled: - return - - if store_expiry_ts: - # Save the timestamp at which the event expires so that we can reschedule - # its deletion on startup if the server is stopped before the event is - # deleted. - yield self.store.insert_event_expiry(event_id, redaction_ts) - - # Figure out how many seconds we need to wait before expiring the event. - now_ms = self.clock.time_msec() - delay = (redaction_ts - now_ms) / 1000 - - logger.info("Scheduling expiry of event %s in %.3fs", event_id, delay) - self.clock.call_later(delay, self._expire_event, event_id) - - @defer.inlineCallbacks - def _schedule_event_expiries_from_db(self): - """Load the IDs of the events that have an expiry date from the database (and - their expiry timestamp) and either expire them (if the expiry date is now or in - the past) or schedule their expiry on the timestamp. """ if not self._ephemeral_events_enabled: return - events_to_expire = yield self.store.get_events_to_expire() + # Insert the event in the list of events to expire. + yield self.store.insert_event_expiry(event_id, redaction_ts) - for event in events_to_expire: - # Schedule the expiry of the event but don't try to insert the expiry - # timestamp in the database again. - yield self.schedule_event_expiry( - event["event_id"], event["expiry_ts"], store_expiry_ts=False - ) + if not self._expiry_scheduled: + # If we don't have any expiry task scheduled, then schedule one so our new + # event can be eventually expired. + yield self._schedule_next_expiry() @defer.inlineCallbacks - def _expire_event(self, event_id): + def _expire_event(self): """Retrieve and expires an event that needs to be expired from the database. - If we don't have the event in the database, log it and delete the expiry date + If the event doesn't exist in the database, log it and delete the expiry date from the database (so that we don't try to expire it again). - - Args: - event_id (str): The ID of the event to retrieve and expire. """ if not self._ephemeral_events_enabled: return + # Get the ID of the next event to expire. + next_event_to_expire = yield self.store.get_next_event_to_expire() + event_id = next_event_to_expire["event_id"] + logger.info("Expiring event %s", event_id) # Try to retrieve the event from the database. @@ -311,6 +286,38 @@ def _expire_event(self, event_id): # in the same database transaction. yield self.store.expire_event(event) + # Schedule the expiry of the next event to expire. + yield self._schedule_next_expiry() + + @defer.inlineCallbacks + def _schedule_next_expiry(self): + """Retrieve the expiry timestamp of the next event to be expired, and schedule + an expiry task for it. + + If there's no event left to expire, set _expiry_scheduled to False so that a + future call to save_expiry_ts can schedule a new expiry task. + """ + # Try to get the expiry timestamp of the next event to expire. + next_event_to_expire = yield self.store.get_next_event_to_expire() + + if next_event_to_expire: + # Figure out how many seconds we need to wait before expiring the event. + now_ms = self.clock.time_msec() + delay = (next_event_to_expire["expiry_ts"] - now_ms) / 1000 + + logger.info( + "Scheduling expiry of event %s in %.3fs", + next_event_to_expire["event_id"], + delay, + ) + + self._expiry_scheduled = True + self.clock.call_later(delay, self._expire_event) + else: + # If there's no more event to expire, then set _expiry_scheduled to False, so + # that the next call to save_expiry_ts can schedule a new expiry task. + self._expiry_scheduled = False + # The duration (in ms) after which rooms should be removed # `_rooms_to_exclude_from_dummy_event_insertion` (with the effect that we will try @@ -827,9 +834,7 @@ def handle_new_client_event( # If there's an expiry timestamp, schedule the redaction of the event. expiry_ts = event.content.get(EventContentFields.SELF_DESTRUCT_AFTER) if isinstance(expiry_ts, int) and not event.is_state(): - yield self._message_handler.schedule_event_expiry( - event.event_id, expiry_ts - ) + yield self._message_handler.save_expiry_ts(event.event_id, expiry_ts) @defer.inlineCallbacks def persist_and_notify_client_event( diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py index d4806bba1b69..362ed88b1e0d 100644 --- a/synapse/storage/data_stores/main/events.py +++ b/synapse/storage/data_stores/main/events.py @@ -2024,6 +2024,24 @@ def get_events_to_expire(self): desc="get_events_to_expire", ) + def get_next_event_to_expire(self): + def get_next_event_to_expire_txn(txn): + txn.execute(""" + SELECT event_id, expiry_ts FROM event_expiry + ORDER BY expiry_ts ASC LIMIT 1 + """) + + res = self.cursor_to_dict(txn) + + if res: + return res[0] + else: + return None + + return self.runInteraction( + desc="get_next_event_to_expire", func=get_next_event_to_expire_txn + ) + AllNewEventsResult = namedtuple( "AllNewEventsResult", From c33d2b4d68896929a7493665adcc4217988a92ad Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 28 Nov 2019 11:23:07 +0000 Subject: [PATCH 19/45] Use the same code path for censoring redactions and expired events --- synapse/storage/data_stores/main/events.py | 30 +++++++++++++--------- 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py index 362ed88b1e0d..2f4112955eb4 100644 --- a/synapse/storage/data_stores/main/events.py +++ b/synapse/storage/data_stores/main/events.py @@ -1098,12 +1098,7 @@ def _censor_redactions(self): def _update_censor_txn(txn): for redaction_id, event_id, pruned_json in updates: if pruned_json: - self._simple_update_one_txn( - txn, - table="event_json", - keyvalues={"event_id": event_id}, - updatevalues={"json": pruned_json}, - ) + self._censor_event_txn(txn, event_id, pruned_json) self._simple_update_one_txn( txn, @@ -1114,6 +1109,22 @@ def _update_censor_txn(txn): yield self.runInteraction("_update_censor_txn", _update_censor_txn) + def _censor_event_txn(self, txn, event_id, pruned_json): + """Censor an event by replacing its JSON in the event_json table with the + provided pruned JSON. + + Args: + txn (LoggingTransaction): The database transaction. + event_id (str): The ID of the event to censor. + pruned_json (str): The pruned JSON + """ + self._simple_update_one_txn( + txn, + table="event_json", + keyvalues={"event_id": event_id}, + updatevalues={"json": pruned_json}, + ) + @defer.inlineCallbacks def count_daily_messages(self): """ @@ -1979,12 +1990,7 @@ def expire_event(self, event): def delete_expired_event_txn(txn): # Update the event_json table to replace the event's JSON with the pruned # JSON. - self._simple_update_one_txn( - txn, - table="event_json", - keyvalues={"event_id": event.event_id}, - updatevalues={"json": pruned_json}, - ) + self._censor_event_txn(txn, event.event_id, pruned_json) # Delete the expiry timestamp associated with this event from the database. self._simple_delete_txn( From 9a531be91ce75b51ae3c07c717a6c5e251c07e1b Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 28 Nov 2019 11:28:38 +0000 Subject: [PATCH 20/45] Lint and docstring --- synapse/storage/data_stores/main/events.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py index 2f26cdbd4a84..f3cc348c90d4 100644 --- a/synapse/storage/data_stores/main/events.py +++ b/synapse/storage/data_stores/main/events.py @@ -2034,11 +2034,20 @@ def get_events_to_expire(self): ) def get_next_event_to_expire(self): + """Retrieve the entry with the lowest expiry timestamp in the event_expiry + table, or None if there's no more event to expire. + + Returns: + A dict with an event_id and an expiry_ts of there's at least one row in the + event_expiry table, None otherwise. + """ def get_next_event_to_expire_txn(txn): - txn.execute(""" + txn.execute( + """ SELECT event_id, expiry_ts FROM event_expiry ORDER BY expiry_ts ASC LIMIT 1 - """) + """ + ) res = self.cursor_to_dict(txn) From 964c6dabb311d0e4853b5e6d41a38d89ffbcf360 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 28 Nov 2019 11:30:57 +0000 Subject: [PATCH 21/45] Lint --- synapse/storage/data_stores/main/events.py | 1 + 1 file changed, 1 insertion(+) diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py index f3cc348c90d4..30da9d0a7282 100644 --- a/synapse/storage/data_stores/main/events.py +++ b/synapse/storage/data_stores/main/events.py @@ -2041,6 +2041,7 @@ def get_next_event_to_expire(self): A dict with an event_id and an expiry_ts of there's at least one row in the event_expiry table, None otherwise. """ + def get_next_event_to_expire_txn(txn): txn.execute( """ From 370d089821824f93bcea08cedc8ca7de711d1a65 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 28 Nov 2019 16:58:50 +0000 Subject: [PATCH 22/45] Various fixes --- synapse/handlers/message.py | 5 +++++ synapse/storage/data_stores/main/events.py | 7 ++++++- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index b112aae9eb27..07c2d6a11d8d 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -305,6 +305,11 @@ def _schedule_next_expiry(self): now_ms = self.clock.time_msec() delay = (next_event_to_expire["expiry_ts"] - now_ms) / 1000 + # callLater doesn't support negative delays, so trim the delay to 0 if we're + # in that case. + if delay < 0: + delay = 0 + logger.info( "Scheduling expiry of event %s in %.3fs", next_event_to_expire["event_id"], diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py index 30da9d0a7282..4ab21d1eb034 100644 --- a/synapse/storage/data_stores/main/events.py +++ b/synapse/storage/data_stores/main/events.py @@ -2002,7 +2002,12 @@ def delete_expired_event_txn(txn): # We need to invalidate the event cache entry for this event because we # changed its content in the database. - self._get_event_cache.invalidate((event.event_id,)) + txn.call_after(self._get_event_cache.invalidate, (event.event_id,)) + # Send that invalidation to replication so that other workers also invalidate + # the event cache. + self._send_invalidation_to_replication( + txn, "_get_event_cache", (event.event_id,) + ) return self.runInteraction("delete_expired_event", delete_expired_event_txn) From 385f47b7de3e225c58014703cf61b7f2768ab1f9 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 28 Nov 2019 18:04:00 +0000 Subject: [PATCH 23/45] Move the database functions to the worker store --- synapse/storage/data_stores/main/events.py | 127 +----------------- .../storage/data_stores/main/events_worker.py | 127 +++++++++++++++++- 2 files changed, 127 insertions(+), 127 deletions(-) diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py index 4ab21d1eb034..0eca5f7931be 100644 --- a/synapse/storage/data_stores/main/events.py +++ b/synapse/storage/data_stores/main/events.py @@ -40,12 +40,11 @@ from synapse.storage._base import make_in_list_sql_clause from synapse.storage.background_updates import BackgroundUpdateStore from synapse.storage.data_stores.main.event_federation import EventFederationStore -from synapse.storage.data_stores.main.events_worker import EventsWorkerStore +from synapse.storage.data_stores.main.events_worker import encode_json, EventsWorkerStore from synapse.storage.data_stores.main.state import StateGroupWorkerStore from synapse.types import RoomStreamToken, get_domain_from_id from synapse.util import batch_iter from synapse.util.caches.descriptors import cached, cachedInlineCallbacks -from synapse.util.frozenutils import frozendict_json_encoder logger = logging.getLogger(__name__) @@ -57,16 +56,6 @@ ) -def encode_json(json_object): - """ - Encode a Python object as JSON and return it in a Unicode string. - """ - out = frozendict_json_encoder.encode(json_object) - if isinstance(out, bytes): - out = out.decode("utf8") - return out - - _EventCacheEntry = namedtuple("_EventCacheEntry", ("event", "redacted_event")) @@ -1112,22 +1101,6 @@ def _update_censor_txn(txn): yield self.runInteraction("_update_censor_txn", _update_censor_txn) - def _censor_event_txn(self, txn, event_id, pruned_json): - """Censor an event by replacing its JSON in the event_json table with the - provided pruned JSON. - - Args: - txn (LoggingTransaction): The database transaction. - event_id (str): The ID of the event to censor. - pruned_json (str): The pruned JSON - """ - self._simple_update_one_txn( - txn, - table="event_json", - keyvalues={"event_id": event_id}, - updatevalues={"json": pruned_json}, - ) - @defer.inlineCallbacks def count_daily_messages(self): """ @@ -1968,104 +1941,6 @@ def insert_labels_for_event_txn( ], ) - def insert_event_expiry(self, event_id, expiry_ts): - """Save the expiry timestamp associated with a given event ID. - - Args: - event_id (str): The event ID the expiry timestamp is associated with. - expiry_ts (int): The timestamp at which to expire (delete) the event. - """ - return self._simple_insert( - table="event_expiry", - values={"event_id": event_id, "expiry_ts": expiry_ts}, - desc="insert_event_expiry", - ) - - def expire_event(self, event): - """Redact an event that has expired, and delete its associated expiry timestamp. - - Args: - event (events.EventBase): The event to delete. - """ - # Prune the event's dict then convert it to JSON. - pruned_json = encode_json(prune_event_dict(event.get_dict())) - - def delete_expired_event_txn(txn): - # Update the event_json table to replace the event's JSON with the pruned - # JSON. - self._censor_event_txn(txn, event.event_id, pruned_json) - - # Delete the expiry timestamp associated with this event from the database. - self._simple_delete_txn( - txn, table="event_expiry", keyvalues={"event_id": event.event_id} - ) - - # We need to invalidate the event cache entry for this event because we - # changed its content in the database. - txn.call_after(self._get_event_cache.invalidate, (event.event_id,)) - # Send that invalidation to replication so that other workers also invalidate - # the event cache. - self._send_invalidation_to_replication( - txn, "_get_event_cache", (event.event_id,) - ) - - return self.runInteraction("delete_expired_event", delete_expired_event_txn) - - def delete_event_expiry(self, event_id): - """Delete the expiry timestamp associated with an event ID without deleting the - actual event. - - Args: - event_id (str): The event ID to delete the associated expiry timestamp of. - """ - return self._simple_delete( - table="event_expiry", - keyvalues={"event_id": event_id}, - desc="delete_event_expiry", - ) - - def get_events_to_expire(self): - """Retrieve the IDs of the events we have an expiry timestamp for, along with - said timestamp. - - Returns: - A list of dicts, each containing an event_id and an expiry_ts. - """ - return self._simple_select_list( - table="event_expiry", - keyvalues=None, - retcols=["event_id", "expiry_ts"], - desc="get_events_to_expire", - ) - - def get_next_event_to_expire(self): - """Retrieve the entry with the lowest expiry timestamp in the event_expiry - table, or None if there's no more event to expire. - - Returns: - A dict with an event_id and an expiry_ts of there's at least one row in the - event_expiry table, None otherwise. - """ - - def get_next_event_to_expire_txn(txn): - txn.execute( - """ - SELECT event_id, expiry_ts FROM event_expiry - ORDER BY expiry_ts ASC LIMIT 1 - """ - ) - - res = self.cursor_to_dict(txn) - - if res: - return res[0] - else: - return None - - return self.runInteraction( - desc="get_next_event_to_expire", func=get_next_event_to_expire_txn - ) - AllNewEventsResult = namedtuple( "AllNewEventsResult", diff --git a/synapse/storage/data_stores/main/events_worker.py b/synapse/storage/data_stores/main/events_worker.py index 4c4b76bd93d6..322d8de1767b 100644 --- a/synapse/storage/data_stores/main/events_worker.py +++ b/synapse/storage/data_stores/main/events_worker.py @@ -28,12 +28,13 @@ from synapse.api.room_versions import EventFormatVersions from synapse.events import FrozenEvent, event_type_from_format_version # noqa: F401 from synapse.events.snapshot import EventContext # noqa: F401 -from synapse.events.utils import prune_event +from synapse.events.utils import prune_event, prune_event_dict from synapse.logging.context import LoggingContext, PreserveLoggingContext from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause from synapse.types import get_domain_from_id from synapse.util import batch_iter +from synapse.util.frozenutils import frozendict_json_encoder from synapse.util.metrics import Measure logger = logging.getLogger(__name__) @@ -49,6 +50,16 @@ EVENT_QUEUE_TIMEOUT_S = 0.1 # Timeout when waiting for requests for events +def encode_json(json_object): + """ + Encode a Python object as JSON and return it in a Unicode string. + """ + out = frozendict_json_encoder.encode(json_object) + if isinstance(out, bytes): + out = out.decode("utf8") + return out + + _EventCacheEntry = namedtuple("_EventCacheEntry", ("event", "redacted_event")) @@ -880,3 +891,117 @@ def get_room_complexity(self, room_id): complexity_v1 = round(state_events / 500, 2) return {"v1": complexity_v1} + + def insert_event_expiry(self, event_id, expiry_ts): + """Save the expiry timestamp associated with a given event ID. + + Args: + event_id (str): The event ID the expiry timestamp is associated with. + expiry_ts (int): The timestamp at which to expire (delete) the event. + """ + return self._simple_insert( + table="event_expiry", + values={"event_id": event_id, "expiry_ts": expiry_ts}, + desc="insert_event_expiry", + ) + + def _censor_event_txn(self, txn, event_id, pruned_json): + """Censor an event by replacing its JSON in the event_json table with the + provided pruned JSON. + + Args: + txn (LoggingTransaction): The database transaction. + event_id (str): The ID of the event to censor. + pruned_json (str): The pruned JSON + """ + self._simple_update_one_txn( + txn, + table="event_json", + keyvalues={"event_id": event_id}, + updatevalues={"json": pruned_json}, + ) + + def expire_event(self, event): + """Redact an event that has expired, and delete its associated expiry timestamp. + + Args: + event (events.EventBase): The event to delete. + """ + # Prune the event's dict then convert it to JSON. + pruned_json = encode_json(prune_event_dict(event.get_dict())) + + def delete_expired_event_txn(txn): + # Update the event_json table to replace the event's JSON with the pruned + # JSON. + self._censor_event_txn(txn, event.event_id, pruned_json) + + # Delete the expiry timestamp associated with this event from the database. + self._simple_delete_txn( + txn, table="event_expiry", keyvalues={"event_id": event.event_id} + ) + + # We need to invalidate the event cache entry for this event because we + # changed its content in the database. + txn.call_after(self._get_event_cache.invalidate, (event.event_id,)) + # Send that invalidation to replication so that other workers also invalidate + # the event cache. + self._send_invalidation_to_replication( + txn, "_get_event_cache", (event.event_id,) + ) + + return self.runInteraction("delete_expired_event", delete_expired_event_txn) + + def delete_event_expiry(self, event_id): + """Delete the expiry timestamp associated with an event ID without deleting the + actual event. + + Args: + event_id (str): The event ID to delete the associated expiry timestamp of. + """ + return self._simple_delete( + table="event_expiry", + keyvalues={"event_id": event_id}, + desc="delete_event_expiry", + ) + + def get_events_to_expire(self): + """Retrieve the IDs of the events we have an expiry timestamp for, along with + said timestamp. + + Returns: + A list of dicts, each containing an event_id and an expiry_ts. + """ + return self._simple_select_list( + table="event_expiry", + keyvalues=None, + retcols=["event_id", "expiry_ts"], + desc="get_events_to_expire", + ) + + def get_next_event_to_expire(self): + """Retrieve the entry with the lowest expiry timestamp in the event_expiry + table, or None if there's no more event to expire. + + Returns: + A dict with an event_id and an expiry_ts of there's at least one row in the + event_expiry table, None otherwise. + """ + + def get_next_event_to_expire_txn(txn): + txn.execute( + """ + SELECT event_id, expiry_ts FROM event_expiry + ORDER BY expiry_ts ASC LIMIT 1 + """ + ) + + res = self.cursor_to_dict(txn) + + if res: + return res[0] + else: + return None + + return self.runInteraction( + desc="get_next_event_to_expire", func=get_next_event_to_expire_txn + ) From e092144a2b6c30396662d2a3b01a7e3885b7dbdf Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 28 Nov 2019 18:07:50 +0000 Subject: [PATCH 24/45] Lint --- synapse/storage/data_stores/main/events.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py index 0eca5f7931be..f3a439bb07a2 100644 --- a/synapse/storage/data_stores/main/events.py +++ b/synapse/storage/data_stores/main/events.py @@ -40,7 +40,10 @@ from synapse.storage._base import make_in_list_sql_clause from synapse.storage.background_updates import BackgroundUpdateStore from synapse.storage.data_stores.main.event_federation import EventFederationStore -from synapse.storage.data_stores.main.events_worker import encode_json, EventsWorkerStore +from synapse.storage.data_stores.main.events_worker import ( + encode_json, + EventsWorkerStore +) from synapse.storage.data_stores.main.state import StateGroupWorkerStore from synapse.types import RoomStreamToken, get_domain_from_id from synapse.util import batch_iter From 1d1f9f25b89f5142793871a6d47eb0869c7b75ed Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 28 Nov 2019 18:12:03 +0000 Subject: [PATCH 25/45] Lint again --- synapse/storage/data_stores/main/events.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py index f3a439bb07a2..7023a44a1c56 100644 --- a/synapse/storage/data_stores/main/events.py +++ b/synapse/storage/data_stores/main/events.py @@ -41,8 +41,8 @@ from synapse.storage.background_updates import BackgroundUpdateStore from synapse.storage.data_stores.main.event_federation import EventFederationStore from synapse.storage.data_stores.main.events_worker import ( + EventsWorkerStore, encode_json, - EventsWorkerStore ) from synapse.storage.data_stores.main.state import StateGroupWorkerStore from synapse.types import RoomStreamToken, get_domain_from_id From 6ca89bac7e9f72973cefe8b73ef75d6cfc961159 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Fri, 29 Nov 2019 14:13:15 +0000 Subject: [PATCH 26/45] Revert "Move the database functions to the worker store" This reverts commit 385f47b7de3e225c58014703cf61b7f2768ab1f9. --- synapse/storage/data_stores/main/events.py | 130 +++++++++++++++++- .../storage/data_stores/main/events_worker.py | 127 +---------------- 2 files changed, 127 insertions(+), 130 deletions(-) diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py index 7023a44a1c56..4ab21d1eb034 100644 --- a/synapse/storage/data_stores/main/events.py +++ b/synapse/storage/data_stores/main/events.py @@ -40,14 +40,12 @@ from synapse.storage._base import make_in_list_sql_clause from synapse.storage.background_updates import BackgroundUpdateStore from synapse.storage.data_stores.main.event_federation import EventFederationStore -from synapse.storage.data_stores.main.events_worker import ( - EventsWorkerStore, - encode_json, -) +from synapse.storage.data_stores.main.events_worker import EventsWorkerStore from synapse.storage.data_stores.main.state import StateGroupWorkerStore from synapse.types import RoomStreamToken, get_domain_from_id from synapse.util import batch_iter from synapse.util.caches.descriptors import cached, cachedInlineCallbacks +from synapse.util.frozenutils import frozendict_json_encoder logger = logging.getLogger(__name__) @@ -59,6 +57,16 @@ ) +def encode_json(json_object): + """ + Encode a Python object as JSON and return it in a Unicode string. + """ + out = frozendict_json_encoder.encode(json_object) + if isinstance(out, bytes): + out = out.decode("utf8") + return out + + _EventCacheEntry = namedtuple("_EventCacheEntry", ("event", "redacted_event")) @@ -1104,6 +1112,22 @@ def _update_censor_txn(txn): yield self.runInteraction("_update_censor_txn", _update_censor_txn) + def _censor_event_txn(self, txn, event_id, pruned_json): + """Censor an event by replacing its JSON in the event_json table with the + provided pruned JSON. + + Args: + txn (LoggingTransaction): The database transaction. + event_id (str): The ID of the event to censor. + pruned_json (str): The pruned JSON + """ + self._simple_update_one_txn( + txn, + table="event_json", + keyvalues={"event_id": event_id}, + updatevalues={"json": pruned_json}, + ) + @defer.inlineCallbacks def count_daily_messages(self): """ @@ -1944,6 +1968,104 @@ def insert_labels_for_event_txn( ], ) + def insert_event_expiry(self, event_id, expiry_ts): + """Save the expiry timestamp associated with a given event ID. + + Args: + event_id (str): The event ID the expiry timestamp is associated with. + expiry_ts (int): The timestamp at which to expire (delete) the event. + """ + return self._simple_insert( + table="event_expiry", + values={"event_id": event_id, "expiry_ts": expiry_ts}, + desc="insert_event_expiry", + ) + + def expire_event(self, event): + """Redact an event that has expired, and delete its associated expiry timestamp. + + Args: + event (events.EventBase): The event to delete. + """ + # Prune the event's dict then convert it to JSON. + pruned_json = encode_json(prune_event_dict(event.get_dict())) + + def delete_expired_event_txn(txn): + # Update the event_json table to replace the event's JSON with the pruned + # JSON. + self._censor_event_txn(txn, event.event_id, pruned_json) + + # Delete the expiry timestamp associated with this event from the database. + self._simple_delete_txn( + txn, table="event_expiry", keyvalues={"event_id": event.event_id} + ) + + # We need to invalidate the event cache entry for this event because we + # changed its content in the database. + txn.call_after(self._get_event_cache.invalidate, (event.event_id,)) + # Send that invalidation to replication so that other workers also invalidate + # the event cache. + self._send_invalidation_to_replication( + txn, "_get_event_cache", (event.event_id,) + ) + + return self.runInteraction("delete_expired_event", delete_expired_event_txn) + + def delete_event_expiry(self, event_id): + """Delete the expiry timestamp associated with an event ID without deleting the + actual event. + + Args: + event_id (str): The event ID to delete the associated expiry timestamp of. + """ + return self._simple_delete( + table="event_expiry", + keyvalues={"event_id": event_id}, + desc="delete_event_expiry", + ) + + def get_events_to_expire(self): + """Retrieve the IDs of the events we have an expiry timestamp for, along with + said timestamp. + + Returns: + A list of dicts, each containing an event_id and an expiry_ts. + """ + return self._simple_select_list( + table="event_expiry", + keyvalues=None, + retcols=["event_id", "expiry_ts"], + desc="get_events_to_expire", + ) + + def get_next_event_to_expire(self): + """Retrieve the entry with the lowest expiry timestamp in the event_expiry + table, or None if there's no more event to expire. + + Returns: + A dict with an event_id and an expiry_ts of there's at least one row in the + event_expiry table, None otherwise. + """ + + def get_next_event_to_expire_txn(txn): + txn.execute( + """ + SELECT event_id, expiry_ts FROM event_expiry + ORDER BY expiry_ts ASC LIMIT 1 + """ + ) + + res = self.cursor_to_dict(txn) + + if res: + return res[0] + else: + return None + + return self.runInteraction( + desc="get_next_event_to_expire", func=get_next_event_to_expire_txn + ) + AllNewEventsResult = namedtuple( "AllNewEventsResult", diff --git a/synapse/storage/data_stores/main/events_worker.py b/synapse/storage/data_stores/main/events_worker.py index 322d8de1767b..4c4b76bd93d6 100644 --- a/synapse/storage/data_stores/main/events_worker.py +++ b/synapse/storage/data_stores/main/events_worker.py @@ -28,13 +28,12 @@ from synapse.api.room_versions import EventFormatVersions from synapse.events import FrozenEvent, event_type_from_format_version # noqa: F401 from synapse.events.snapshot import EventContext # noqa: F401 -from synapse.events.utils import prune_event, prune_event_dict +from synapse.events.utils import prune_event from synapse.logging.context import LoggingContext, PreserveLoggingContext from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause from synapse.types import get_domain_from_id from synapse.util import batch_iter -from synapse.util.frozenutils import frozendict_json_encoder from synapse.util.metrics import Measure logger = logging.getLogger(__name__) @@ -50,16 +49,6 @@ EVENT_QUEUE_TIMEOUT_S = 0.1 # Timeout when waiting for requests for events -def encode_json(json_object): - """ - Encode a Python object as JSON and return it in a Unicode string. - """ - out = frozendict_json_encoder.encode(json_object) - if isinstance(out, bytes): - out = out.decode("utf8") - return out - - _EventCacheEntry = namedtuple("_EventCacheEntry", ("event", "redacted_event")) @@ -891,117 +880,3 @@ def get_room_complexity(self, room_id): complexity_v1 = round(state_events / 500, 2) return {"v1": complexity_v1} - - def insert_event_expiry(self, event_id, expiry_ts): - """Save the expiry timestamp associated with a given event ID. - - Args: - event_id (str): The event ID the expiry timestamp is associated with. - expiry_ts (int): The timestamp at which to expire (delete) the event. - """ - return self._simple_insert( - table="event_expiry", - values={"event_id": event_id, "expiry_ts": expiry_ts}, - desc="insert_event_expiry", - ) - - def _censor_event_txn(self, txn, event_id, pruned_json): - """Censor an event by replacing its JSON in the event_json table with the - provided pruned JSON. - - Args: - txn (LoggingTransaction): The database transaction. - event_id (str): The ID of the event to censor. - pruned_json (str): The pruned JSON - """ - self._simple_update_one_txn( - txn, - table="event_json", - keyvalues={"event_id": event_id}, - updatevalues={"json": pruned_json}, - ) - - def expire_event(self, event): - """Redact an event that has expired, and delete its associated expiry timestamp. - - Args: - event (events.EventBase): The event to delete. - """ - # Prune the event's dict then convert it to JSON. - pruned_json = encode_json(prune_event_dict(event.get_dict())) - - def delete_expired_event_txn(txn): - # Update the event_json table to replace the event's JSON with the pruned - # JSON. - self._censor_event_txn(txn, event.event_id, pruned_json) - - # Delete the expiry timestamp associated with this event from the database. - self._simple_delete_txn( - txn, table="event_expiry", keyvalues={"event_id": event.event_id} - ) - - # We need to invalidate the event cache entry for this event because we - # changed its content in the database. - txn.call_after(self._get_event_cache.invalidate, (event.event_id,)) - # Send that invalidation to replication so that other workers also invalidate - # the event cache. - self._send_invalidation_to_replication( - txn, "_get_event_cache", (event.event_id,) - ) - - return self.runInteraction("delete_expired_event", delete_expired_event_txn) - - def delete_event_expiry(self, event_id): - """Delete the expiry timestamp associated with an event ID without deleting the - actual event. - - Args: - event_id (str): The event ID to delete the associated expiry timestamp of. - """ - return self._simple_delete( - table="event_expiry", - keyvalues={"event_id": event_id}, - desc="delete_event_expiry", - ) - - def get_events_to_expire(self): - """Retrieve the IDs of the events we have an expiry timestamp for, along with - said timestamp. - - Returns: - A list of dicts, each containing an event_id and an expiry_ts. - """ - return self._simple_select_list( - table="event_expiry", - keyvalues=None, - retcols=["event_id", "expiry_ts"], - desc="get_events_to_expire", - ) - - def get_next_event_to_expire(self): - """Retrieve the entry with the lowest expiry timestamp in the event_expiry - table, or None if there's no more event to expire. - - Returns: - A dict with an event_id and an expiry_ts of there's at least one row in the - event_expiry table, None otherwise. - """ - - def get_next_event_to_expire_txn(txn): - txn.execute( - """ - SELECT event_id, expiry_ts FROM event_expiry - ORDER BY expiry_ts ASC LIMIT 1 - """ - ) - - res = self.cursor_to_dict(txn) - - if res: - return res[0] - else: - return None - - return self.runInteraction( - desc="get_next_event_to_expire", func=get_next_event_to_expire_txn - ) From 0e57c5c5e8d66e9aa69a456820f9f0ceaf80ed3f Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Fri, 29 Nov 2019 15:09:32 +0000 Subject: [PATCH 27/45] Make the expiry happen on the master process --- synapse/handlers/federation.py | 12 +-- synapse/handlers/message.py | 107 ++++++++++----------- synapse/storage/data_stores/main/events.py | 12 ++- 3 files changed, 68 insertions(+), 63 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 979247995ef8..0fa03257c54c 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -2722,16 +2722,16 @@ def persist_events_and_notify(self, event_and_contexts, backfilled=False): event_and_contexts, backfilled=backfilled ) + for (event, context) in event_and_contexts: + # If there's an expiry timestamp on the event, schedule its expiry. + expiry_ts = event.content.get(EventContentFields.SELF_DESTRUCT_AFTER) + if isinstance(expiry_ts, int) and not event.is_state(): + yield self._message_handler.maybe_schedule_next_expiry(expiry_ts) + if not backfilled: # Never notify for backfilled events for event, _ in event_and_contexts: yield self._notify_persisted_event(event, max_stream_id) - for (event, context) in event_and_contexts: - # If there's an expiry timestamp, schedule the redaction of the event. - expiry_ts = event.content.get(EventContentFields.SELF_DESTRUCT_AFTER) - if isinstance(expiry_ts, int) and not event.is_state(): - yield self._message_handler.save_expiry_ts(event.event_id, expiry_ts) - def _notify_persisted_event(self, event, max_stream_id): """Checks to see if notifier/pushers should be notified about the event or not. diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 07c2d6a11d8d..fd2339f9dafc 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -70,9 +70,12 @@ def __init__(self, hs): self._event_serializer = hs.get_event_client_serializer() self._ephemeral_events_enabled = hs.config.enable_ephemeral_messages - self._expiry_scheduled = False + self._scheduled_expiry = None - run_as_background_process("_schedule_next_expiry", self._schedule_next_expiry) + if not hs.config.worker_app: + run_as_background_process( + "_schedule_next_expiry", self._schedule_next_expiry + ) @defer.inlineCallbacks def get_room_data( @@ -237,25 +240,55 @@ def get_joined_members(self, requester, room_id): } @defer.inlineCallbacks - def save_expiry_ts(self, event_id, redaction_ts): - """Save the expiry timestamp of an event, and schedule an expiry task for it if - there isn't one already scheduled. + def maybe_schedule_next_expiry(self, expiry_ts): + """Schedule the expiry of an event if there's not already one running, or if the + one running is for an event that will expire after the provided timestamp. Args: - event_id (str): The ID of the event to schedule the expiry of. - redaction_ts (int): The timestamp to expire the event at. + expiry_ts (int): Timestamp to compare the time at which the next expiry will + happen with. """ - if not self._ephemeral_events_enabled: - return - - # Insert the event in the list of events to expire. - yield self.store.insert_event_expiry(event_id, redaction_ts) + if not self._scheduled_expiry or not self._scheduled_expiry.active(): + yield self._schedule_next_expiry() - if not self._expiry_scheduled: - # If we don't have any expiry task scheduled, then schedule one so our new - # event can be eventually expired. + next_scheduled_expiry_ts = self._scheduled_expiry.getTime() * 1000 + if expiry_ts < next_scheduled_expiry_ts: + self._scheduled_expiry.cancel() yield self._schedule_next_expiry() + @defer.inlineCallbacks + def _schedule_next_expiry(self): + """Retrieve the expiry timestamp of the next event to be expired, and schedule + an expiry task for it. + + If there's no event left to expire, set _expiry_scheduled to False so that a + future call to save_expiry_ts can schedule a new expiry task. + """ + # Try to get the expiry timestamp of the next event to expire. + next_event_to_expire = yield self.store.get_next_event_to_expire() + + if next_event_to_expire: + # Figure out how many seconds we need to wait before expiring the event. + now_ms = self.clock.time_msec() + delay = (next_event_to_expire["expiry_ts"] - now_ms) / 1000 + + # callLater doesn't support negative delays, so trim the delay to 0 if we're + # in that case. + if delay < 0: + delay = 0 + + logger.info( + "Scheduling expiry of event %s in %.3fs", + next_event_to_expire["event_id"], + delay, + ) + + self._scheduled_expiry = self.clock.call_later(delay, self._expire_event) + else: + # If there's no more event to expire, then set _expiry_scheduled to False, so + # that the next call to save_expiry_ts can schedule a new expiry task. + self._scheduled_expiry = False + @defer.inlineCallbacks def _expire_event(self): """Retrieve and expires an event that needs to be expired from the database. @@ -289,40 +322,6 @@ def _expire_event(self): # Schedule the expiry of the next event to expire. yield self._schedule_next_expiry() - @defer.inlineCallbacks - def _schedule_next_expiry(self): - """Retrieve the expiry timestamp of the next event to be expired, and schedule - an expiry task for it. - - If there's no event left to expire, set _expiry_scheduled to False so that a - future call to save_expiry_ts can schedule a new expiry task. - """ - # Try to get the expiry timestamp of the next event to expire. - next_event_to_expire = yield self.store.get_next_event_to_expire() - - if next_event_to_expire: - # Figure out how many seconds we need to wait before expiring the event. - now_ms = self.clock.time_msec() - delay = (next_event_to_expire["expiry_ts"] - now_ms) / 1000 - - # callLater doesn't support negative delays, so trim the delay to 0 if we're - # in that case. - if delay < 0: - delay = 0 - - logger.info( - "Scheduling expiry of event %s in %.3fs", - next_event_to_expire["event_id"], - delay, - ) - - self._expiry_scheduled = True - self.clock.call_later(delay, self._expire_event) - else: - # If there's no more event to expire, then set _expiry_scheduled to False, so - # that the next call to save_expiry_ts can schedule a new expiry task. - self._expiry_scheduled = False - # The duration (in ms) after which rooms should be removed # `_rooms_to_exclude_from_dummy_event_insertion` (with the effect that we will try @@ -836,11 +835,6 @@ def handle_new_client_event( self.store.remove_push_actions_from_staging, event.event_id ) - # If there's an expiry timestamp, schedule the redaction of the event. - expiry_ts = event.content.get(EventContentFields.SELF_DESTRUCT_AFTER) - if isinstance(expiry_ts, int) and not event.is_state(): - yield self._message_handler.save_expiry_ts(event.event_id, expiry_ts) - @defer.inlineCallbacks def persist_and_notify_client_event( self, requester, event, context, ratelimit=True, extra_users=[] @@ -982,6 +976,11 @@ def is_inviter_member_event(e): event, context=context ) + # If there's an expiry timestamp on the event, schedule its expiry. + expiry_ts = event.content.get(EventContentFields.SELF_DESTRUCT_AFTER) + if isinstance(expiry_ts, int) and not event.is_state(): + yield self._message_handler.maybe_schedule_next_expiry(expiry_ts) + yield self.pusher_pool.on_new_notifications(event_stream_id, max_stream_id) def _notify(): diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py index 4ab21d1eb034..b41657aba2de 100644 --- a/synapse/storage/data_stores/main/events.py +++ b/synapse/storage/data_stores/main/events.py @@ -940,6 +940,11 @@ def _update_metadata_tables_txn( txn, event.event_id, labels, event.room_id, event.depth ) + # If there's an expiry timestamp on the event, store it. + expiry_ts = event.content.get(EventContentFields.SELF_DESTRUCT_AFTER) + if isinstance(expiry_ts, int) and not event.is_state(): + self.insert_event_expiry_txn(txn, event.event_id, expiry_ts) + # Insert into the room_memberships table. self._store_room_members_txn( txn, @@ -1968,17 +1973,18 @@ def insert_labels_for_event_txn( ], ) - def insert_event_expiry(self, event_id, expiry_ts): + def insert_event_expiry_txn(self, txn, event_id, expiry_ts): """Save the expiry timestamp associated with a given event ID. Args: + txn (LoggingTransaction): The database transaction to use. event_id (str): The event ID the expiry timestamp is associated with. expiry_ts (int): The timestamp at which to expire (delete) the event. """ - return self._simple_insert( + return self._simple_insert_txn( + txn=txn, table="event_expiry", values={"event_id": event_id, "expiry_ts": expiry_ts}, - desc="insert_event_expiry", ) def expire_event(self, event): From 07678989be92ee9b233d488e3dd83670f3bdf01f Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Mon, 2 Dec 2019 17:51:16 +0000 Subject: [PATCH 28/45] Incorporate review --- synapse/handlers/message.py | 76 ++++++++++------- synapse/storage/data_stores/main/events.py | 82 ++++++++++--------- .../data_stores/main/events_bg_updates.py | 64 --------------- .../main/schema/delta/56/event_expiry.sql | 3 +- 4 files changed, 88 insertions(+), 137 deletions(-) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index fd2339f9dafc..8950ba82a54a 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -70,7 +70,12 @@ def __init__(self, hs): self._event_serializer = hs.get_event_client_serializer() self._ephemeral_events_enabled = hs.config.enable_ephemeral_messages - self._scheduled_expiry = None + # The scheduled call to self._expire_event. None if no call is currently + # scheduled. + self._scheduled_expiry = None # type: twisted.internet.interfaces.IDelayedCall + # Whether the handler is currently running through the logic for scheduling a + # call to self._expire_event. + self._scheduling_expiry = False if not hs.config.worker_app: run_as_background_process( @@ -248,40 +253,59 @@ def maybe_schedule_next_expiry(self, expiry_ts): expiry_ts (int): Timestamp to compare the time at which the next expiry will happen with. """ - if not self._scheduled_expiry or not self._scheduled_expiry.active(): - yield self._schedule_next_expiry() + # Ensure that we're not scheduling an expiry task if we're already doing that + # while processing a different request. + if self._scheduling_expiry: + return + self._scheduling_expiry = True - next_scheduled_expiry_ts = self._scheduled_expiry.getTime() * 1000 - if expiry_ts < next_scheduled_expiry_ts: - self._scheduled_expiry.cancel() - yield self._schedule_next_expiry() + try: + # Don't schedule an expiry task if there's already one scheduled. + if not self._scheduled_expiry or not self._scheduled_expiry.active(): + yield self._schedule_next_expiry() + + # If the provided timestamp refers to a time before the scheduled time of the + # next expiry task, cancel that task and reschedule it for this timestamp. + next_scheduled_expiry_ts = self._scheduled_expiry.getTime() * 1000 + if expiry_ts < next_scheduled_expiry_ts: + self._scheduled_expiry.cancel() + yield self._schedule_next_expiry(expiry_ts) + finally: + self._scheduling_expiry = False @defer.inlineCallbacks - def _schedule_next_expiry(self): + def _schedule_next_expiry(self, expiry_ts=None): """Retrieve the expiry timestamp of the next event to be expired, and schedule an expiry task for it. + Optionally, if an expiry timestamp is already provided as part of the call to + this function, use it and skip the database lookup. + If there's no event left to expire, set _expiry_scheduled to False so that a future call to save_expiry_ts can schedule a new expiry task. + + Args: + expiry_ts (int|None): The expiry timestamp to use to schedule the next expiry + task. If not provided, the timestamp of event that'll expire the soonest + will be retrieved from the database and used instead. """ - # Try to get the expiry timestamp of the next event to expire. - next_event_to_expire = yield self.store.get_next_event_to_expire() + if expiry_ts is None: + # Try to get the expiry timestamp of the next event to expire. + next_event_to_expire = yield self.store.get_next_event_to_expire() + if next_event_to_expire: + expiry_ts = next_event_to_expire.get("expiry_ts") - if next_event_to_expire: + if expiry_ts is not None: # Figure out how many seconds we need to wait before expiring the event. now_ms = self.clock.time_msec() - delay = (next_event_to_expire["expiry_ts"] - now_ms) / 1000 + delay = (expiry_ts - now_ms) / 1000 # callLater doesn't support negative delays, so trim the delay to 0 if we're # in that case. if delay < 0: delay = 0 - logger.info( - "Scheduling expiry of event %s in %.3fs", - next_event_to_expire["event_id"], - delay, - ) + logger.info("Scheduling next expiry task in %.3fs", delay) self._scheduled_expiry = self.clock.call_later(delay, self._expire_event) else: @@ -291,7 +315,7 @@ def _schedule_next_expiry(self): @defer.inlineCallbacks def _expire_event(self): - """Retrieve and expires an event that needs to be expired from the database. + """Retrieve and expire an event that needs to be expired from the database. If the event doesn't exist in the database, log it and delete the expiry date from the database (so that we don't try to expire it again). @@ -305,19 +329,9 @@ def _expire_event(self): logger.info("Expiring event %s", event_id) - # Try to retrieve the event from the database. - event = yield self.store.get_event(event_id) - - if not event: - # If we can't find the event, log a warning and delete the expiry date from - # the database so that we don't try to expire it again in the future. - logger.warning("Can't expire event %s because we don't have it." % event_id) - yield self.store.delete_event_expiry(event) - return - - # Expire the event. This function also deletes the expiry date from the database - # in the same database transaction. - yield self.store.expire_event(event) + # Expire the event if we know about it. This function also deletes the expiry + # date from the database in the same database transaction. + yield self.store.expire_event(event_id) # Schedule the expiry of the next event to expire. yield self._schedule_next_expiry() diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py index b41657aba2de..138807620a67 100644 --- a/synapse/storage/data_stores/main/events.py +++ b/synapse/storage/data_stores/main/events.py @@ -130,6 +130,8 @@ def _censor_redactions(): if self.hs.config.redaction_retention_period is not None: hs.get_clock().looping_call(_censor_redactions, 5 * 60 * 1000) + self._ephemeral_messages_enabled = hs.config.enable_ephemeral_messages + @defer.inlineCallbacks def _read_forward_extremities(self): def fetch(txn): @@ -908,6 +910,8 @@ def _update_metadata_tables_txn( # Remove from relations table. self._handle_redaction(txn, event.redacts) + self._delete_event_expiry_txn(txn, event.redacts) + # Update the event_forward_extremities, event_backward_extremities and # event_edges tables. self._handle_mult_prev_events( @@ -940,10 +944,11 @@ def _update_metadata_tables_txn( txn, event.event_id, labels, event.room_id, event.depth ) - # If there's an expiry timestamp on the event, store it. - expiry_ts = event.content.get(EventContentFields.SELF_DESTRUCT_AFTER) - if isinstance(expiry_ts, int) and not event.is_state(): - self.insert_event_expiry_txn(txn, event.event_id, expiry_ts) + if self._ephemeral_messages_enabled: + # If there's an expiry timestamp on the event, store it. + expiry_ts = event.content.get(EventContentFields.SELF_DESTRUCT_AFTER) + if isinstance(expiry_ts, int) and not event.is_state(): + self._insert_event_expiry_txn(txn, event.event_id, expiry_ts) # Insert into the room_memberships table. self._store_room_members_txn( @@ -1973,7 +1978,7 @@ def insert_labels_for_event_txn( ], ) - def insert_event_expiry_txn(self, txn, event_id, expiry_ts): + def _insert_event_expiry_txn(self, txn, event_id, expiry_ts): """Save the expiry timestamp associated with a given event ID. Args: @@ -1987,27 +1992,42 @@ def insert_event_expiry_txn(self, txn, event_id, expiry_ts): values={"event_id": event_id, "expiry_ts": expiry_ts}, ) - def expire_event(self, event): - """Redact an event that has expired, and delete its associated expiry timestamp. + @defer.inlineCallbacks + def expire_event(self, event_id): + """Retrieve and expire an event that has expired, and delete its associated + expiry timestamp. If the event can't be retrieved, delete its associated + timestamp so we don't try to expire it again in the future. Args: - event (events.EventBase): The event to delete. + event_id (str): The ID of the event to delete. """ - # Prune the event's dict then convert it to JSON. - pruned_json = encode_json(prune_event_dict(event.get_dict())) + # Try to retrieve the event's content from the database or the event cache. + event = yield self.get_event(event_id) def delete_expired_event_txn(txn): + # Delete the expiry timestamp associated with this event from the database. + self._delete_event_expiry_txn(txn, event_id) + + if not event: + # If we can't find the event, log a warning and delete the expiry date + # from the database so that we don't try to expire it again in the + # future. + logger.warning( + "Can't expire event %s because we don't have it.", event_id + ) + return + + # Prune the event's dict then convert it to JSON. + pruned_json = encode_json(prune_event_dict(event.get_dict())) + # Update the event_json table to replace the event's JSON with the pruned # JSON. self._censor_event_txn(txn, event.event_id, pruned_json) - # Delete the expiry timestamp associated with this event from the database. - self._simple_delete_txn( - txn, table="event_expiry", keyvalues={"event_id": event.event_id} - ) - # We need to invalidate the event cache entry for this event because we - # changed its content in the database. + # changed its content in the database. We can't call + # self._invalidate_cache_and_stream because self.get_event_cache isn't of the + # right type. txn.call_after(self._get_event_cache.invalidate, (event.event_id,)) # Send that invalidation to replication so that other workers also invalidate # the event cache. @@ -2017,31 +2037,18 @@ def delete_expired_event_txn(txn): return self.runInteraction("delete_expired_event", delete_expired_event_txn) - def delete_event_expiry(self, event_id): + def _delete_event_expiry_txn(self, txn, event_id): """Delete the expiry timestamp associated with an event ID without deleting the actual event. Args: + txn (LoggingTransaction): The transaction to use to perform the deletion. event_id (str): The event ID to delete the associated expiry timestamp of. """ - return self._simple_delete( + return self._simple_delete_txn( + txn=txn, table="event_expiry", keyvalues={"event_id": event_id}, - desc="delete_event_expiry", - ) - - def get_events_to_expire(self): - """Retrieve the IDs of the events we have an expiry timestamp for, along with - said timestamp. - - Returns: - A list of dicts, each containing an event_id and an expiry_ts. - """ - return self._simple_select_list( - table="event_expiry", - keyvalues=None, - retcols=["event_id", "expiry_ts"], - desc="get_events_to_expire", ) def get_next_event_to_expire(self): @@ -2049,7 +2056,7 @@ def get_next_event_to_expire(self): table, or None if there's no more event to expire. Returns: - A dict with an event_id and an expiry_ts of there's at least one row in the + A dict with an event_id and an expiry_ts if there's at least one row in the event_expiry table, None otherwise. """ @@ -2061,12 +2068,7 @@ def get_next_event_to_expire_txn(txn): """ ) - res = self.cursor_to_dict(txn) - - if res: - return res[0] - else: - return None + return txn.fetchone() return self.runInteraction( desc="get_next_event_to_expire", func=get_next_event_to_expire_txn diff --git a/synapse/storage/data_stores/main/events_bg_updates.py b/synapse/storage/data_stores/main/events_bg_updates.py index d6f8e6e24f1d..aa87f9abc538 100644 --- a/synapse/storage/data_stores/main/events_bg_updates.py +++ b/synapse/storage/data_stores/main/events_bg_updates.py @@ -90,10 +90,6 @@ def __init__(self, db_conn, hs): "event_store_labels", self._event_store_labels ) - self.register_background_update_handler( - "event_store_expiry", self._event_store_expiry - ) - @defer.inlineCallbacks def _background_reindex_fields_sender(self, progress, batch_size): target_min_stream_id = progress["target_min_stream_id_inclusive"] @@ -577,63 +573,3 @@ def _event_store_labels_txn(txn): yield self._end_background_update("event_store_labels") return num_rows - - @defer.inlineCallbacks - def _event_store_expiry(self, progress, batch_size): - """Background update handler which will store expiry timestamps for existing - events. - """ - last_event_id = progress.get("last_event_id", "") - - def _event_store_labels_txn(txn): - txn.execute( - """ - SELECT event_id, json FROM event_json - LEFT JOIN event_expiry USING (event_id) - WHERE event_id > ? AND expiry_ts IS NULL - ORDER BY event_id LIMIT ? - """, - (last_event_id, batch_size), - ) - - results = list(txn) - - nbrows = 0 - last_row_event_id = "" - for (event_id, event_json_raw) in results: - try: - event_json = json.loads(event_json_raw) - - expiry_ts = event_json.get(EventContentFields.SELF_DESTRUCT_AFTER) - - if isinstance(expiry_ts, int): - self._simple_insert_txn( - txn=txn, - table="event_expiry", - values={"event_id": event_id, "expiry_ts": expiry_ts}, - ) - except Exception as e: - logger.warning( - "Unable to load event %s (no expiry timestamp will be imported):" - " %s", - event_id, - e, - ) - - nbrows += 1 - last_row_event_id = event_id - - self._background_update_progress_txn( - txn, "event_store_expiry", {"last_event_id": last_row_event_id} - ) - - return nbrows - - num_rows = yield self.runInteraction( - desc="event_store_expiry", func=_event_store_labels_txn - ) - - if not num_rows: - yield self._end_background_update("event_store_expiry") - - return num_rows diff --git a/synapse/storage/data_stores/main/schema/delta/56/event_expiry.sql b/synapse/storage/data_stores/main/schema/delta/56/event_expiry.sql index 49dc86a68653..81a36a8b1dc9 100644 --- a/synapse/storage/data_stores/main/schema/delta/56/event_expiry.sql +++ b/synapse/storage/data_stores/main/schema/delta/56/event_expiry.sql @@ -18,5 +18,4 @@ CREATE TABLE IF NOT EXISTS event_expiry ( expiry_ts BIGINT NOT NULL ); -INSERT INTO background_updates (update_name, progress_json) VALUES - ('event_store_expiry', '{}'); +CREATE INDEX event_expiry_expiry_ts_idx ON event_expiry(expiry_ts); From 9e61740d4eeede7ca8c4b5cd9e20f3257b2265a6 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Mon, 2 Dec 2019 18:03:09 +0000 Subject: [PATCH 29/45] Lint --- synapse/storage/data_stores/main/events.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py index 138807620a67..7a99ff429b08 100644 --- a/synapse/storage/data_stores/main/events.py +++ b/synapse/storage/data_stores/main/events.py @@ -2046,9 +2046,7 @@ def _delete_event_expiry_txn(self, txn, event_id): event_id (str): The event ID to delete the associated expiry timestamp of. """ return self._simple_delete_txn( - txn=txn, - table="event_expiry", - keyvalues={"event_id": event_id}, + txn=txn, table="event_expiry", keyvalues={"event_id": event_id} ) def get_next_event_to_expire(self): From d719bfee314615626c597087edee96400584a330 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Mon, 2 Dec 2019 18:13:25 +0000 Subject: [PATCH 30/45] Fix type hint --- synapse/handlers/message.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 8950ba82a54a..0f885f815d3d 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -15,6 +15,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging +from typing import Optional from six import iteritems, itervalues, string_types @@ -22,6 +23,7 @@ from twisted.internet import defer from twisted.internet.defer import succeed +from twisted.internet.interfaces import IDelayedCall from synapse import event_auth from synapse.api.constants import ( @@ -72,7 +74,7 @@ def __init__(self, hs): # The scheduled call to self._expire_event. None if no call is currently # scheduled. - self._scheduled_expiry = None # type: twisted.internet.interfaces.IDelayedCall + self._scheduled_expiry = None # type: Optional[IDelayedCall] # Whether the handler is currently running through the logic for scheduling a # call to self._expire_event. self._scheduling_expiry = False From 857fbd71b85b4220691f5cc2038a259f23f1b1ea Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Mon, 2 Dec 2019 18:27:50 +0000 Subject: [PATCH 31/45] fetchone returns a tuple, not a dict --- synapse/handlers/message.py | 4 ++-- synapse/storage/data_stores/main/events.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 0f885f815d3d..cd5358d04725 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -295,7 +295,7 @@ def _schedule_next_expiry(self, expiry_ts=None): # Try to get the expiry timestamp of the next event to expire. next_event_to_expire = yield self.store.get_next_event_to_expire() if next_event_to_expire: - expiry_ts = next_event_to_expire.get("expiry_ts") + expiry_ts = next_event_to_expire[1] if expiry_ts is not None: # Figure out how many seconds we need to wait before expiring the event. @@ -327,7 +327,7 @@ def _expire_event(self): # Get the ID of the next event to expire. next_event_to_expire = yield self.store.get_next_event_to_expire() - event_id = next_event_to_expire["event_id"] + event_id = next_event_to_expire[0] logger.info("Expiring event %s", event_id) diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py index 7a99ff429b08..75134a26527a 100644 --- a/synapse/storage/data_stores/main/events.py +++ b/synapse/storage/data_stores/main/events.py @@ -2035,7 +2035,7 @@ def delete_expired_event_txn(txn): txn, "_get_event_cache", (event.event_id,) ) - return self.runInteraction("delete_expired_event", delete_expired_event_txn) + yield self.runInteraction("delete_expired_event", delete_expired_event_txn) def _delete_event_expiry_txn(self, txn, event_id): """Delete the expiry timestamp associated with an event ID without deleting the From d9d24833272f12b9601e4231968824967aac58b2 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Tue, 3 Dec 2019 10:50:29 +0000 Subject: [PATCH 32/45] Update synapse/handlers/message.py Co-Authored-By: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> --- synapse/handlers/message.py | 1 + 1 file changed, 1 insertion(+) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 4141df319c76..43d50190f868 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -75,6 +75,7 @@ def __init__(self, hs): # The scheduled call to self._expire_event. None if no call is currently # scheduled. self._scheduled_expiry = None # type: Optional[IDelayedCall] + # Whether the handler is currently running through the logic for scheduling a # call to self._expire_event. self._scheduling_expiry = False From b3ae6cb4d054c3838ecb03b4942e277e1f60dce2 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Tue, 3 Dec 2019 11:44:42 +0000 Subject: [PATCH 33/45] Incorporate review --- synapse/handlers/federation.py | 11 +- synapse/handlers/message.py | 113 +++++++++++---------- synapse/storage/data_stores/main/events.py | 7 +- 3 files changed, 68 insertions(+), 63 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 2cf61a9bc3d1..19b9f6e2d45f 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -148,6 +148,8 @@ def __init__(self, hs): self._message_handler = hs.get_message_handler() + self._ephemeral_messages_enabled = hs.config.enable_ephemeral_messages + @defer.inlineCallbacks def on_receive_pdu(self, origin, pdu, sent_to_us_directly=False): """ Process a PDU received via a federation /send/ transaction, or @@ -2722,11 +2724,10 @@ def persist_events_and_notify(self, event_and_contexts, backfilled=False): event_and_contexts, backfilled=backfilled ) - for (event, context) in event_and_contexts: - # If there's an expiry timestamp on the event, schedule its expiry. - expiry_ts = event.content.get(EventContentFields.SELF_DESTRUCT_AFTER) - if isinstance(expiry_ts, int) and not event.is_state(): - yield self._message_handler.maybe_schedule_next_expiry(expiry_ts) + if self._ephemeral_messages_enabled: + for (event, context) in event_and_contexts: + # If there's an expiry timestamp on the event, schedule its expiry. + self._message_handler.maybe_schedule_next_expiry(event) if not backfilled: # Never notify for backfilled events for event, _ in event_and_contexts: diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 43d50190f868..581bb3c5417b 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -247,97 +247,101 @@ def get_joined_members(self, requester, room_id): for user_id, profile in iteritems(users_with_profile) } - @defer.inlineCallbacks - def maybe_schedule_next_expiry(self, expiry_ts): + def maybe_schedule_next_expiry(self, event): """Schedule the expiry of an event if there's not already one running, or if the one running is for an event that will expire after the provided timestamp. Args: - expiry_ts (int): Timestamp to compare the time at which the next expiry will - happen with. + event (EventBase): The event to schedule the expiry of. """ - # Ensure that we're not scheduling an expiry task if we're already doing that - # while processing a different request. - if self._scheduling_expiry: + expiry_ts = event.content.get(EventContentFields.SELF_DESTRUCT_AFTER) + if not isinstance(expiry_ts, int) or event.is_state(): return + self._scheduling_expiry = True try: # Don't schedule an expiry task if there's already one scheduled. if not self._scheduled_expiry or not self._scheduled_expiry.active(): - yield self._schedule_next_expiry() + self._schedule_expiry_for_event( + event_id=event.event_id, + expiry_ts=expiry_ts, + ) + return # If the provided timestamp refers to a time before the scheduled time of the # next expiry task, cancel that task and reschedule it for this timestamp. next_scheduled_expiry_ts = self._scheduled_expiry.getTime() * 1000 if expiry_ts < next_scheduled_expiry_ts: self._scheduled_expiry.cancel() - yield self._schedule_next_expiry(expiry_ts) + self._schedule_expiry_for_event( + event_id=event.event_id, + expiry_ts=expiry_ts, + ) finally: self._scheduling_expiry = False @defer.inlineCallbacks - def _schedule_next_expiry(self, expiry_ts=None): - """Retrieve the expiry timestamp of the next event to be expired, and schedule - an expiry task for it. - - Optionally, if an expiry timestamp is already provided as part of the call to - this function, use it and skip the database lookup. + def _schedule_next_expiry(self): + """Retrieve the ID and the expiry timestamp of the next event to be expired, + and schedule an expiry task for it. - If there's no event left to expire, set _expiry_scheduled to False so that a + If there's no event left to expire, set _expiry_scheduled to None so that a future call to save_expiry_ts can schedule a new expiry task. + """ + # Try to get the expiry timestamp of the next event to expire. + res = yield self.store.get_next_event_to_expire() + if res: + event_id, expiry_ts = res + self._schedule_expiry_for_event(event_id, expiry_ts) + else: + # If there's no more event to expire, then set _expiry_scheduled to None, so + # that the next call to save_expiry_ts can schedule a new expiry task. + self._scheduled_expiry = None + + def _schedule_expiry_for_event(self, event_id, expiry_ts): + """Schedule an expiry task for the provided event. Args: - expiry_ts (int|None): The expiry timestamp to use to schedule the next expiry - task. If not provided, the timestamp of event that'll expire the soonest - will be retrieved from the database and used instead. + event_id (str): The ID of the event to expire. + expiry_ts (int): The timestamp at which to expire the event. """ - if expiry_ts is None: - # Try to get the expiry timestamp of the next event to expire. - next_event_to_expire = yield self.store.get_next_event_to_expire() - if next_event_to_expire: - expiry_ts = next_event_to_expire[1] - - if expiry_ts is not None: - # Figure out how many seconds we need to wait before expiring the event. - now_ms = self.clock.time_msec() - delay = (expiry_ts - now_ms) / 1000 + # Figure out how many seconds we need to wait before expiring the event. + now_ms = self.clock.time_msec() + delay = (expiry_ts - now_ms) / 1000 - # callLater doesn't support negative delays, so trim the delay to 0 if we're - # in that case. - if delay < 0: - delay = 0 + # callLater doesn't support negative delays, so trim the delay to 0 if we're + # in that case. + if delay < 0: + delay = 0 - logger.info("Scheduling next expiry task in %.3fs", delay) + logger.info("Scheduling expiry for event %s in %.3fs", event_id, delay) - self._scheduled_expiry = self.clock.call_later(delay, self._expire_event) - else: - # If there's no more event to expire, then set _expiry_scheduled to False, so - # that the next call to save_expiry_ts can schedule a new expiry task. - self._scheduled_expiry = False + self._scheduled_expiry = self.clock.call_later( + delay, run_as_background_process, self._expire_event, event_id + ) @defer.inlineCallbacks - def _expire_event(self): + def _expire_event(self, event_id): """Retrieve and expire an event that needs to be expired from the database. If the event doesn't exist in the database, log it and delete the expiry date from the database (so that we don't try to expire it again). """ - if not self._ephemeral_events_enabled: - return - - # Get the ID of the next event to expire. - next_event_to_expire = yield self.store.get_next_event_to_expire() - event_id = next_event_to_expire[0] + assert not self._ephemeral_events_enabled logger.info("Expiring event %s", event_id) - # Expire the event if we know about it. This function also deletes the expiry - # date from the database in the same database transaction. - yield self.store.expire_event(event_id) + try: + # Expire the event if we know about it. This function also deletes the expiry + # date from the database in the same database transaction. + yield self.store.expire_event(event_id) + except Exception as e: + logger.error("Could not expire event %s: %r", event_id, e) # Schedule the expiry of the next event to expire. - yield self._schedule_next_expiry() + if not self._scheduling_expiry: + yield self._schedule_next_expiry() # The duration (in ms) after which rooms should be removed @@ -411,6 +415,8 @@ def __init__(self, hs): self._message_handler = hs.get_message_handler() + self._ephemeral_events_enabled = hs.config.enable_ephemeral_messages + @defer.inlineCallbacks def create_event( self, @@ -993,10 +999,9 @@ def is_inviter_member_event(e): event, context=context ) - # If there's an expiry timestamp on the event, schedule its expiry. - expiry_ts = event.content.get(EventContentFields.SELF_DESTRUCT_AFTER) - if isinstance(expiry_ts, int) and not event.is_state(): - yield self._message_handler.maybe_schedule_next_expiry(expiry_ts) + if self._ephemeral_events_enabled: + # If there's an expiry timestamp on the event, schedule its expiry. + self._message_handler.maybe_schedule_next_expiry(event) yield self.pusher_pool.on_new_notifications(event_stream_id, max_stream_id) diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py index 75134a26527a..54102a8290f5 100644 --- a/synapse/storage/data_stores/main/events.py +++ b/synapse/storage/data_stores/main/events.py @@ -910,8 +910,6 @@ def _update_metadata_tables_txn( # Remove from relations table. self._handle_redaction(txn, event.redacts) - self._delete_event_expiry_txn(txn, event.redacts) - # Update the event_forward_extremities, event_backward_extremities and # event_edges tables. self._handle_mult_prev_events( @@ -2054,8 +2052,9 @@ def get_next_event_to_expire(self): table, or None if there's no more event to expire. Returns: - A dict with an event_id and an expiry_ts if there's at least one row in the - event_expiry table, None otherwise. + A tuple containing the event ID as its first element and an expiry timestamp + as its second one, if there's at least one row in the event_expiry table. + None otherwise. """ def get_next_event_to_expire_txn(txn): From bc00c73c8e5c7d905dce18428de9bd7515af9e16 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Tue, 3 Dec 2019 11:48:13 +0000 Subject: [PATCH 34/45] Lint --- synapse/handlers/message.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 581bb3c5417b..b91ed533f5a5 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -264,8 +264,7 @@ def maybe_schedule_next_expiry(self, event): # Don't schedule an expiry task if there's already one scheduled. if not self._scheduled_expiry or not self._scheduled_expiry.active(): self._schedule_expiry_for_event( - event_id=event.event_id, - expiry_ts=expiry_ts, + event_id=event.event_id, expiry_ts=expiry_ts, ) return @@ -275,8 +274,7 @@ def maybe_schedule_next_expiry(self, event): if expiry_ts < next_scheduled_expiry_ts: self._scheduled_expiry.cancel() self._schedule_expiry_for_event( - event_id=event.event_id, - expiry_ts=expiry_ts, + event_id=event.event_id, expiry_ts=expiry_ts, ) finally: self._scheduling_expiry = False From a18f1f78d5fcd309b606ddf2ab2cf0a60f528943 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Tue, 3 Dec 2019 12:03:03 +0000 Subject: [PATCH 35/45] Lint --- synapse/handlers/federation.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 19b9f6e2d45f..d44a43165170 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -31,12 +31,7 @@ from twisted.internet import defer from synapse import event_auth -from synapse.api.constants import ( - EventContentFields, - EventTypes, - Membership, - RejectedReason, -) +from synapse.api.constants import EventTypes, Membership, RejectedReason from synapse.api.errors import ( AuthError, CodeMessageException, From 5ead39d0c45648668f0d4dfcde3c8795b8ef90fe Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Tue, 3 Dec 2019 12:42:39 +0000 Subject: [PATCH 36/45] Incorporate review --- synapse/handlers/message.py | 34 +++++++++------------- synapse/storage/data_stores/main/events.py | 2 +- 2 files changed, 14 insertions(+), 22 deletions(-) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index b91ed533f5a5..20688ac66457 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -247,7 +247,7 @@ def get_joined_members(self, requester, room_id): for user_id, profile in iteritems(users_with_profile) } - def maybe_schedule_next_expiry(self, event): + def maybe_schedule_expiry(self, event): """Schedule the expiry of an event if there's not already one running, or if the one running is for an event that will expire after the provided timestamp. @@ -258,26 +258,19 @@ def maybe_schedule_next_expiry(self, event): if not isinstance(expiry_ts, int) or event.is_state(): return - self._scheduling_expiry = True + # Don't schedule an expiry task if there's already one scheduled. + if self._scheduled_expiry and self._scheduled_expiry.active(): + return - try: - # Don't schedule an expiry task if there's already one scheduled. - if not self._scheduled_expiry or not self._scheduled_expiry.active(): - self._schedule_expiry_for_event( - event_id=event.event_id, expiry_ts=expiry_ts, - ) - return + # If the provided timestamp refers to a time before the scheduled time of the + # next expiry task, cancel that task and reschedule it for this timestamp. + next_scheduled_expiry_ts = self._scheduled_expiry.getTime() * 1000 + if expiry_ts < next_scheduled_expiry_ts: + self._scheduled_expiry.cancel() + else: + return - # If the provided timestamp refers to a time before the scheduled time of the - # next expiry task, cancel that task and reschedule it for this timestamp. - next_scheduled_expiry_ts = self._scheduled_expiry.getTime() * 1000 - if expiry_ts < next_scheduled_expiry_ts: - self._scheduled_expiry.cancel() - self._schedule_expiry_for_event( - event_id=event.event_id, expiry_ts=expiry_ts, - ) - finally: - self._scheduling_expiry = False + self._schedule_expiry_for_event(event.event_id, expiry_ts) @defer.inlineCallbacks def _schedule_next_expiry(self): @@ -338,8 +331,7 @@ def _expire_event(self, event_id): logger.error("Could not expire event %s: %r", event_id, e) # Schedule the expiry of the next event to expire. - if not self._scheduling_expiry: - yield self._schedule_next_expiry() + yield self._schedule_next_expiry() # The duration (in ms) after which rooms should be removed diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py index 54102a8290f5..79c91fe284ff 100644 --- a/synapse/storage/data_stores/main/events.py +++ b/synapse/storage/data_stores/main/events.py @@ -2051,7 +2051,7 @@ def get_next_event_to_expire(self): """Retrieve the entry with the lowest expiry timestamp in the event_expiry table, or None if there's no more event to expire. - Returns: + Returns: Deferred[Optional[Tuple[str, int]]] A tuple containing the event ID as its first element and an expiry timestamp as its second one, if there's at least one row in the event_expiry table. None otherwise. From 442a1928472c3950c3991bdb23ed524629d4b491 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Tue, 3 Dec 2019 12:47:10 +0000 Subject: [PATCH 37/45] Fix maybe logic --- synapse/handlers/message.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 20688ac66457..a114a6f5abe9 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -262,13 +262,14 @@ def maybe_schedule_expiry(self, event): if self._scheduled_expiry and self._scheduled_expiry.active(): return - # If the provided timestamp refers to a time before the scheduled time of the - # next expiry task, cancel that task and reschedule it for this timestamp. - next_scheduled_expiry_ts = self._scheduled_expiry.getTime() * 1000 - if expiry_ts < next_scheduled_expiry_ts: - self._scheduled_expiry.cancel() - else: - return + if self._scheduling_expiry: + # If the provided timestamp refers to a time before the scheduled time of the + # next expiry task, cancel that task and reschedule it for this timestamp. + next_scheduled_expiry_ts = self._scheduled_expiry.getTime() * 1000 + if expiry_ts < next_scheduled_expiry_ts: + self._scheduled_expiry.cancel() + else: + return self._schedule_expiry_for_event(event.event_id, expiry_ts) From a0ec11ab1fd4aa93d72ed0459618275c07964f1a Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Tue, 3 Dec 2019 12:49:18 +0000 Subject: [PATCH 38/45] Prevent two tasks from being scheduled at the same time --- synapse/handlers/message.py | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index a114a6f5abe9..40b68b3b1030 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -76,10 +76,6 @@ def __init__(self, hs): # scheduled. self._scheduled_expiry = None # type: Optional[IDelayedCall] - # Whether the handler is currently running through the logic for scheduling a - # call to self._expire_event. - self._scheduling_expiry = False - if not hs.config.worker_app: run_as_background_process( "_schedule_next_expiry", self._schedule_next_expiry @@ -248,8 +244,9 @@ def get_joined_members(self, requester, room_id): } def maybe_schedule_expiry(self, event): - """Schedule the expiry of an event if there's not already one running, or if the - one running is for an event that will expire after the provided timestamp. + """Schedule the expiry of an event if there's not already one scheduled, + or if the one running is for an event that will expire after the provided + timestamp. Args: event (EventBase): The event to schedule the expiry of. @@ -258,10 +255,6 @@ def maybe_schedule_expiry(self, event): if not isinstance(expiry_ts, int) or event.is_state(): return - # Don't schedule an expiry task if there's already one scheduled. - if self._scheduled_expiry and self._scheduled_expiry.active(): - return - if self._scheduling_expiry: # If the provided timestamp refers to a time before the scheduled time of the # next expiry task, cancel that task and reschedule it for this timestamp. @@ -271,6 +264,8 @@ def maybe_schedule_expiry(self, event): else: return + # _schedule_expiry_for_event won't actually schedule anything if there's already + # a task scheduled. self._schedule_expiry_for_event(event.event_id, expiry_ts) @defer.inlineCallbacks @@ -292,12 +287,17 @@ def _schedule_next_expiry(self): self._scheduled_expiry = None def _schedule_expiry_for_event(self, event_id, expiry_ts): - """Schedule an expiry task for the provided event. + """Schedule an expiry task for the provided event if there's not already one + scheduled. Args: event_id (str): The ID of the event to expire. expiry_ts (int): The timestamp at which to expire the event. """ + # Don't schedule an expiry task if there's already one scheduled. + if self._scheduled_expiry and self._scheduled_expiry.active(): + return + # Figure out how many seconds we need to wait before expiring the event. now_ms = self.clock.time_msec() delay = (expiry_ts - now_ms) / 1000 From 9583c60243e41d3769e8843c28b6c2ce2f7f5a07 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Tue, 3 Dec 2019 12:52:49 +0000 Subject: [PATCH 39/45] Typo --- synapse/handlers/message.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 40b68b3b1030..093b219b0515 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -255,7 +255,7 @@ def maybe_schedule_expiry(self, event): if not isinstance(expiry_ts, int) or event.is_state(): return - if self._scheduling_expiry: + if self._scheduled_expiry: # If the provided timestamp refers to a time before the scheduled time of the # next expiry task, cancel that task and reschedule it for this timestamp. next_scheduled_expiry_ts = self._scheduled_expiry.getTime() * 1000 From d5640c3e3dbce43180b043fd55a48e1e1cf9332e Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Tue, 3 Dec 2019 14:14:12 +0000 Subject: [PATCH 40/45] Typos --- synapse/handlers/federation.py | 2 +- synapse/handlers/message.py | 10 +++++++--- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index d44a43165170..668d49a053f9 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -2722,7 +2722,7 @@ def persist_events_and_notify(self, event_and_contexts, backfilled=False): if self._ephemeral_messages_enabled: for (event, context) in event_and_contexts: # If there's an expiry timestamp on the event, schedule its expiry. - self._message_handler.maybe_schedule_next_expiry(event) + self._message_handler.maybe_schedule_expiry(event) if not backfilled: # Never notify for backfilled events for event, _ in event_and_contexts: diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 093b219b0515..3b9e264a4310 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -310,7 +310,11 @@ def _schedule_expiry_for_event(self, event_id, expiry_ts): logger.info("Scheduling expiry for event %s in %.3fs", event_id, delay) self._scheduled_expiry = self.clock.call_later( - delay, run_as_background_process, self._expire_event, event_id + delay, + run_as_background_process, + "_expire_event", + self._expire_event, + event_id, ) @defer.inlineCallbacks @@ -320,7 +324,7 @@ def _expire_event(self, event_id): If the event doesn't exist in the database, log it and delete the expiry date from the database (so that we don't try to expire it again). """ - assert not self._ephemeral_events_enabled + assert self._ephemeral_events_enabled logger.info("Expiring event %s", event_id) @@ -992,7 +996,7 @@ def is_inviter_member_event(e): if self._ephemeral_events_enabled: # If there's an expiry timestamp on the event, schedule its expiry. - self._message_handler.maybe_schedule_next_expiry(event) + self._message_handler.maybe_schedule_expiry(event) yield self.pusher_pool.on_new_notifications(event_stream_id, max_stream_id) From 1573a5e189f1df7432cced522e5fe80a0d238586 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Tue, 3 Dec 2019 15:12:31 +0000 Subject: [PATCH 41/45] Incorporate review --- synapse/handlers/federation.py | 3 +-- synapse/handlers/message.py | 36 +++++++++++++++++----------------- 2 files changed, 19 insertions(+), 20 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 668d49a053f9..d9d0cd9eef3d 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -121,6 +121,7 @@ def __init__(self, hs): self.pusher_pool = hs.get_pusherpool() self.spam_checker = hs.get_spam_checker() self.event_creation_handler = hs.get_event_creation_handler() + self._message_handler = hs.get_message_handler() self._server_notices_mxid = hs.config.server_notices_mxid self.config = hs.config self.http_client = hs.get_simple_http_client() @@ -141,8 +142,6 @@ def __init__(self, hs): self.third_party_event_rules = hs.get_third_party_event_rules() - self._message_handler = hs.get_message_handler() - self._ephemeral_messages_enabled = hs.config.enable_ephemeral_messages @defer.inlineCallbacks diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 3b9e264a4310..4f53a5f5dc61 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -71,6 +71,7 @@ def __init__(self, hs): self.state_store = self.storage.state self._event_serializer = hs.get_event_client_serializer() self._ephemeral_events_enabled = hs.config.enable_ephemeral_messages + self._is_worker_app = bool(hs.config.worker_app) # The scheduled call to self._expire_event. None if no call is currently # scheduled. @@ -248,24 +249,20 @@ def maybe_schedule_expiry(self, event): or if the one running is for an event that will expire after the provided timestamp. + This function needs to invalidate the event cache, which is only possible on + the master process, and therefore needs to be run on there. + Args: event (EventBase): The event to schedule the expiry of. """ + assert not self._is_worker_app + expiry_ts = event.content.get(EventContentFields.SELF_DESTRUCT_AFTER) if not isinstance(expiry_ts, int) or event.is_state(): return - if self._scheduled_expiry: - # If the provided timestamp refers to a time before the scheduled time of the - # next expiry task, cancel that task and reschedule it for this timestamp. - next_scheduled_expiry_ts = self._scheduled_expiry.getTime() * 1000 - if expiry_ts < next_scheduled_expiry_ts: - self._scheduled_expiry.cancel() - else: - return - # _schedule_expiry_for_event won't actually schedule anything if there's already - # a task scheduled. + # a task scheduled for a timestamp that's sooner than the provided one. self._schedule_expiry_for_event(event.event_id, expiry_ts) @defer.inlineCallbacks @@ -281,22 +278,23 @@ def _schedule_next_expiry(self): if res: event_id, expiry_ts = res self._schedule_expiry_for_event(event_id, expiry_ts) - else: - # If there's no more event to expire, then set _expiry_scheduled to None, so - # that the next call to save_expiry_ts can schedule a new expiry task. - self._scheduled_expiry = None def _schedule_expiry_for_event(self, event_id, expiry_ts): """Schedule an expiry task for the provided event if there's not already one - scheduled. + scheduled at a timestamp that's sooner than the provided one. Args: event_id (str): The ID of the event to expire. expiry_ts (int): The timestamp at which to expire the event. """ - # Don't schedule an expiry task if there's already one scheduled. - if self._scheduled_expiry and self._scheduled_expiry.active(): - return + if self._scheduled_expiry: + # If the provided timestamp refers to a time before the scheduled time of the + # next expiry task, cancel that task and reschedule it for this timestamp. + next_scheduled_expiry_ts = self._scheduled_expiry.getTime() * 1000 + if expiry_ts < next_scheduled_expiry_ts: + self._scheduled_expiry.cancel() + else: + return # Figure out how many seconds we need to wait before expiring the event. now_ms = self.clock.time_msec() @@ -326,6 +324,8 @@ def _expire_event(self, event_id): """ assert self._ephemeral_events_enabled + self._scheduled_expiry = None + logger.info("Expiring event %s", event_id) try: From 8d2ee047da63c31e35294ab5ea7f005db98add8b Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Tue, 3 Dec 2019 17:26:05 +0000 Subject: [PATCH 42/45] Add flakey tests to the black list --- .buildkite/worker-blacklist | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/.buildkite/worker-blacklist b/.buildkite/worker-blacklist index 7950d19db339..b4e20d327517 100644 --- a/.buildkite/worker-blacklist +++ b/.buildkite/worker-blacklist @@ -64,3 +64,9 @@ If remote user leaves room, changes device and rejoins we see update in sync uploading self-signing key notifies over federation Inbound federation can receive redacted events Outbound federation can request missing events + +# Flakey tests noticed as of https://github.com/matrix-org/synapse/pull/6409 +Can recv device messages over federation +Device messages over federation wake up /sync +Federation handles empty auth_events in state_ids sanely +An event which redacts itself should be ignored From c1102d6cd50c3d7ed40a2384432e7b10d84e1c9d Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Tue, 3 Dec 2019 18:04:15 +0000 Subject: [PATCH 43/45] Moar flakey tests --- .buildkite/worker-blacklist | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.buildkite/worker-blacklist b/.buildkite/worker-blacklist index b4e20d327517..5905cd08b432 100644 --- a/.buildkite/worker-blacklist +++ b/.buildkite/worker-blacklist @@ -70,3 +70,5 @@ Can recv device messages over federation Device messages over federation wake up /sync Federation handles empty auth_events in state_ids sanely An event which redacts itself should be ignored +Ignore user in existing room + From 3e007c077f68668730042a153efd995be00f01f9 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Tue, 3 Dec 2019 18:48:35 +0000 Subject: [PATCH 44/45] Revert "Moar flakey tests" This reverts commit c1102d6cd50c3d7ed40a2384432e7b10d84e1c9d. --- .buildkite/worker-blacklist | 2 -- 1 file changed, 2 deletions(-) diff --git a/.buildkite/worker-blacklist b/.buildkite/worker-blacklist index 5905cd08b432..b4e20d327517 100644 --- a/.buildkite/worker-blacklist +++ b/.buildkite/worker-blacklist @@ -70,5 +70,3 @@ Can recv device messages over federation Device messages over federation wake up /sync Federation handles empty auth_events in state_ids sanely An event which redacts itself should be ignored -Ignore user in existing room - From 7648299a9e91437c01a52e0d6d8a2a42f853671d Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Tue, 3 Dec 2019 18:48:47 +0000 Subject: [PATCH 45/45] Revert "Add flakey tests to the black list" This reverts commit 8d2ee047da63c31e35294ab5ea7f005db98add8b. --- .buildkite/worker-blacklist | 6 ------ 1 file changed, 6 deletions(-) diff --git a/.buildkite/worker-blacklist b/.buildkite/worker-blacklist index b4e20d327517..7950d19db339 100644 --- a/.buildkite/worker-blacklist +++ b/.buildkite/worker-blacklist @@ -64,9 +64,3 @@ If remote user leaves room, changes device and rejoins we see update in sync uploading self-signing key notifies over federation Inbound federation can receive redacted events Outbound federation can request missing events - -# Flakey tests noticed as of https://github.com/matrix-org/synapse/pull/6409 -Can recv device messages over federation -Device messages over federation wake up /sync -Federation handles empty auth_events in state_ids sanely -An event which redacts itself should be ignored