Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Refactor get_events_as_list #5699

Merged
merged 3 commits into from
Jul 17, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/5699.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix some problems with authenticating redactions in recent room versions.
122 changes: 74 additions & 48 deletions synapse/storage/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,37 +218,23 @@ def get_events_as_list(
if not event_ids:
defer.returnValue([])

event_id_list = event_ids
event_ids = set(event_ids)

event_entry_map = self._get_events_from_cache(
event_ids, allow_rejected=allow_rejected
# there may be duplicates so we cast the list to a set
event_entry_map = yield self._get_events_from_cache_or_db(
set(event_ids), allow_rejected=allow_rejected
)

missing_events_ids = [e for e in event_ids if e not in event_entry_map]

if missing_events_ids:
log_ctx = LoggingContext.current_context()
log_ctx.record_event_fetch(len(missing_events_ids))

# Note that _enqueue_events is also responsible for turning db rows
# into FrozenEvents (via _get_event_from_row), which involves seeing if
# the events have been redacted, and if so pulling the redaction event out
# of the database to check it.
#
# _enqueue_events is a bit of a rubbish name but naming is hard.
missing_events = yield self._enqueue_events(
missing_events_ids, allow_rejected=allow_rejected
)

event_entry_map.update(missing_events)

events = []
for event_id in event_id_list:
for event_id in event_ids:
entry = event_entry_map.get(event_id, None)
if not entry:
continue

if not allow_rejected:
assert not entry.event.rejected_reason, (
"rejected event returned from _get_events_from_cache_or_db despite "
"allow_rejected=False"
)

# Starting in room version v3, some redactions need to be rechecked if we
# didn't have the redacted event at the time, so we recheck on read
# instead.
Expand Down Expand Up @@ -291,42 +277,82 @@ def get_events_as_list(
# recheck.
entry.event.internal_metadata.recheck_redaction = False
else:
# We don't have the event that is being redacted, so we
# assume that the event isn't authorized for now. (If we
# later receive the event, then we will always redact
# it anyway, since we have this redaction)
# We either don't have the event that is being redacted (so we
# assume that the event isn't authorised for now), or the
# senders don't match (so it will never be authorised). Either
# way, we shouldn't return it.
#
# (If we later receive the event, then we will redact it anyway,
# since we have this redaction)
continue

if allow_rejected or not entry.event.rejected_reason:
if check_redacted and entry.redacted_event:
event = entry.redacted_event
else:
event = entry.event

events.append(event)

if get_prev_content:
if "replaces_state" in event.unsigned:
prev = yield self.get_event(
event.unsigned["replaces_state"],
get_prev_content=False,
allow_none=True,
)
if prev:
event.unsigned = dict(event.unsigned)
event.unsigned["prev_content"] = prev.content
event.unsigned["prev_sender"] = prev.sender
if check_redacted and entry.redacted_event:
event = entry.redacted_event
else:
event = entry.event

events.append(event)

if get_prev_content:
if "replaces_state" in event.unsigned:
prev = yield self.get_event(
event.unsigned["replaces_state"],
get_prev_content=False,
allow_none=True,
)
if prev:
event.unsigned = dict(event.unsigned)
event.unsigned["prev_content"] = prev.content
event.unsigned["prev_sender"] = prev.sender

defer.returnValue(events)

@defer.inlineCallbacks
def _get_events_from_cache_or_db(self, event_ids, allow_rejected=False):
"""Fetch a bunch of events from the cache or the database.

If events are pulled from the database, they will be cached for future lookups.

Args:
event_ids (Iterable[str]): The event_ids of the events to fetch
allow_rejected (bool): Whether to include rejected events

Returns:
Deferred[Dict[str, _EventCacheEntry]]:
map from event id to result
"""
event_entry_map = self._get_events_from_cache(
event_ids, allow_rejected=allow_rejected
)

missing_events_ids = [e for e in event_ids if e not in event_entry_map]

if missing_events_ids:
log_ctx = LoggingContext.current_context()
log_ctx.record_event_fetch(len(missing_events_ids))

# Note that _enqueue_events is also responsible for turning db rows
# into FrozenEvents (via _get_event_from_row), which involves seeing if
# the events have been redacted, and if so pulling the redaction event out
# of the database to check it.
#
# _enqueue_events is a bit of a rubbish name but naming is hard.
missing_events = yield self._enqueue_events(
missing_events_ids, allow_rejected=allow_rejected
)

event_entry_map.update(missing_events)

return event_entry_map

def _invalidate_get_event_cache(self, event_id):
self._get_event_cache.invalidate((event_id,))

def _get_events_from_cache(self, events, allow_rejected, update_metrics=True):
"""Fetch events from the caches

Args:
events (list(str)): list of event_ids to fetch
events (Iterable[str]): list of event_ids to fetch
allow_rejected (bool): Whether to return events that were rejected
update_metrics (bool): Whether to update the cache hit ratio metrics

Expand Down