Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add ephemeral messages support (MSC2228) #6409

Merged
merged 49 commits into from
Dec 3, 2019
Merged
Show file tree
Hide file tree
Changes from 44 commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
c982380
Add database support for ephemeral messages
babolivier Nov 25, 2019
6ad5e92
Add helper functions to redact messages on expiry
babolivier Nov 25, 2019
2628e19
Plug redaction scheduling in the right places
babolivier Nov 25, 2019
8255b9e
Changelog
babolivier Nov 25, 2019
8563e2b
Lint
babolivier Nov 25, 2019
f2c6018
Actually delete events on expiry
babolivier Nov 26, 2019
01d1876
Rename functions + doc
babolivier Nov 26, 2019
274612a
Lint
babolivier Nov 26, 2019
d424549
Move the field name to a constant
babolivier Nov 26, 2019
052294e
Hide the feature behind a configuration flag
babolivier Nov 26, 2019
dee234f
Don't expire state events
babolivier Nov 26, 2019
344d287
Merge branch 'develop' into babolivier/ephemeral-messages
babolivier Nov 26, 2019
36230a5
Test case and various fixes
babolivier Nov 27, 2019
89765f4
Lint
babolivier Nov 27, 2019
ceab0a8
Add background update
babolivier Nov 27, 2019
26dec61
Lint
babolivier Nov 27, 2019
2ac7869
Update synapse/api/constants.py
babolivier Nov 27, 2019
a4307c6
Incorporate part of the review
babolivier Nov 27, 2019
a6461c0
Change the scheduling flow
babolivier Nov 27, 2019
c33d2b4
Use the same code path for censoring redactions and expired events
babolivier Nov 28, 2019
b766861
Merge branch 'develop' into babolivier/ephemeral-messages
babolivier Nov 28, 2019
9a531be
Lint and docstring
babolivier Nov 28, 2019
964c6da
Lint
babolivier Nov 28, 2019
370d089
Various fixes
babolivier Nov 28, 2019
385f47b
Move the database functions to the worker store
babolivier Nov 28, 2019
e092144
Lint
babolivier Nov 28, 2019
1d1f9f2
Lint again
babolivier Nov 28, 2019
6ca89ba
Revert "Move the database functions to the worker store"
babolivier Nov 29, 2019
0e57c5c
Make the expiry happen on the master process
babolivier Nov 29, 2019
0767898
Incorporate review
babolivier Dec 2, 2019
9e61740
Lint
babolivier Dec 2, 2019
d719bfe
Fix type hint
babolivier Dec 2, 2019
857fbd7
fetchone returns a tuple, not a dict
babolivier Dec 2, 2019
8cd5a67
Merge branch 'develop' into babolivier/ephemeral-messages
babolivier Dec 2, 2019
d9d2483
Update synapse/handlers/message.py
babolivier Dec 3, 2019
b3ae6cb
Incorporate review
babolivier Dec 3, 2019
bc00c73
Lint
babolivier Dec 3, 2019
a18f1f7
Lint
babolivier Dec 3, 2019
5ead39d
Incorporate review
babolivier Dec 3, 2019
442a192
Fix maybe logic
babolivier Dec 3, 2019
a0ec11a
Prevent two tasks from being scheduled at the same time
babolivier Dec 3, 2019
9583c60
Typo
babolivier Dec 3, 2019
d5640c3
Typos
babolivier Dec 3, 2019
1573a5e
Incorporate review
babolivier Dec 3, 2019
44741a0
Merge branch 'develop' into babolivier/ephemeral-messages
babolivier Dec 3, 2019
8d2ee04
Add flakey tests to the black list
babolivier Dec 3, 2019
c1102d6
Moar flakey tests
babolivier Dec 3, 2019
3e007c0
Revert "Moar flakey tests"
babolivier Dec 3, 2019
7648299
Revert "Add flakey tests to the black list"
babolivier Dec 3, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/6409.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add ephemeral messages support by partially implementing [MSC2228](https://github.com/matrix-org/matrix-doc/pull/2228).
4 changes: 4 additions & 0 deletions synapse/api/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,3 +147,7 @@ class EventContentFields(object):

# Labels for the event, cf https://github.com/matrix-org/matrix-doc/pull/2326
LABELS = "org.matrix.labels"

# Timestamp to delete the event after
babolivier marked this conversation as resolved.
Show resolved Hide resolved
# cf https://github.com/matrix-org/matrix-doc/pull/2228
SELF_DESTRUCT_AFTER = "org.matrix.self_destruct_after"
2 changes: 2 additions & 0 deletions synapse/config/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,8 @@ class LimitRemoteRoomsConfig(object):
"cleanup_extremities_with_dummy_events", True
)

self.enable_ephemeral_messages = config.get("enable_ephemeral_messages", False)

def has_tls_listener(self) -> bool:
return any(l["tls"] for l in self.listeners)

Expand Down
8 changes: 8 additions & 0 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ def __init__(self, hs):
self.pusher_pool = hs.get_pusherpool()
self.spam_checker = hs.get_spam_checker()
self.event_creation_handler = hs.get_event_creation_handler()
self._message_handler = hs.get_message_handler()
self._server_notices_mxid = hs.config.server_notices_mxid
self.config = hs.config
self.http_client = hs.get_simple_http_client()
Expand All @@ -141,6 +142,8 @@ def __init__(self, hs):

self.third_party_event_rules = hs.get_third_party_event_rules()

self._ephemeral_messages_enabled = hs.config.enable_ephemeral_messages

@defer.inlineCallbacks
def on_receive_pdu(self, origin, pdu, sent_to_us_directly=False):
""" Process a PDU received via a federation /send/ transaction, or
Expand Down Expand Up @@ -2715,6 +2718,11 @@ def persist_events_and_notify(self, event_and_contexts, backfilled=False):
event_and_contexts, backfilled=backfilled
)

if self._ephemeral_messages_enabled:
for (event, context) in event_and_contexts:
# If there's an expiry timestamp on the event, schedule its expiry.
self._message_handler.maybe_schedule_expiry(event)

if not backfilled: # Never notify for backfilled events
for event, _ in event_and_contexts:
yield self._notify_persisted_event(event, max_stream_id)
Expand Down
123 changes: 122 additions & 1 deletion synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,24 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import Optional

from six import iteritems, itervalues, string_types

from canonicaljson import encode_canonical_json, json

from twisted.internet import defer
from twisted.internet.defer import succeed
from twisted.internet.interfaces import IDelayedCall

from synapse import event_auth
from synapse.api.constants import EventTypes, Membership, RelationTypes, UserTypes
from synapse.api.constants import (
EventContentFields,
EventTypes,
Membership,
RelationTypes,
UserTypes,
)
from synapse.api.errors import (
AuthError,
Codes,
Expand Down Expand Up @@ -62,6 +70,17 @@ def __init__(self, hs):
self.storage = hs.get_storage()
self.state_store = self.storage.state
self._event_serializer = hs.get_event_client_serializer()
self._ephemeral_events_enabled = hs.config.enable_ephemeral_messages
self._is_worker_app = bool(hs.config.worker_app)

# The scheduled call to self._expire_event. None if no call is currently
# scheduled.
self._scheduled_expiry = None # type: Optional[IDelayedCall]
babolivier marked this conversation as resolved.
Show resolved Hide resolved

if not hs.config.worker_app:
run_as_background_process(
"_schedule_next_expiry", self._schedule_next_expiry
)

@defer.inlineCallbacks
def get_room_data(
Expand Down Expand Up @@ -225,6 +244,100 @@ def get_joined_members(self, requester, room_id):
for user_id, profile in iteritems(users_with_profile)
}

def maybe_schedule_expiry(self, event):
"""Schedule the expiry of an event if there's not already one scheduled,
or if the one running is for an event that will expire after the provided
timestamp.

This function needs to invalidate the event cache, which is only possible on
the master process, and therefore needs to be run on there.

Args:
event (EventBase): The event to schedule the expiry of.
"""
assert not self._is_worker_app

expiry_ts = event.content.get(EventContentFields.SELF_DESTRUCT_AFTER)
babolivier marked this conversation as resolved.
Show resolved Hide resolved
if not isinstance(expiry_ts, int) or event.is_state():
return
babolivier marked this conversation as resolved.
Show resolved Hide resolved

# _schedule_expiry_for_event won't actually schedule anything if there's already
# a task scheduled for a timestamp that's sooner than the provided one.
self._schedule_expiry_for_event(event.event_id, expiry_ts)

@defer.inlineCallbacks
def _schedule_next_expiry(self):
"""Retrieve the ID and the expiry timestamp of the next event to be expired,
and schedule an expiry task for it.

If there's no event left to expire, set _expiry_scheduled to None so that a
future call to save_expiry_ts can schedule a new expiry task.
"""
# Try to get the expiry timestamp of the next event to expire.
res = yield self.store.get_next_event_to_expire()
if res:
event_id, expiry_ts = res
self._schedule_expiry_for_event(event_id, expiry_ts)

def _schedule_expiry_for_event(self, event_id, expiry_ts):
"""Schedule an expiry task for the provided event if there's not already one
scheduled at a timestamp that's sooner than the provided one.

Args:
event_id (str): The ID of the event to expire.
expiry_ts (int): The timestamp at which to expire the event.
"""
if self._scheduled_expiry:
# If the provided timestamp refers to a time before the scheduled time of the
# next expiry task, cancel that task and reschedule it for this timestamp.
next_scheduled_expiry_ts = self._scheduled_expiry.getTime() * 1000
if expiry_ts < next_scheduled_expiry_ts:
self._scheduled_expiry.cancel()
else:
return

# Figure out how many seconds we need to wait before expiring the event.
now_ms = self.clock.time_msec()
delay = (expiry_ts - now_ms) / 1000

# callLater doesn't support negative delays, so trim the delay to 0 if we're
# in that case.
if delay < 0:
delay = 0
Comment on lines +300 to +306
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fwiw I'd be inclined to write all of this as:

delay = max(0, expiry_ts - self.clock.time_msec()) / 1000

but no harm in making it all explicit as you have.


logger.info("Scheduling expiry for event %s in %.3fs", event_id, delay)

self._scheduled_expiry = self.clock.call_later(
delay,
run_as_background_process,
"_expire_event",
self._expire_event,
event_id,
)

@defer.inlineCallbacks
def _expire_event(self, event_id):
"""Retrieve and expire an event that needs to be expired from the database.

If the event doesn't exist in the database, log it and delete the expiry date
from the database (so that we don't try to expire it again).
"""
assert self._ephemeral_events_enabled

self._scheduled_expiry = None

logger.info("Expiring event %s", event_id)

try:
# Expire the event if we know about it. This function also deletes the expiry
# date from the database in the same database transaction.
yield self.store.expire_event(event_id)
except Exception as e:
logger.error("Could not expire event %s: %r", event_id, e)

# Schedule the expiry of the next event to expire.
yield self._schedule_next_expiry()
babolivier marked this conversation as resolved.
Show resolved Hide resolved


# The duration (in ms) after which rooms should be removed
# `_rooms_to_exclude_from_dummy_event_insertion` (with the effect that we will try
Expand Down Expand Up @@ -295,6 +408,10 @@ def __init__(self, hs):
5 * 60 * 1000,
)

self._message_handler = hs.get_message_handler()

self._ephemeral_events_enabled = hs.config.enable_ephemeral_messages

@defer.inlineCallbacks
def create_event(
self,
Expand Down Expand Up @@ -877,6 +994,10 @@ def is_inviter_member_event(e):
event, context=context
)

if self._ephemeral_events_enabled:
# If there's an expiry timestamp on the event, schedule its expiry.
self._message_handler.maybe_schedule_expiry(event)

yield self.pusher_pool.on_new_notifications(event_stream_id, max_stream_id)

def _notify():
Expand Down
126 changes: 120 additions & 6 deletions synapse/storage/data_stores/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ def _censor_redactions():
if self.hs.config.redaction_retention_period is not None:
hs.get_clock().looping_call(_censor_redactions, 5 * 60 * 1000)

self._ephemeral_messages_enabled = hs.config.enable_ephemeral_messages

@defer.inlineCallbacks
def _read_forward_extremities(self):
def fetch(txn):
Expand Down Expand Up @@ -940,6 +942,12 @@ def _update_metadata_tables_txn(
txn, event.event_id, labels, event.room_id, event.depth
)

if self._ephemeral_messages_enabled:
# If there's an expiry timestamp on the event, store it.
expiry_ts = event.content.get(EventContentFields.SELF_DESTRUCT_AFTER)
if isinstance(expiry_ts, int) and not event.is_state():
self._insert_event_expiry_txn(txn, event.event_id, expiry_ts)

# Insert into the room_memberships table.
self._store_room_members_txn(
txn,
Expand Down Expand Up @@ -1101,12 +1109,7 @@ def _censor_redactions(self):
def _update_censor_txn(txn):
for redaction_id, event_id, pruned_json in updates:
if pruned_json:
self._simple_update_one_txn(
txn,
table="event_json",
keyvalues={"event_id": event_id},
updatevalues={"json": pruned_json},
)
self._censor_event_txn(txn, event_id, pruned_json)

self._simple_update_one_txn(
txn,
Expand All @@ -1117,6 +1120,22 @@ def _update_censor_txn(txn):

yield self.runInteraction("_update_censor_txn", _update_censor_txn)

def _censor_event_txn(self, txn, event_id, pruned_json):
"""Censor an event by replacing its JSON in the event_json table with the
provided pruned JSON.

Args:
txn (LoggingTransaction): The database transaction.
event_id (str): The ID of the event to censor.
pruned_json (str): The pruned JSON
"""
self._simple_update_one_txn(
txn,
table="event_json",
keyvalues={"event_id": event_id},
updatevalues={"json": pruned_json},
)

@defer.inlineCallbacks
def count_daily_messages(self):
"""
Expand Down Expand Up @@ -1957,6 +1976,101 @@ def insert_labels_for_event_txn(
],
)

def _insert_event_expiry_txn(self, txn, event_id, expiry_ts):
"""Save the expiry timestamp associated with a given event ID.

Args:
txn (LoggingTransaction): The database transaction to use.
event_id (str): The event ID the expiry timestamp is associated with.
expiry_ts (int): The timestamp at which to expire (delete) the event.
"""
return self._simple_insert_txn(
txn=txn,
table="event_expiry",
values={"event_id": event_id, "expiry_ts": expiry_ts},
)

@defer.inlineCallbacks
def expire_event(self, event_id):
"""Retrieve and expire an event that has expired, and delete its associated
expiry timestamp. If the event can't be retrieved, delete its associated
timestamp so we don't try to expire it again in the future.

Args:
event_id (str): The ID of the event to delete.
"""
# Try to retrieve the event's content from the database or the event cache.
event = yield self.get_event(event_id)

def delete_expired_event_txn(txn):
# Delete the expiry timestamp associated with this event from the database.
self._delete_event_expiry_txn(txn, event_id)

if not event:
# If we can't find the event, log a warning and delete the expiry date
# from the database so that we don't try to expire it again in the
# future.
logger.warning(
"Can't expire event %s because we don't have it.", event_id
)
return

# Prune the event's dict then convert it to JSON.
pruned_json = encode_json(prune_event_dict(event.get_dict()))

# Update the event_json table to replace the event's JSON with the pruned
# JSON.
self._censor_event_txn(txn, event.event_id, pruned_json)

# We need to invalidate the event cache entry for this event because we
# changed its content in the database. We can't call
# self._invalidate_cache_and_stream because self.get_event_cache isn't of the
# right type.
txn.call_after(self._get_event_cache.invalidate, (event.event_id,))
# Send that invalidation to replication so that other workers also invalidate
# the event cache.
self._send_invalidation_to_replication(
babolivier marked this conversation as resolved.
Show resolved Hide resolved
txn, "_get_event_cache", (event.event_id,)
)

yield self.runInteraction("delete_expired_event", delete_expired_event_txn)

def _delete_event_expiry_txn(self, txn, event_id):
"""Delete the expiry timestamp associated with an event ID without deleting the
actual event.

Args:
txn (LoggingTransaction): The transaction to use to perform the deletion.
event_id (str): The event ID to delete the associated expiry timestamp of.
"""
return self._simple_delete_txn(
txn=txn, table="event_expiry", keyvalues={"event_id": event_id}
)
Comment on lines +2038 to +2048
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i'd inline this, but ymmv


def get_next_event_to_expire(self):
"""Retrieve the entry with the lowest expiry timestamp in the event_expiry
table, or None if there's no more event to expire.

Returns: Deferred[Optional[Tuple[str, int]]]
A tuple containing the event ID as its first element and an expiry timestamp
as its second one, if there's at least one row in the event_expiry table.
None otherwise.
"""

def get_next_event_to_expire_txn(txn):
txn.execute(
"""
SELECT event_id, expiry_ts FROM event_expiry
ORDER BY expiry_ts ASC LIMIT 1
"""
)

return txn.fetchone()

return self.runInteraction(
desc="get_next_event_to_expire", func=get_next_event_to_expire_txn
)


AllNewEventsResult = namedtuple(
"AllNewEventsResult",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/* Copyright 2019 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

CREATE TABLE IF NOT EXISTS event_expiry (
event_id TEXT PRIMARY KEY,
expiry_ts BIGINT NOT NULL
babolivier marked this conversation as resolved.
Show resolved Hide resolved
);

CREATE INDEX event_expiry_expiry_ts_idx ON event_expiry(expiry_ts);
Loading