Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Bundled relations refactoring #11408

Merged
merged 7 commits into from Nov 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/11408.misc
@@ -0,0 +1 @@
Refactor including the bundled relations when serializing an event.
146 changes: 82 additions & 64 deletions synapse/events/utils.py
@@ -1,4 +1,5 @@
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -392,15 +393,16 @@ async def serialize_event(
self,
event: Union[JsonDict, EventBase],
time_now: int,
bundle_aggregations: bool = True,
bundle_relations: bool = True,
**kwargs: Any,
) -> JsonDict:
"""Serializes a single event.

Args:
event
event: The event being serialized.
time_now: The current time in milliseconds
bundle_aggregations: Whether to bundle in related events
bundle_relations: Whether to include the bundled relations for this
event.
**kwargs: Arguments to pass to `serialize_event`

Returns:
Expand All @@ -410,77 +412,93 @@ async def serialize_event(
if not isinstance(event, EventBase):
return event

event_id = event.event_id
serialized_event = serialize_event(event, time_now, **kwargs)

# If MSC1849 is enabled then we need to look if there are any relations
# we need to bundle in with the event.
# Do not bundle relations if the event has been redacted
if not event.internal_metadata.is_redacted() and (
self._msc1849_enabled and bundle_aggregations
self._msc1849_enabled and bundle_relations
):
annotations = await self.store.get_aggregation_groups_for_event(event_id)
references = await self.store.get_relations_for_event(
event_id, RelationTypes.REFERENCE, direction="f"
)

if annotations.chunk:
r = serialized_event["unsigned"].setdefault("m.relations", {})
r[RelationTypes.ANNOTATION] = annotations.to_dict()

if references.chunk:
r = serialized_event["unsigned"].setdefault("m.relations", {})
r[RelationTypes.REFERENCE] = references.to_dict()

edit = None
if event.type == EventTypes.Message:
edit = await self.store.get_applicable_edit(event_id)

if edit:
# If there is an edit replace the content, preserving existing
# relations.

# Ensure we take copies of the edit content, otherwise we risk modifying
# the original event.
edit_content = edit.content.copy()

# Unfreeze the event content if necessary, so that we may modify it below
edit_content = unfreeze(edit_content)
serialized_event["content"] = edit_content.get("m.new_content", {})

# Check for existing relations
relations = event.content.get("m.relates_to")
if relations:
# Keep the relations, ensuring we use a dict copy of the original
serialized_event["content"]["m.relates_to"] = relations.copy()
else:
serialized_event["content"].pop("m.relates_to", None)

r = serialized_event["unsigned"].setdefault("m.relations", {})
r[RelationTypes.REPLACE] = {
"event_id": edit.event_id,
"origin_server_ts": edit.origin_server_ts,
"sender": edit.sender,
}

# If this event is the start of a thread, include a summary of the replies.
if self._msc3440_enabled:
(
thread_count,
latest_thread_event,
) = await self.store.get_thread_summary(event_id)
if latest_thread_event:
r = serialized_event["unsigned"].setdefault("m.relations", {})
r[RelationTypes.THREAD] = {
# Don't bundle aggregations as this could recurse forever.
"latest_event": await self.serialize_event(
latest_thread_event, time_now, bundle_aggregations=False
),
"count": thread_count,
}
await self._injected_bundled_relations(event, time_now, serialized_event)

return serialized_event

async def _injected_bundled_relations(
self, event: EventBase, time_now: int, serialized_event: JsonDict
) -> None:
"""Potentially injects bundled relations into the unsigned portion of the serialized event.

Args:
event: The event being serialized.
time_now: The current time in milliseconds
serialized_event: The serialized event which may be modified.

"""
event_id = event.event_id

# The bundled relations to include.
relations = {}

annotations = await self.store.get_aggregation_groups_for_event(event_id)
if annotations.chunk:
relations[RelationTypes.ANNOTATION] = annotations.to_dict()

references = await self.store.get_relations_for_event(
event_id, RelationTypes.REFERENCE, direction="f"
)
if references.chunk:
relations[RelationTypes.REFERENCE] = references.to_dict()

edit = None
if event.type == EventTypes.Message:
edit = await self.store.get_applicable_edit(event_id)

if edit:
# If there is an edit replace the content, preserving existing
# relations.

# Ensure we take copies of the edit content, otherwise we risk modifying
# the original event.
edit_content = edit.content.copy()

# Unfreeze the event content if necessary, so that we may modify it below
edit_content = unfreeze(edit_content)
serialized_event["content"] = edit_content.get("m.new_content", {})

# Check for existing relations
relates_to = event.content.get("m.relates_to")
if relates_to:
# Keep the relations, ensuring we use a dict copy of the original
serialized_event["content"]["m.relates_to"] = relates_to.copy()
else:
serialized_event["content"].pop("m.relates_to", None)

relations[RelationTypes.REPLACE] = {
"event_id": edit.event_id,
"origin_server_ts": edit.origin_server_ts,
"sender": edit.sender,
}

# If this event is the start of a thread, include a summary of the replies.
if self._msc3440_enabled:
(
thread_count,
latest_thread_event,
) = await self.store.get_thread_summary(event_id)
if latest_thread_event:
relations[RelationTypes.THREAD] = {
# Don't bundle relations as this could recurse forever.
"latest_event": await self.serialize_event(
latest_thread_event, time_now, bundle_relations=False
),
"count": thread_count,
}

# If any bundled relations were found, include them.
if relations:
serialized_event["unsigned"].setdefault("m.relations", {}).update(relations)

async def serialize_events(
self, events: Iterable[Union[JsonDict, EventBase]], time_now: int, **kwargs: Any
) -> List[JsonDict]:
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/events.py
Expand Up @@ -124,7 +124,7 @@ async def get_stream(
as_client_event=as_client_event,
# We don't bundle "live" events, as otherwise clients
# will end up double counting annotations.
bundle_aggregations=False,
bundle_relations=False,
)

chunk = {
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/message.py
Expand Up @@ -252,7 +252,7 @@ async def get_state_events(
now,
# We don't bother bundling aggregations in when asked for state
# events, as clients won't use them.
bundle_aggregations=False,
bundle_relations=False,
)
return events

Expand Down
4 changes: 2 additions & 2 deletions synapse/rest/admin/rooms.py
Expand Up @@ -448,7 +448,7 @@ async def on_GET(
now,
# We don't bother bundling aggregations in when asked for state
# events, as clients won't use them.
bundle_aggregations=False,
bundle_relations=False,
)
ret = {"state": room_state}

Expand Down Expand Up @@ -778,7 +778,7 @@ async def on_GET(
results["state"],
time_now,
# No need to bundle aggregations for state events
bundle_aggregations=False,
bundle_relations=False,
)

return 200, results
Expand Down
6 changes: 3 additions & 3 deletions synapse/rest/client/relations.py
Expand Up @@ -224,17 +224,17 @@ async def on_GET(
)

now = self.clock.time_msec()
# We set bundle_aggregations to False when retrieving the original
# We set bundle_relations to False when retrieving the original
# event because we want the content before relations were applied to
# it.
original_event = await self._event_serializer.serialize_event(
event, now, bundle_aggregations=False
event, now, bundle_relations=False
)
# Similarly, we don't allow relations to be applied to relations, so we
# return the original relations without any aggregations on top of them
# here.
serialized_events = await self._event_serializer.serialize_events(
events, now, bundle_aggregations=False
events, now, bundle_relations=False
)

return_value = pagination_chunk.to_dict()
Expand Down
2 changes: 1 addition & 1 deletion synapse/rest/client/room.py
Expand Up @@ -719,7 +719,7 @@ async def on_GET(
results["state"],
time_now,
# No need to bundle aggregations for state events
bundle_aggregations=False,
bundle_relations=False,
)

return 200, results
Expand Down
2 changes: 1 addition & 1 deletion synapse/rest/client/sync.py
Expand Up @@ -522,7 +522,7 @@ def serialize(events: Iterable[EventBase]) -> Awaitable[List[JsonDict]]:
time_now=time_now,
# We don't bundle "live" events, as otherwise clients
# will end up double counting annotations.
bundle_aggregations=False,
bundle_relations=False,
token_id=token_id,
event_format=event_formatter,
only_event_fields=only_fields,
Expand Down