@@ -14,6 +14,7 @@
 # limitations under the License.

 """Contains handlers for federation events."""

+import synapse.util.logcontext
 from signedjson.key import decode_verify_key_bytes
 from signedjson.sign import verify_signed_json
 from unpaddedbase64 import decode_base64
@@ -31,7 +32,7 @@
 )
 from synapse.util.metrics import measure_func
 from synapse.util.logutils import log_function
-from synapse.util.async import run_on_reactor
+from synapse.util.async import run_on_reactor, Linearizer
 from synapse.util.frozenutils import unfreeze
 from synapse.crypto.event_signing import (
     compute_event_signature, add_hashes_and_signatures,
@@ -79,29 +80,216 @@ def __init__(self, hs):

         # When joining a room we need to queue any events for that room up
         self.room_queues = {}
+        self._room_pdu_linearizer = Linearizer("fed_room_pdu")
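
For context: a Linearizer serializes work per key, so only one missing-event
fetch runs per room at a time while other rooms proceed in parallel. A minimal
sketch of the usage pattern introduced by this diff (illustrative only; the
helper name fetch_one_at_a_time is hypothetical, but Linearizer and its
queue() method are the ones imported above):

    from twisted.internet import defer

    from synapse.util.async import Linearizer

    linearizer = Linearizer("example")

    @defer.inlineCallbacks
    def fetch_one_at_a_time(room_id, fetch):
        # Callers with the same room_id queue up behind each other;
        # callers for different rooms are not blocked.
        with (yield linearizer.queue(room_id)):
            result = yield fetch(room_id)
        defer.returnValue(result)
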
-    @log_function
     @defer.inlineCallbacks
-    def on_receive_pdu(self, origin, pdu, state=None, auth_chain=None):
-        """ Called by the ReplicationLayer when we have a new pdu. We need to
-        do auth checks and put it through the StateHandler.
+    @log_function
+    def on_receive_pdu(self, origin, pdu, get_missing=True):
+        """ Process a PDU received via a federation /send/ transaction, or
+        via backfill of missing prev_events
+
+        Args:
+            origin (str): server which initiated the /send/ transaction. Will
+                be used to fetch missing events or state.
+            pdu (FrozenEvent): received PDU
+            get_missing (bool): True if we should fetch missing prev_events

-        auth_chain and state are None if we already have the necessary state
-        and prev_events in the db
+        Returns (Deferred): completes with None
         """
-        event = pdu
-
-        logger.debug("Got event: %s", event.event_id)
+        # We reprocess pdus when we have seen them only as outliers
+        existing = yield self.get_persisted_pdu(
+            origin, pdu.event_id, do_auth=False
+        )
+
+        # FIXME: Currently we fetch an event again when we already have it
+        # if it has been marked as an outlier.
+
+        already_seen = (
+            existing and (
+                not existing.internal_metadata.is_outlier()
+                or pdu.internal_metadata.is_outlier()
+            )
+        )
+        if already_seen:
+            logger.debug("Already seen pdu %s", pdu.event_id)
+            return

         # If we are currently in the process of joining this room, then we
         # queue up events for later processing.
-        if event.room_id in self.room_queues:
-            self.room_queues[event.room_id].append((pdu, origin))
+        if pdu.room_id in self.room_queues:
+            logger.info("Ignoring PDU %s for room %s from %s for now; join "
+                        "in progress", pdu.event_id, pdu.room_id, origin)
+            self.room_queues[pdu.room_id].append((pdu, origin))
             return

-        logger.debug("Processing event: %s", event.event_id)
+        state = None
+
+        auth_chain = []
+
+        have_seen = yield self.store.have_events(
+            [ev for ev, _ in pdu.prev_events]
+        )
+
+        fetch_state = False
+
+        # Get missing pdus if necessary.
+        if not pdu.internal_metadata.is_outlier():
+            # We only backfill backwards to the min depth.
+            min_depth = yield self.get_min_depth_for_context(
+                pdu.room_id
+            )
+
+            logger.debug(
+                "on_receive_pdu min_depth for %s: %d",
+                pdu.room_id, min_depth
+            )
+
+            prevs = {e_id for e_id, _ in pdu.prev_events}
+            seen = set(have_seen.keys())
+
+            if min_depth and pdu.depth < min_depth:
+                # This is so that we don't notify the user about this
+                # message, to work around the fact that some events will
+                # reference really really old events we really don't want to
+                # send to the clients.
+                pdu.internal_metadata.outlier = True
+            elif min_depth and pdu.depth > min_depth:
+                if get_missing and prevs - seen:
+                    # If we're missing stuff, ensure we only fetch stuff one
+                    # at a time.
+                    logger.info(
+                        "Acquiring lock for room %r to fetch %d missing events: %r...",
+                        pdu.room_id, len(prevs - seen), list(prevs - seen)[:5],
+                    )
+                    with (yield self._room_pdu_linearizer.queue(pdu.room_id)):
+                        logger.info(
+                            "Acquired lock for room %r to fetch %d missing events",
+                            pdu.room_id, len(prevs - seen),
+                        )
+
+                        have_seen = yield self._get_missing_events_for_pdu(
+                            origin, pdu, prevs, min_depth
+                        )
+
+                        prevs = {e_id for e_id, _ in pdu.prev_events}
+                        seen = set(have_seen.keys())
+                        if prevs - seen:
+                            logger.info(
+                                "Still missing %d events for room %r: %r...",
+                                len(prevs - seen), pdu.room_id, list(prevs - seen)[:5]
+                            )
+                            fetch_state = True
+
+        if fetch_state:
+            # We need to get the state at this event, since we haven't
+            # processed all the prev events.
+            logger.debug(
+                "on_receive_pdu getting state for %s",
+                pdu.room_id
+            )
+            try:
+                state, auth_chain = yield self.replication_layer.get_state_for_room(
+                    origin, pdu.room_id, pdu.event_id,
+                )
+            except Exception:
+                logger.exception("Failed to get state for event: %s", pdu.event_id)
+
+        yield self._process_received_pdu(
+            origin,
+            pdu,
+            state=state,
+            auth_chain=auth_chain,
+        )
+
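
The branching above reduces to set arithmetic over prev_events: anything the
new PDU references that we have not persisted is a gap, and any gap still left
after backfilling forces a fetch of the full room state. A dependency-free
sketch of that computation (function name is illustrative):

    def missing_prevs(prev_events, have_seen):
        """Return the prev_event ids we do not yet have.

        prev_events is a list of (event_id, hashes) pairs as found on a
        PDU; have_seen is the dict returned by store.have_events.
        """
        prevs = {e_id for e_id, _ in prev_events}
        seen = set(have_seen.keys())
        return prevs - seen

If this set is still non-empty once _get_missing_events_for_pdu has run, the
handler stops trying to fill the gap event by event and instead asks the
origin server for the room state at the new event.
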
+    @defer.inlineCallbacks
+    def _get_missing_events_for_pdu(self, origin, pdu, prevs, min_depth):
+        """
+        Args:
+            origin (str): Origin of the pdu. Will be called to get the missing events
+            pdu (FrozenEvent): received pdu
+            prevs (set(str)): Set of event ids which we are missing
+            min_depth (int): Minimum depth of events to return.
+
+        Returns:
+            Deferred<dict(str, str?)>: updated have_seen dictionary
+        """
+        # We recalculate seen, since it may have changed.
+        have_seen = yield self.store.have_events(prevs)
+        seen = set(have_seen.keys())
+
+        if not prevs - seen:
+            # nothing left to do
+            defer.returnValue(have_seen)
+
+        latest = yield self.store.get_latest_event_ids_in_room(
+            pdu.room_id
+        )
+
+        # We add the prev events that we have seen to the latest
+        # list to ensure the remote server doesn't give them to us
+        latest = set(latest)
+        latest |= seen
+
+        logger.info(
+            "Missing %d events for room %r: %r...",
+            len(prevs - seen), pdu.room_id, list(prevs - seen)[:5]
+        )
+
+        # XXX: we set timeout to 10s to help work around
+        # https://github.com/matrix-org/synapse/issues/1733.
+        # The reason is to avoid holding the linearizer lock
+        # whilst processing inbound /send transactions, causing
+        # FDs to stack up and block other inbound transactions
+        # which empirically can currently take up to 30 minutes.
+        #
+        # N.B. this explicitly disables retry attempts.
+        #
+        # N.B. this also increases our chances of falling back to
+        # fetching fresh state for the room if the missing event
+        # can't be found, which slightly reduces our security.
+        # It may also increase our DAG extremity count for the room,
+        # causing additional state resolution? See #1760.
+        # However, fetching state doesn't hold the linearizer lock
+        # apparently.
+        #
+        # see https://github.com/matrix-org/synapse/pull/1744
+
+        missing_events = yield self.replication_layer.get_missing_events(
+            origin,
+            pdu.room_id,
+            earliest_events_ids=list(latest),
+            latest_events=[pdu],
+            limit=10,
+            min_depth=min_depth,
+            timeout=10000,
+        )
+
+        # We want to sort these by depth so we process them and
+        # tell clients about them in order.
+        missing_events.sort(key=lambda x: x.depth)
+
+        for e in missing_events:
+            yield self.on_receive_pdu(
+                origin,
+                e,
+                get_missing=False
+            )
+
-        logger.debug("Event: %s", event)
+        have_seen = yield self.store.have_events(
+            [ev for ev, _ in pdu.prev_events]
+        )
+        defer.returnValue(have_seen)
+
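
The get_missing_events arguments above describe a window: the floor is
everything we already have (our forward extremities plus any prev_events we
have seen), and the ceiling is the new PDU itself. A small sketch of how those
arguments fit together (function name is hypothetical; the keyword arguments
mirror the call above):

    def build_missing_events_args(latest_event_ids, seen, pdu, min_depth):
        # Advertise everything we already have, so the remote server
        # only sends events from the gap.
        earliest = set(latest_event_ids) | seen
        return dict(
            earliest_events_ids=list(earliest),
            latest_events=[pdu],
            limit=10,         # bounded fetch; bigger gaps fall back to a state fetch
            min_depth=min_depth,
            timeout=10000,    # ms; see the XXX comment above re issue #1733
        )
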
+    @log_function
+    @defer.inlineCallbacks
+    def _process_received_pdu(self, origin, pdu, state, auth_chain):
+        """ Called when we have a new pdu. We need to do auth checks and put it
+        through the StateHandler.
+        """
+        event = pdu
+
+        logger.debug("Processing event: %s", event)

         # FIXME (erikj): Awful hack to make the case where we are not currently
         # in the room work
@@ -670,8 +858,6 @@ def do_invite_join(self, target_hosts, room_id, joinee, content):
         """
         logger.debug("Joining %s to %s", joinee, room_id)

-        yield self.store.clean_room_for_join(room_id)
-
         origin, event = yield self._make_and_verify_event(
             target_hosts,
             room_id,
@@ -680,7 +866,15 @@ def do_invite_join(self, target_hosts, room_id, joinee, content):
             content,
         )

+        # This shouldn't happen, because the RoomMemberHandler has a
+        # linearizer lock which only allows one operation per user per room
+        # at a time - so this is just paranoia.
+        assert room_id not in self.room_queues
+
         self.room_queues[room_id] = []
+
+        yield self.store.clean_room_for_join(room_id)
+
         handled_events = set()

         try:
@@ -733,18 +927,37 @@ def do_invite_join(self, target_hosts, room_id, joinee, content):
         room_queue = self.room_queues[room_id]
         del self.room_queues[room_id]

-        for p, origin in room_queue:
-            if p.event_id in handled_events:
-                continue
+        # we don't need to wait for the queued events to be processed -
+        # it's just a best-effort thing at this point. We do want to do
+        # them roughly in order, though, otherwise we'll end up making
+        # lots of requests for missing prev_events which we do actually
+        # have. Hence we fire off the deferred, but don't wait for it.

-            try:
-                self.on_receive_pdu(origin, p)
-            except:
-                logger.exception("Couldn't handle pdu")
+        synapse.util.logcontext.preserve_fn(self._handle_queued_pdus)(
+            room_queue
+        )

         defer.returnValue(True)
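
preserve_fn is what makes this fire-and-forget safe: it wraps a
Deferred-returning function so that it can be started without being yielded
on, while keeping logcontexts intact. A hedged sketch of the pattern
(drain_queue and process_item are hypothetical):

    from twisted.internet import defer

    import synapse.util.logcontext

    @defer.inlineCallbacks
    def drain_queue(queue):
        for item in queue:
            yield process_item(item)  # hypothetical per-item handler

    def kick_off(queue):
        # Start draining in the background; deliberately do not wait on
        # the resulting Deferred - this is best-effort work.
        synapse.util.logcontext.preserve_fn(drain_queue)(queue)
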
     @defer.inlineCallbacks
+    def _handle_queued_pdus(self, room_queue):
+        """Process PDUs which got queued up while we were busy performing a
+        send_join.
+
+        Args:
+            room_queue (list[(FrozenEvent, str)]): list of PDUs to be
+                processed and the servers that sent them
+        """
+        for p, origin in room_queue:
+            try:
+                logger.info("Processing queued PDU %s which was received "
+                            "while we were joining %s", p.event_id, p.room_id)
+                yield self.on_receive_pdu(origin, p)
+            except Exception as e:
+                logger.warn(
+                    "Error handling queued PDU %s from %s: %s",
+                    p.event_id, origin, e)
+
+    @defer.inlineCallbacks
     @log_function
     def on_make_join_request(self, room_id, user_id):
         """ We've received a /make_join/ request, so we create a partial
@@ -1325,7 +1538,17 @@ def _persist_auth_tree(self, origin, auth_events, state, event):

     @defer.inlineCallbacks
     def _prep_event(self, origin, event, state=None, auth_events=None):
+        """
+
+        Args:
+            origin (str): server sending the event
+            event (FrozenEvent): the event to be prepared
+            state (list|None): state events for the event, if already known
+            auth_events (dict|None): auth events for the event, if already
+                known
+
+        Returns:
+            Deferred, which resolves to synapse.events.snapshot.EventContext
+        """
         context = yield self.state_handler.compute_event_context(
             event, old_state=state,
         )
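
Per the docstring just added, _prep_event resolves to an EventContext. A
hypothetical call site, in the inlineCallbacks style used throughout this
file:

    from twisted.internet import defer

    @defer.inlineCallbacks
    def _check_event(self, origin, event):
        # context is a synapse.events.snapshot.EventContext for the event
        context = yield self._prep_event(origin, event)
        defer.returnValue(context)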