Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Faster joins: fix race where we can fail to persist an incoming event with partial state after _sync_partial_state_room clears the partial state flag for a room #12988

Closed
squahtx opened this issue Jun 8, 2022 · 6 comments · Fixed by #13100
Assignees
Labels
A-Federated-Join joins over federation generally suck T-Defect Bugs, crashes, hangs, security vulnerabilities, or other reported issues.

Comments

@squahtx
Copy link
Contributor

squahtx commented Jun 8, 2022

logger.info("Clearing partial-state flag for %s", room_id)
success = await self.store.clear_partial_state_room(room_id)
if success:
logger.info("State resync complete for %s", room_id)
self._storage_controllers.state.notify_room_un_partial_stated(
room_id
)
# TODO(faster_joins) update room stats and user directory?
return
# we raced against more events arriving with partial state. Go round
# the loop again. We've already logged a warning, so no need for more.
# TODO(faster_joins): there is still a race here, whereby incoming events which raced
# with us will fail to be persisted after the call to `clear_partial_state_room` due to
# having partial state.
continue

When we are processing an incoming event while _sync_partial_state_room is running, _sync_partial_state_room may clear the partial state flag for the room before we try to persist the event with a partial state flag. This leads to a foreign key constraint failure because there's no longer a partial_state_room entry for the room.

See #12394 (comment) for an example.

@squahtx squahtx added A-Federated-Join joins over federation generally suck T-Defect Bugs, crashes, hangs, security vulnerabilities, or other reported issues. labels Jun 8, 2022
@squahtx
Copy link
Contributor Author

squahtx commented Jun 8, 2022

From discussing with Rich, our options are:

  • Add some sort of cross-worker lock (since _sync_partial_state_room may not be running on the worker persisting events).
    eg. have the event persister hold a per-room lock while persisting each event. Then when _sync_partial_state_room thinks it is done, and only when it thinks it is done, it acquires the lock, checks once more for partial state events and clears the partial state flag for the room otherwise.
  • Retry persisting the event just once, as a non-partial state event. There may be an unpleasant amount of code that needs re-running (which is the same amount of code that would need to live within the lock in the previous option).

@richvdh
Copy link
Member

richvdh commented Jun 8, 2022

This issue also relates to the TODOs in

async def clear_partial_state_room(self, room_id: str) -> bool:
# this can race with incoming events, so we watch out for FK errors.
# TODO(faster_joins): this still doesn't completely fix the race, since the persist process
# is not atomic. I fear we need an application-level lock.
# https://github.com/matrix-org/synapse/issues/12988
try:
await self.db_pool.runInteraction(
"clear_partial_state_room", self._clear_partial_state_room_txn, room_id
)
return True
except self.db_pool.engine.module.DatabaseError as e:
# TODO(faster_joins): how do we distinguish between FK errors and other errors?
# https://github.com/matrix-org/synapse/issues/12988
logger.warning(
"Exception while clearing lazy partial-state-room %s, retrying: %s",
room_id,
e,
)
return False

@squahtx
Copy link
Contributor Author

squahtx commented Jun 9, 2022

The span of code that would need to be locked or retried is:

try:
context = await self._state_handler.compute_event_context(
event,
state_ids_before_event=state_ids,
)
context = await self._check_event_auth(
origin,
event,
context,
)
except AuthError as e:
# FIXME richvdh 2021/10/07 I don't think this is reachable. Let's log it
# for now
logger.exception("Unexpected AuthError from _check_event_auth")
raise FederationError("ERROR", e.code, e.msg, affected=event.event_id)
if not backfilled and not context.rejected:
# For new (non-backfilled and non-outlier) events we check if the event
# passes auth based on the current state. If it doesn't then we
# "soft-fail" the event.
await self._check_for_soft_fail(event, state_ids, origin=origin)
await self._run_push_actions_and_persist_event(event, context, backfilled)

compute_event_context sets the partial state flag in the event context and possibly creates a new state group for the event.

FederationEventHandler._run_push_actions_and_persist_event calls FederationEventHandler.persist_events_and_notify:

async def _run_push_actions_and_persist_event(
self, event: EventBase, context: EventContext, backfilled: bool = False
) -> None:
"""Run the push actions for a received event, and persist it.
Args:
event: The event itself.
context: The event context.
backfilled: True if the event was backfilled.
"""
# this method should not be called on outliers (those code paths call
# persist_events_and_notify directly.)
assert not event.internal_metadata.outlier
if not backfilled and not context.rejected:
min_depth = await self._store.get_min_depth(event.room_id)
if min_depth is None or min_depth > event.depth:
# XXX richvdh 2021/10/07: I don't really understand what this
# condition is doing. I think it's trying not to send pushes
# for events that predate our join - but that's not really what
# min_depth means, and anyway ancient events are a more general
# problem.
#
# for now I'm just going to log about it.
logger.info(
"Skipping push actions for old event with depth %s < %s",
event.depth,
min_depth,
)
else:
await self._bulk_push_rule_evaluator.action_for_event_by_user(
event, context
)
try:
await self.persist_events_and_notify(
event.room_id, [(event, context)], backfilled=backfilled
)
except Exception:
run_in_background(
self._store.remove_push_actions_from_staging, event.event_id
)
raise

Note that we need to await in the except block if we retry persisting the event, otherwise we might end up deleting the push actions from the next attempt. This'll also fix #12987.

FederationEventHandler.persist_events_and_notify calls itself over replication or calls EventsPersistenceStorageController.persist_events, which adds the event onto a queue to be persisted.

async def persist_events_and_notify(
self,
room_id: str,
event_and_contexts: Sequence[Tuple[EventBase, EventContext]],
backfilled: bool = False,
) -> int:
"""Persists events and tells the notifier/pushers about them, if
necessary.
Args:
room_id: The room ID of events being persisted.
event_and_contexts: Sequence of events with their associated
context that should be persisted. All events must belong to
the same room.
backfilled: Whether these events are a result of
backfilling or not
Returns:
The stream ID after which all events have been persisted.
"""
if not event_and_contexts:
return self._store.get_room_max_stream_ordering()
instance = self._config.worker.events_shard_config.get_instance(room_id)
if instance != self._instance_name:
# Limit the number of events sent over replication. We choose 200
# here as that is what we default to in `max_request_body_size(..)`
for batch in batch_iter(event_and_contexts, 200):
result = await self._send_events(
instance_name=instance,
store=self._store,
room_id=room_id,
event_and_contexts=batch,
backfilled=backfilled,
)
return result["max_stream_id"]
else:
assert self._storage_controllers.persistence
# Note that this returns the events that were persisted, which may not be
# the same as were passed in if some were deduplicated due to transaction IDs.
(
events,
max_stream_token,
) = await self._storage_controllers.persistence.persist_events(
event_and_contexts, backfilled=backfilled
)

If we want to retry things, there is an annoying amount of code that we have to thread the we-lost-the-partial-state-race exception through, including a replication call.

@squahtx
Copy link
Contributor Author

squahtx commented Jun 10, 2022

Actually it's not just incoming events that are racy. I think it's every code path which can compute event context with partial state, before persisting the event?

@squahtx
Copy link
Contributor Author

squahtx commented Jun 15, 2022

I've mapped out the code paths which persist events:
graphviz

graphviz source
digraph G {
  node [shape=box]
  rankdir=LR;
  color=gray;

  subgraph cluster_Federation {
    label="Federation";
    "RoomMemberHandler.update_membership_locked" -> "RoomMemberMasterHandler._remote_join"
    "RoomMemberHandler.update_membership_locked" -> "RoomMemberWorkerHandler._remote_join"
    
    subgraph cluster_RemoteJoinReplication {
      label="Replication"
      "RoomMemberMasterHandler._remote_join" [label="RoomMemberMasterHandler._remote_join\nSeems to do stuff that the worker path doesnt?"]
    
      "RoomMemberWorkerHandler._remote_join" -> "RoomMemberWorkerHandler._remote_join_client" [constraint=false]
      "RoomMemberWorkerHandler._remote_join_client" -> "ReplicationRemoteJoinRestServlet._handle_request" [constraint=false]
    }
    "RoomMemberMasterHandler._remote_join" -> "FederationHandler.do_invite_join"
    "ReplicationRemoteJoinRestServlet._handle_request" -> "FederationHandler.do_invite_join"
    
    "FederationHandler.do_invite_join" -> "FederationEventHandler.process_remote_join"
    "FederationEventHandler.process_remote_join" [style=filled,fillcolor=orange]
    "FederationEventHandler.process_remote_join" -> "FederationEventHandler.persist_events_and_notify"
    "FederationEventHandler._auth_and_persist_outliers_inner" [style=filled,fillcolor=palegreen,label="FederationEventHandler._auth_and_persist_outliers_inner\nOutlier, never partial"]
    "FederationEventHandler._auth_and_persist_outliers_inner" -> "FederationEventHandler.persist_events_and_notify"
    "FederationServer._on_send_membership_event" -> "FederationEventHandler.on_send_membership_event"
    "FederationEventHandler.on_send_membership_event" [style=filled,fillcolor=orange]
    "FederationEventHandler.on_send_membership_event" -> "FederationEventHandler._run_push_actions_and_persist_event"
    
    "FederationServer._process_incoming_pdus_in_room_inner" -> "FederationEventHandler.on_receive_pdu" 
    "FederationHandler.do_invite_join" -> "FederationHandler._handle_queued_pdus" [constraint=false]
    "FederationHandler._handle_queued_pdus" [label="FederationHandler._handle_queued_pdus\nPDUs that were received while a join was ongoing."]
    "FederationHandler._handle_queued_pdus" -> "FederationEventHandler.on_receive_pdu"
    "FederationEventHandler.on_receive_pdu" [label="FederationEventHandler.on_receive_pdu\nQueues PDUs for later if a join is ongoing."]
    "FederationEventHandler.on_receive_pdu" -> "FederationEventHandler._process_received_pdu"
    "FederationEventHandler.on_receive_pdu" -> "FederationHandler._handle_queued_pdus" [style="dotted",constraint=false]
    "FederationEventHandler._process_pulled_event" [style=filled,fillcolor=palegreen,label="FederationEventHandler._process_pulled_event\nProvides `state_ids`, can never be partial."]
    "FederationEventHandler._process_pulled_event" -> "FederationEventHandler._process_received_pdu"
    
    "FederationEventHandler._process_received_pdu" [style=filled,fillcolor=orange,label="FederationEventHandler._process_received_pdu\nEvent can have partial state unless `state_ids` is provided."]
    "FederationEventHandler._process_received_pdu" -> "FederationEventHandler._run_push_actions_and_persist_event"
    "FederationEventHandler._run_push_actions_and_persist_event" -> "FederationEventHandler.persist_events_and_notify"
    
    "FederationHandler.do_knock" [style=filled,fillcolor=palegreen,label="FederationHandler.do_knock\nOutlier, never partial"]
    "FederationHandler.do_knock" -> "FederationEventHandler.persist_events_and_notify"
    "FederationHandler.on_invite_request" [style=filled,fillcolor=palegreen,label="FederationHandler.on_invite_request\nOutlier, never partial"]
    "FederationHandler.on_invite_request" -> "FederationEventHandler.persist_events_and_notify"
    "FederationHandler.do_remotely_reject_invite" [style=filled,fillcolor=palegreen,label="FederationHandler.do_remotely_reject_invite\nOutlier, never partial"]
    "FederationHandler.do_remotely_reject_invite" -> "FederationEventHandler.persist_events_and_notify"
  
    subgraph cluster_Federation_Replication {
      label="Replication"
      "FederationEventHandler.persist_events_and_notify" -> "FederationEventHandler._send_events" [constraint=false]
      "FederationEventHandler._send_events" -> "ReplicationFederationSendEventsRestServlet._handle_request" [constraint=false]
      "ReplicationFederationSendEventsRestServlet._handle_request" -> "FederationEventHandler.persist_events_and_notify" [constraint=false]
    }
  
    "FederationEventHandler.persist_events_and_notify" -> "EventsPersistenceStorageController.persist_events"
    "EventsPersistenceStorageController.persist_events" -> "enqueue"
  }

  "enqueue" -> "_event_persist_queue.add_to_queue"
  
  subgraph cluster_Client {
    label="Client"
    "EventCreationHandler.create_and_send_nonmember_event" [style=filled,fillcolor=orange]
    "EventCreationHandler.create_and_send_nonmember_event" -> "EventCreationHandler.handle_new_client_event"
    "EventCreationHandler._send_dummy_event_for_room" [style=filled,fillcolor=orange]
    "EventCreationHandler._send_dummy_event_for_room" -> "EventCreationHandler.handle_new_client_event"
    "RoomBatchHandler.persist_historical_events" [style=filled,fillcolor=orange]
    "RoomBatchHandler.persist_historical_events" -> "EventCreationHandler.handle_new_client_event"
    "RoomMemberHandler._local_membership_update" [style=filled,fillcolor=orange]
    "RoomMemberHandler._local_membership_update" -> "EventCreationHandler.handle_new_client_event"
    "FederationHandler.exchange_third_party_invite" [style=filled,fillcolor=orange]
    "FederationHandler.exchange_third_party_invite" -> "RoomMemberHandler.send_membership_event"
    "FederationHandler.on_exchange_third_party_invite_request" [style=filled,fillcolor=orange]
    "FederationHandler.on_exchange_third_party_invite_request" -> "RoomMemberHandler.send_membership_event"
    "RoomMemberHandler.send_membership_event" -> "EventCreationHandler.handle_new_client_event"
    "RoomMemberHandler._generate_local_out_of_band_leave" [style=filled,fillcolor=orange]
    "RoomMemberHandler._generate_local_out_of_band_leave" -> "EventCreationHandler.handle_new_client_event"
    "RoomCreationHandler.upgrade_room" [style=filled,fillcolor=orange]
    "RoomCreationHandler.upgrade_room" -> "_upgrade_response_cache.wrap"
    "_upgrade_response_cache.wrap" -> "RoomCreationHandler._upgrade_room"
    "RoomCreationHandler._upgrade_room" -> "EventCreationHandler.handle_new_client_event"
  
    "EventCreationHandler.handle_new_client_event" -> "EventCreationHandler._persist_event"
  
    subgraph cluster_Client_Replication {
      label="Replication"
      "EventCreationHandler._persist_event" -> "EventCreationHandler.send_event" [constraint=false]
      "EventCreationHandler.send_event" -> "ReplicationSendEventRestServlet._handle_request" [constraint=false]
      "ReplicationSendEventRestServlet._handle_request" -> "EventCreationHandler._persist_event" [constraint=false]
    }

    "EventCreationHandler._persist_event" -> "EventCreationHandler.persist_and_notify_client_event"
    "EventCreationHandler.persist_and_notify_client_event" [label="EventCreationHandler.persist_and_notify_client_event\nMessage rate limiting is applied here."]
    "EventCreationHandler.persist_and_notify_client_event" -> "EventsPersistenceStorageController.persist_event"
  }
  "EventsPersistenceStorageController.persist_event" -> "_event_persist_queue.add_to_queue" [constraint=false]

  subgraph cluster__EventPersistenceQueue {
    label="_EventPersistenceQueue";
    labeljust=left;
    "_event_persist_queue.add_to_queue" -> "EventsPersistenceStorageController._persist_event_batch" [constraint=false]
    "EventsPersistenceStorageController._persist_event_batch" -> "PersistEventsStore._persist_events_and_state_updates" [constraint=false]
  }

  "PersistEventsStore._persist_events_and_state_updates" -> "PersistEventsStore._persist_events_txn"

  subgraph cluster_Database {
    label="Database";
    "PersistEventsStore._persist_events_txn" -> "PersistEventsStore._update_outliers_txn" [constraint=false]
    "PersistEventsStore._persist_events_txn" -> "PersistEventsStore._store_event_state_mappings_txn" [constraint=false]
    "PersistEventsStore._update_outliers_txn" -> "PersistEventsStore._store_event_state_mappings_txn" [constraint=false]
  }
}

Orange functions create event contexts that may have partial state.
Green functions create outlier contexts, which should never be flagged as partial (?).
_store_event_state_mappings_txn in the bottom right is where we detect the race, as a foreign key constraint failure.

I'm tempted to make only the federation paths work for now and have all the client paths raise a 503 Service Unavailable, which ought to be retryable. Whether clients will retry their requests is another matter.

@richvdh
Copy link
Member

richvdh commented Jun 16, 2022

I'm tempted to make only the federation paths work for now and have all the client paths raise a 503 Service Unavailable, which ought to be retryable. Whether clients will retry their requests is another matter.

We can probably do a fairly similar trick in _process_incoming_pdus_in_room_inner for events received via /send. If you raise a dedicated exception type from the depths of the event persistence, you can catch it in _process_incoming_pdus_in_room_inner, and just go round the loop again?

squahtx added a commit that referenced this issue Jul 5, 2022
…#13100)

Whenever we want to persist an event, we first compute an event context,
which includes the state at the event and a flag indicating whether the
state is partial. After a lot of processing, we finally try to store the
event in the database, which can fail for partial state events when the
containing room has been un-partial stated in the meantime.

We detect the race as a foreign key constraint failure in the data store
layer and turn it into a special `PartialStateConflictError` exception,
which makes its way up to the method in which we computed the event
context.

To make things difficult, the exception needs to cross a replication
request: `/fed_send_events` for events coming over federation and
`/send_event` for events from clients. We transport the
`PartialStateConflictError` as a `409 Conflict` over replication and
turn `409`s back into `PartialStateConflictError`s on the worker making
the request.

All client events go through
`EventCreationHandler.handle_new_client_event`, which is called in
*a lot* of places. Instead of trying to update all the code which
creates client events, we turn the `PartialStateConflictError` into a
`429 Too Many Requests` in
`EventCreationHandler.handle_new_client_event` and hope that clients
take it as a hint to retry their request.

On the federation event side, there are 7 places which compute event
contexts. 4 of them use outlier event contexts:
`FederationEventHandler._auth_and_persist_outliers_inner`,
`FederationHandler.do_knock`, `FederationHandler.on_invite_request` and
`FederationHandler.do_remotely_reject_invite`. These events won't have
the partial state flag, so we do not need to do anything for then.

The remaining 3 paths which create events are
`FederationEventHandler.process_remote_join`,
`FederationEventHandler.on_send_membership_event` and
`FederationEventHandler._process_received_pdu`.

We can't experience the race in `process_remote_join`, unless we're
handling an additional join into a partial state room, which currently
blocks, so we make no attempt to handle it correctly.

`on_send_membership_event` is only called by
`FederationServer._on_send_membership_event`, so we catch the
`PartialStateConflictError` there and retry just once.

`_process_received_pdu` is called by `on_receive_pdu` for incoming
events and `_process_pulled_event` for backfill. The latter should never
try to persist partial state events, so we ignore it. We catch the
`PartialStateConflictError` in `on_receive_pdu` and retry just once.

Refering to the graph of code paths in
#12988 (comment)
may make the above make more sense.

Signed-off-by: Sean Quah <seanq@matrix.org>
@squahtx squahtx closed this as completed Jul 5, 2022
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
A-Federated-Join joins over federation generally suck T-Defect Bugs, crashes, hangs, security vulnerabilities, or other reported issues.
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants