Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Censor redactions in DB after a month #5934

Merged
merged 11 commits into from Sep 9, 2019
1 change: 1 addition & 0 deletions changelog.d/5934.feature
@@ -0,0 +1 @@
Redact events in the database that have been redacted for a month.
7 changes: 7 additions & 0 deletions docs/sample_config.yaml
Expand Up @@ -306,6 +306,13 @@ listeners:
#
#allow_per_room_profiles: false

# How long to keep redacted events in unredacted form in the database. After
# this period redacted events get replaced with their redacted form in the DB.
#
# Defaults to `7d`. Set to `null` to disable.
#
redaction_retention_period: 7d


## TLS ##

Expand Down
17 changes: 17 additions & 0 deletions synapse/config/server.py
Expand Up @@ -162,6 +162,16 @@ def read_config(self, config, **kwargs):

self.mau_trial_days = config.get("mau_trial_days", 0)

# How long to keep redacted events in the database in unredacted form
# before redacting them.
redaction_retention_period = config.get("redaction_retention_period", "7d")
if redaction_retention_period is not None:
self.redaction_retention_period = self.parse_duration(
redaction_retention_period
)
else:
self.redaction_retention_period = None

# Options to disable HS
self.hs_disabled = config.get("hs_disabled", False)
self.hs_disabled_message = config.get("hs_disabled_message", "")
Expand Down Expand Up @@ -718,6 +728,13 @@ def generate_config_section(
# Defaults to 'true'.
#
#allow_per_room_profiles: false

# How long to keep redacted events in unredacted form in the database. After
# this period redacted events get replaced with their redacted form in the DB.
#
# Defaults to `7d`. Set to `null` to disable.
#
redaction_retention_period: 7d
"""
% locals()
)
Expand Down
102 changes: 101 additions & 1 deletion synapse/storage/events.py
Expand Up @@ -23,7 +23,7 @@
from six import iteritems, text_type
from six.moves import range

from canonicaljson import json
from canonicaljson import encode_canonical_json, json
from prometheus_client import Counter, Histogram

from twisted.internet import defer
Expand All @@ -33,6 +33,7 @@
from synapse.api.errors import SynapseError
from synapse.events import EventBase # noqa: F401
from synapse.events.snapshot import EventContext # noqa: F401
from synapse.events.utils import prune_event_dict
from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
from synapse.logging.utils import log_function
from synapse.metrics import BucketCollector
Expand Down Expand Up @@ -262,6 +263,14 @@ def read_forward_extremities():

hs.get_clock().looping_call(read_forward_extremities, 60 * 60 * 1000)

def _censor_redactions():
return run_as_background_process(
"_censor_redactions", self._censor_redactions
)

if self.hs.config.redaction_retention_period is not None:
hs.get_clock().looping_call(_censor_redactions, 5 * 60 * 1000)

@defer.inlineCallbacks
def _read_forward_extremities(self):
def fetch(txn):
Expand Down Expand Up @@ -1548,6 +1557,97 @@ def _store_redaction(self, txn, event):
(event.event_id, event.redacts),
)

@defer.inlineCallbacks
def _censor_redactions(self):
"""Censors all redactions older than a month that haven't been censored.
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved

By censor we mean update the event_json table with the redacted event.

Returns:
Deferred
"""

if self.hs.config.redaction_retention_period is None:
return

max_pos = yield self.find_first_stream_ordering_after_ts(
self._clock.time_msec() - self.hs.config.redaction_retention_period
)

# We fetch all redactions that:
# 1. point to an event we have that has,
# 2. has a stream ordering from before the cut off, and
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
# 3. we haven't yet censored.
#
# This is limited to 100 events to ensure that we don't try and do too
# much at once. We'll get called again so this should eventually catch
# up.
#
# We use the range [-max_pos, max_pos] to handle backfilled events,
# which are given negative stream ordering.
sql = """
SELECT redact_event.event_id, redacts FROM redactions
INNER JOIN events AS redact_event USING (event_id)
INNER JOIN events AS original_event ON (
redact_event.room_id = original_event.room_id
AND redacts = original_event.event_id
)
WHERE NOT have_censored
AND ? <= redact_event.stream_ordering AND redact_event.stream_ordering <= ?
ORDER BY redact_event.stream_ordering ASC
LIMIT ?
"""

rows = yield self._execute(
"_censor_redactions_fetch", None, sql, -max_pos, max_pos, 100
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
)

updates = []

for redaction_id, event_id in rows:
redaction_event = yield self.get_event(redaction_id, allow_none=True)
original_event = yield self.get_event(
event_id, allow_rejected=True, allow_none=True
)

# The SQL above ensures that we have both the redaction and
# original event, so if the `get_event` calls return None it
# means that the redaction wasn't allowed. Either way we know that
# the result won't change so we mark the fact that we've checked.
if (
redaction_event
and original_event
and original_event.internal_metadata.is_redacted()
):
# Redaction was allowed
pruned_json = encode_canonical_json(
prune_event_dict(original_event.get_dict())
)
else:
# Redaction wasn't allowed
pruned_json = None

updates.append((redaction_id, event_id, pruned_json))

def _update_censor_txn(txn):
for redaction_id, event_id, pruned_json in updates:
if pruned_json:
self._simple_update_one_txn(
txn,
table="event_json",
keyvalues={"event_id": event_id},
updatevalues={"json": pruned_json},
)

self._simple_update_one_txn(
txn,
table="redactions",
keyvalues={"event_id": redaction_id},
updatevalues={"have_censored": True},
)

yield self.runInteraction("_update_censor_txn", _update_censor_txn)

@defer.inlineCallbacks
def count_daily_messages(self):
"""
Expand Down
17 changes: 17 additions & 0 deletions synapse/storage/schema/delta/56/redaction_censor.sql
@@ -0,0 +1,17 @@
/* Copyright 2019 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

ALTER TABLE redactions ADD COLUMN have_censored BOOL NOT NULL DEFAULT false;
CREATE INDEX redactions_have_censored ON redactions(event_id) WHERE not have_censored;
77 changes: 76 additions & 1 deletion tests/storage/test_redaction.py
Expand Up @@ -17,6 +17,8 @@

from mock import Mock

from canonicaljson import json

from twisted.internet import defer

from synapse.api.constants import EventTypes, Membership
Expand All @@ -29,8 +31,10 @@

class RedactionTestCase(unittest.HomeserverTestCase):
def make_homeserver(self, reactor, clock):
config = self.default_config()
config["redaction_retention_period"] = "30d"
return self.setup_test_homeserver(
resource_for_federation=Mock(), http_client=None
resource_for_federation=Mock(), http_client=None, config=config
)

def prepare(self, reactor, clock, hs):
Expand Down Expand Up @@ -286,3 +290,74 @@ def room_id(self):
self.assertEqual(
fetched.unsigned["redacted_because"].event_id, redaction_event_id2
)

def test_redact_censor(self):
"""Test that a redacted event gets censored in the DB after a month
"""

self.get_success(
self.inject_room_member(self.room1, self.u_alice, Membership.JOIN)
)

msg_event = self.get_success(self.inject_message(self.room1, self.u_alice, "t"))

# Check event has not been redacted:
event = self.get_success(self.store.get_event(msg_event.event_id))

self.assertObjectHasAttributes(
{
"type": EventTypes.Message,
"user_id": self.u_alice.to_string(),
"content": {"body": "t", "msgtype": "message"},
},
event,
)

self.assertFalse("redacted_because" in event.unsigned)

# Redact event
reason = "Because I said so"
self.get_success(
self.inject_redaction(self.room1, msg_event.event_id, self.u_alice, reason)
)

event = self.get_success(self.store.get_event(msg_event.event_id))

self.assertTrue("redacted_because" in event.unsigned)

self.assertObjectHasAttributes(
{
"type": EventTypes.Message,
"user_id": self.u_alice.to_string(),
"content": {},
},
event,
)

event_json = self.get_success(
self.store._simple_select_one_onecol(
table="event_json",
keyvalues={"event_id": msg_event.event_id},
retcol="json",
)
)

self.assert_dict(
{"content": {"body": "t", "msgtype": "message"}}, json.loads(event_json)
)

# Advance by 30 days, then advance again to ensure that the looping call
# for updating the stream position gets called and then the looping call
# for the censoring gets called.
self.reactor.advance(60 * 60 * 24 * 31)
self.reactor.advance(60 * 60 * 2)

event_json = self.get_success(
self.store._simple_select_one_onecol(
table="event_json",
keyvalues={"event_id": msg_event.event_id},
retcol="json",
)
)

self.assert_dict({"content": {}}, json.loads(event_json))