Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Reject concurrent transactions (#9597)
Browse files Browse the repository at this point in the history
If more transactions arrive from an origin while we're still processing the
first one, reject them.

Hopefully a quick fix to #9489
  • Loading branch information
richvdh committed Mar 12, 2021
1 parent 2b328d7 commit 1e67bff
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 35 deletions.
1 change: 1 addition & 0 deletions changelog.d/9597.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a bug introduced in Synapse 1.20 which caused incoming federation transactions to stack up, causing slow recovery from outages.
77 changes: 42 additions & 35 deletions synapse/federation/federation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,11 @@ def __init__(self, hs: "HomeServer"):
# with FederationHandlerRegistry.
hs.get_directory_handler()

self._federation_ratelimiter = hs.get_federation_ratelimiter()

self._server_linearizer = Linearizer("fed_server")
self._transaction_linearizer = Linearizer("fed_txn_handler")

# origins that we are currently processing a transaction from.
# a dict from origin to txn id.
self._active_transactions = {} # type: Dict[str, str]

# We cache results for transaction with the same ID
self._transaction_resp_cache = ResponseCache(
Expand Down Expand Up @@ -169,6 +170,33 @@ async def on_incoming_transaction(

logger.debug("[%s] Got transaction", transaction_id)

# Reject malformed transactions early: reject if too many PDUs/EDUs
if len(transaction.pdus) > 50 or ( # type: ignore
hasattr(transaction, "edus") and len(transaction.edus) > 100 # type: ignore
):
logger.info("Transaction PDU or EDU count too large. Returning 400")
return 400, {}

# we only process one transaction from each origin at a time. We need to do
# this check here, rather than in _on_incoming_transaction_inner so that we
# don't cache the rejection in _transaction_resp_cache (so that if the txn
# arrives again later, we can process it).
current_transaction = self._active_transactions.get(origin)
if current_transaction and current_transaction != transaction_id:
logger.warning(
"Received another txn %s from %s while still processing %s",
transaction_id,
origin,
current_transaction,
)
return 429, {
"errcode": Codes.UNKNOWN,
"error": "Too many concurrent transactions",
}

# CRITICAL SECTION: we must now not await until we populate _active_transactions
# in _on_incoming_transaction_inner.

# We wrap in a ResponseCache so that we de-duplicate retried
# transactions.
return await self._transaction_resp_cache.wrap(
Expand All @@ -182,26 +210,18 @@ async def on_incoming_transaction(
async def _on_incoming_transaction_inner(
    self, origin: str, transaction: Transaction, request_time: int
) -> Tuple[int, Dict[str, Any]]:
    """Record the transaction as in-flight for `origin`, then process it.

    Args:
        origin: the server the transaction came from
        transaction: the (parsed) transaction to process
        request_time: the time the request arrived, as passed through by
            the caller

    Returns:
        A (response code, response body) pair to send back to the origin.
    """
    # CRITICAL SECTION: the first thing we must do (before awaiting) is
    # add an entry to _active_transactions, so that the concurrent-transaction
    # check in on_incoming_transaction sees it. The assertion holds because
    # on_incoming_transaction rejects a second transaction from the same
    # origin before calling into here, without awaiting in between.
    assert origin not in self._active_transactions
    self._active_transactions[origin] = transaction.transaction_id  # type: ignore

    try:
        result = await self._handle_incoming_transaction(
            origin, transaction, request_time
        )
        return result
    finally:
        # Always clear the entry — even on failure — so that the origin is
        # not permanently blocked from sending us further transactions.
        del self._active_transactions[origin]

async def _handle_incoming_transaction(
self, origin: str, transaction: Transaction, request_time: int
Expand All @@ -227,19 +247,6 @@ async def _handle_incoming_transaction(

logger.debug("[%s] Transaction is new", transaction.transaction_id) # type: ignore

# Reject if PDU count > 50 or EDU count > 100
if len(transaction.pdus) > 50 or ( # type: ignore
hasattr(transaction, "edus") and len(transaction.edus) > 100 # type: ignore
):

logger.info("Transaction PDU or EDU count too large. Returning 400")

response = {}
await self.transaction_actions.set_response(
origin, transaction, 400, response
)
return 400, response

# We process PDUs and EDUs in parallel. This is important as we don't
# want to block things like to device messages from reaching clients
# behind the potentially expensive handling of PDUs.
Expand Down

0 comments on commit 1e67bff

Please sign in to comment.