Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Refactor backfilled into specific behavior function arguments (_persist_events_and_state_updates) #11417

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/11417.misc
@@ -0,0 +1 @@
Refactor `backfilled` into specific behavior function arguments (`_persist_events_and_state_updates` and downstream calls).
74 changes: 54 additions & 20 deletions synapse/storage/databases/main/events.py
Expand Up @@ -121,10 +121,12 @@ def __init__(
async def _persist_events_and_state_updates(
self,
events_and_contexts: List[Tuple[EventBase, EventContext]],
*,
current_state_for_room: Dict[str, StateMap[str]],
state_delta_for_room: Dict[str, DeltaState],
new_forward_extremeties: Dict[str, List[str]],
backfilled: bool = False,
use_negative_stream_ordering: bool = False,
inhibit_local_membership_updates: bool = False,
) -> None:
"""Persist a set of events alongside updates to the current state and
forward extremities tables.
Expand All @@ -137,7 +139,14 @@ async def _persist_events_and_state_updates(
room state
new_forward_extremities: Map from room_id to list of event IDs
that are the new forward extremities of the room.
backfilled
use_negative_stream_ordering: Whether to start stream_ordering on
the negative side and decrement. This should be set as True
for backfilled events because backfilled events get a negative
stream ordering so they don't come down incremental `/sync`.
inhibit_local_membership_updates: Stop the local_current_membership
from being updated by these events. This should be set to True
for backfilled events because backfilled events in the past do
not affect the current local state.

Returns:
Resolves when the events have been persisted
Expand All @@ -159,7 +168,7 @@ async def _persist_events_and_state_updates(
#
# Note: Multiple instances of this function cannot be in flight at
# the same time for the same room.
if backfilled:
if use_negative_stream_ordering:
stream_ordering_manager = self._backfill_id_gen.get_next_mult(
len(events_and_contexts)
)
Expand All @@ -176,13 +185,13 @@ async def _persist_events_and_state_updates(
"persist_events",
self._persist_events_txn,
events_and_contexts=events_and_contexts,
backfilled=backfilled,
inhibit_local_membership_updates=inhibit_local_membership_updates,
state_delta_for_room=state_delta_for_room,
new_forward_extremeties=new_forward_extremeties,
)
persist_event_counter.inc(len(events_and_contexts))

if not backfilled:
if stream < 0:
# backfilled events have negative stream orderings, so we don't
# want to set the event_persisted_position to that.
synapse.metrics.event_persisted_position.set(
Expand Down Expand Up @@ -316,8 +325,9 @@ def _get_prevs_before_rejected_txn(txn, batch):
def _persist_events_txn(
self,
txn: LoggingTransaction,
*,
events_and_contexts: List[Tuple[EventBase, EventContext]],
backfilled: bool,
inhibit_local_membership_updates: bool = False,
state_delta_for_room: Optional[Dict[str, DeltaState]] = None,
new_forward_extremeties: Optional[Dict[str, List[str]]] = None,
):
Expand All @@ -330,7 +340,10 @@ def _persist_events_txn(
Args:
txn
events_and_contexts: events to persist
backfilled: True if the events were backfilled
inhibit_local_membership_updates: Stop the local_current_membership
from being updated by these events. This should be set to True
for backfilled events because backfilled events in the past do
not affect the current local state.
delete_existing True to purge existing table rows for the events
from the database. This is useful when retrying due to
IntegrityError.
Expand Down Expand Up @@ -363,9 +376,7 @@ def _persist_events_txn(
events_and_contexts
)

self._update_room_depths_txn(
txn, events_and_contexts=events_and_contexts, backfilled=backfilled
)
self._update_room_depths_txn(txn, events_and_contexts=events_and_contexts)

# _update_outliers_txn filters out any events which have already been
# persisted, and returns the filtered list.
Expand Down Expand Up @@ -398,7 +409,7 @@ def _persist_events_txn(
txn,
events_and_contexts=events_and_contexts,
all_events_and_contexts=all_events_and_contexts,
backfilled=backfilled,
inhibit_local_membership_updates=inhibit_local_membership_updates,
)

# We call this last as it assumes we've inserted the events into
Expand Down Expand Up @@ -1200,21 +1211,25 @@ def _update_room_depths_txn(
self,
txn,
events_and_contexts: List[Tuple[EventBase, EventContext]],
backfilled: bool,
):
"""Update min_depth for each room

Args:
txn (twisted.enterprise.adbapi.Connection): db connection
events_and_contexts (list[(EventBase, EventContext)]): events
we are persisting
backfilled (bool): True if the events were backfilled
"""
depth_updates: Dict[str, int] = {}
for event, context in events_and_contexts:
# Remove the any existing cache entries for the event_ids
txn.call_after(self.store._invalidate_get_event_cache, event.event_id)
if not backfilled:
# Then update the `stream_ordering` position to mark the latest
# event as the front of the room. This should not be done for
# backfilled events because backfilled events have negative
# stream_ordering and happened in the past so we know that we don't
# need to update the stream_ordering tip/front for the room.
assert event.internal_metadata.stream_ordering is not None
if event.internal_metadata.stream_ordering >= 0:
txn.call_after(
self.store._events_stream_cache.entity_has_changed,
event.room_id,
Expand Down Expand Up @@ -1427,7 +1442,12 @@ def _store_rejected_events_txn(self, txn, events_and_contexts):
return [ec for ec in events_and_contexts if ec[0] not in to_remove]

def _update_metadata_tables_txn(
self, txn, events_and_contexts, all_events_and_contexts, backfilled
self,
txn,
*,
events_and_contexts,
all_events_and_contexts,
Comment on lines +1448 to +1449
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

given that these don't have default values, does it necessarily make sense for them to be kw-only args? (Not necessarily asking for a change, just musing)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm leaning to keeping it. Keyword-only matches the sole usage already and seems decent to not mix-up the arguments because there isn't a straightforward order from the function name.

inhibit_local_membership_updates: bool = False,
):
"""Update all the miscellaneous tables for new events

Expand All @@ -1439,7 +1459,10 @@ def _update_metadata_tables_txn(
events that we were going to persist. This includes events
we've already persisted, etc, that wouldn't appear in
events_and_context.
backfilled (bool): True if the events were backfilled
inhibit_local_membership_updates: Stop the local_current_membership
from being updated by these events. This should be set to True
for backfilled events because backfilled events in the past do
not affect the current local state.
"""

# Insert all the push actions into the event_push_actions table.
Expand Down Expand Up @@ -1513,7 +1536,7 @@ def _update_metadata_tables_txn(
for event, _ in events_and_contexts
if event.type == EventTypes.Member
],
backfilled=backfilled,
inhibit_local_membership_updates=inhibit_local_membership_updates,
)

# Insert event_reference_hashes table.
Expand Down Expand Up @@ -1638,8 +1661,19 @@ def _store_event_reference_hashes_txn(self, txn, events):
txn, table="event_reference_hashes", values=vals
)

def _store_room_members_txn(self, txn, events, backfilled):
"""Store a room member in the database."""
def _store_room_members_txn(
self, txn, events, *, inhibit_local_membership_updates: bool = False
):
"""
Store a room member in the database.
Args:
txn: The transaction to use.
events: List of events to store.
inhibit_local_membership_updates: Stop the local_current_membership
from being updated by these events. This should be set to True
for backfilled events because backfilled events in the past do
not affect the current local state.
"""

def non_null_str_or_none(val: Any) -> Optional[str]:
return val if isinstance(val, str) and "\u0000" not in val else None
Expand Down Expand Up @@ -1682,7 +1716,7 @@ def non_null_str_or_none(val: Any) -> Optional[str]:
# band membership", like a remote invite or a rejection of a remote invite.
if (
self.is_mine_id(event.state_key)
and not backfilled
and not inhibit_local_membership_updates
and event.internal_metadata.is_outlier()
and event.internal_metadata.is_out_of_band_membership()
):
Expand Down
3 changes: 2 additions & 1 deletion synapse/storage/persist_events.py
Expand Up @@ -583,7 +583,8 @@ async def _persist_event_batch(
current_state_for_room=current_state_for_room,
state_delta_for_room=state_delta_for_room,
new_forward_extremeties=new_forward_extremeties,
backfilled=backfilled,
use_negative_stream_ordering=backfilled,
inhibit_local_membership_updates=backfilled,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there are ways of grouping some of these flags together into coherent bits of functionality, I think that would be better.

#11396 (review)

Around that idea, there is a nicety around grouping up the behavior under the backfilled label.

Somewhere at the multiple top-levels when we refactor this further, we're going to have to make sure the backfilled logic is consistent.

Wdyt about having a static lookup map:

class BackfilledAttributes:
    use_negative_stream_ordering = False
    inhibit_local_membership_updates = False
    other_more_condition = True
    extraneous_more_condition = False
    different_more_condition = True

And it can be used like this. I don't know. I'm not convinced of this usage and haven't figured a good clean way for the else ... spot (whether normal events have their own NormalAttributes or just be to negate the backfill one) but it seems close. Something in this vein.

await self.persist_events_store._persist_events_and_state_updates(
    chunk,
    current_state_for_room=current_state_for_room,
    state_delta_for_room=state_delta_for_room,
    new_forward_extremeties=new_forward_extremeties,
    use_negative_stream_ordering=BackfilledAttributes.use_negative_stream_ordering if backfilled else ...,
    inhibit_local_membership_updates=BackfilledAttributes.inhibit_local_membership_updates if backfilled else ...,
    other_more_condition=BackfilledAttributes.other_more_condition if backfilled else ...,
    extraneous_more_condition=BackfilledAttributes.extraneous_more_condition if backfilled else ...,
    different_more_condition=BackfilledAttributes.different_more_condition if backfilled else ...,
)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's something to be said for identifying different "classes" of event (in this case, 'backfilled' and, uh, 'normal') with a list of exactly how those classes differ in behaviour. In that case you can just pass the 'class' identifier around rather than a sea of booleans.

We do something pretty similar with room versions (see https://github.com/matrix-org/synapse/blob/v1.47.0/synapse/api/room_versions.py#L50 etc) and I don't hate it.

That said, I'm yet to be convinced that such a solution isn't over-engineered in this case - if we've only really got a few degrees of freedom here, then booleans for them might be easier. Can I reserve judgement on this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great comparison to RoomVersion 👌

Can I reserve judgement on this?

Totally fine to push this to when we have more a need. Just wanted to throw out the inkling of an idea that the original backfilled had some benefit and how we could retain some of that in this new refactor ⏩

)

await self._handle_potentially_left_users(potentially_left_users)
Expand Down