Fix spinloop during partial state sync when a prev event is in backoff (#15351)

Previously, we would spin in a tight loop until
`update_state_for_partial_state_event` stopped raising
`FederationPullAttemptBackoffError`s. Replace the spinloop with a wait
until the backoff period has expired.

Signed-off-by: Sean Quah <seanq@matrix.org>
squahtx committed Mar 30, 2023
1 parent a3bad89 commit d9f6949
Showing 6 changed files with 79 additions and 47 deletions.
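The shape of the fix, reduced to a minimal standalone sketch (`asyncio.sleep` stands in for Synapse's internal clock, and the names here are illustrative stand-ins rather than Synapse's actual API):

import asyncio
import logging

logger = logging.getLogger(__name__)


class PullAttemptBackoffError(Exception):
    """Illustrative stand-in for FederationPullAttemptBackoffError."""

    def __init__(self, retry_after_ms: int) -> None:
        super().__init__(f"backing off, retry in {retry_after_ms} ms")
        self.retry_after_ms = retry_after_ms


async def update_state_waiting_out_backoff(update_state) -> None:
    # Before the fix: retrying immediately here would spin in a tight loop
    # until the backoff period expired. After the fix: sleep out the
    # remaining backoff interval before each retry.
    while True:
        try:
            await update_state()
            return
        except PullAttemptBackoffError as e:
            logger.warning("%s; waiting for %d ms...", e, e.retry_after_ms)
            await asyncio.sleep(e.retry_after_ms / 1000)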
1 change: 1 addition & 0 deletions changelog.d/15351.bugfix
@@ -0,0 +1 @@
+Fix a bug introduced in Synapse 1.70.0 where the background sync from a faster join could spin for hours when one of the events involved had been marked for backoff.
17 changes: 13 additions & 4 deletions synapse/api/errors.py
@@ -27,7 +27,7 @@

 if typing.TYPE_CHECKING:
     from synapse.config.homeserver import HomeServerConfig
-    from synapse.types import JsonDict
+    from synapse.types import JsonDict, StrCollection

 logger = logging.getLogger(__name__)

@@ -682,18 +682,27 @@ class FederationPullAttemptBackoffError(RuntimeError):
     Attributes:
         event_id: The event_id which we are refusing to pull
         message: A custom error message that gives more context
+        retry_after_ms: The remaining backoff interval, in milliseconds
     """

-    def __init__(self, event_ids: List[str], message: Optional[str]):
-        self.event_ids = event_ids
+    def __init__(
+        self, event_ids: "StrCollection", message: Optional[str], retry_after_ms: int
+    ):
+        event_ids = list(event_ids)

         if message:
             error_message = message
         else:
-            error_message = f"Not attempting to pull event_ids={self.event_ids} because we already tried to pull them recently (backing off)."
+            error_message = (
+                f"Not attempting to pull event_ids={event_ids} because we already "
+                "tried to pull them recently (backing off)."
+            )

         super().__init__(error_message)

+        self.event_ids = event_ids
+        self.retry_after_ms = retry_after_ms


 class HttpResponseException(CodeMessageException):
     """
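For reference, a quick sketch of the reworked exception's contract, using the constructor and attributes from the diff above (the values are made up):

err = FederationPullAttemptBackoffError(
    event_ids=["$prev_event"], message=None, retry_after_ms=120_000
)
assert err.event_ids == ["$prev_event"]
assert err.retry_after_ms == 120_000
assert "backing off" in str(err)  # the default message mentions the backoff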
36 changes: 18 additions & 18 deletions synapse/handlers/federation.py
@@ -1949,27 +1949,25 @@ async def _sync_partial_state_room(
             )
             for event in events:
                 for attempt in itertools.count():
+                    # We try a new destination on every iteration.
                     try:
-                        await self._federation_event_handler.update_state_for_partial_state_event(
-                            destination, event
-                        )
+                        while True:
+                            try:
+                                await self._federation_event_handler.update_state_for_partial_state_event(
+                                    destination, event
+                                )
+                                break
+                            except FederationPullAttemptBackoffError as e:
+                                # We are in the backoff period for one of the event's
+                                # prev_events. Wait it out and try again after.
+                                logger.warning(
+                                    "%s; waiting for %d ms...", e, e.retry_after_ms
+                                )
+                                await self.clock.sleep(e.retry_after_ms / 1000)

                         # Success, no need to try the rest of the destinations.
                         break
-                    except FederationPullAttemptBackoffError as exc:
-                        # Log a warning about why we failed to process the event (the error message
-                        # for `FederationPullAttemptBackoffError` is pretty good)
-                        logger.warning("_sync_partial_state_room: %s", exc)
-                        # We do not record a failed pull attempt when we backoff fetching a missing
-                        # `prev_event` because not being able to fetch the `prev_events` just means
-                        # we won't be able to de-outlier the pulled event. But we can still use an
-                        # `outlier` in the state/auth chain for another event. So we shouldn't stop
-                        # a downstream event from trying to pull it.
-                        #
-                        # This avoids a cascade of backoff for all events in the DAG downstream from
-                        # one event backoff upstream.
                     except FederationError as e:
-                        # TODO: We should `record_event_failed_pull_attempt` here,
-                        #   see https://github.com/matrix-org/synapse/issues/13700
-
                         if attempt == len(destinations) - 1:
                             # We have tried every remote server for this event. Give up.
                             # TODO(faster_joins) giving up isn't the right thing to do
@@ -1986,6 +1984,8 @@ async def _sync_partial_state_room(
                                 destination,
                                 e,
                             )
+                            # TODO: We should `record_event_failed_pull_attempt` here,
+                            #   see https://github.com/matrix-org/synapse/issues/13700
                             raise

                 # Try the next remote server.
24 changes: 17 additions & 7 deletions synapse/handlers/federation_event.py
@@ -140,6 +140,7 @@ class FederationEventHandler:
     """

     def __init__(self, hs: "HomeServer"):
+        self._clock = hs.get_clock()
         self._store = hs.get_datastores().main
         self._storage_controllers = hs.get_storage_controllers()
         self._state_storage_controller = self._storage_controllers.state
@@ -1038,8 +1039,8 @@ async def _compute_event_context_with_maybe_missing_prevs(
         Raises:
             FederationPullAttemptBackoffError if we are deliberately not attempting
-                to pull the given event over federation because we've already done so
-                recently and are backing off.
+                to pull one of the given event's `prev_event`s over federation because
+                we've already done so recently and are backing off.
             FederationError if we fail to get the state from the remote server after any
                 missing `prev_event`s.
         """
@@ -1053,13 +1054,22 @@ async def _compute_event_context_with_maybe_missing_prevs(
         # If we've already recently attempted to pull this missing event, don't
         # try it again so soon. Since we have to fetch all of the prev_events, we can
         # bail early here if we find any to ignore.
-        prevs_to_ignore = await self._store.get_event_ids_to_not_pull_from_backoff(
-            room_id, missing_prevs
+        prevs_with_pull_backoff = (
+            await self._store.get_event_ids_to_not_pull_from_backoff(
+                room_id, missing_prevs
+            )
         )
-        if len(prevs_to_ignore) > 0:
+        if len(prevs_with_pull_backoff) > 0:
             raise FederationPullAttemptBackoffError(
-                event_ids=prevs_to_ignore,
-                message=f"While computing context for event={event_id}, not attempting to pull missing prev_event={prevs_to_ignore[0]} because we already tried to pull recently (backing off).",
+                event_ids=prevs_with_pull_backoff.keys(),
+                message=(
+                    f"While computing context for event={event_id}, not attempting to "
+                    f"pull missing prev_events={list(prevs_with_pull_backoff.keys())} "
+                    "because we already tried to pull recently (backing off)."
+                ),
+                retry_after_ms=(
+                    max(prevs_with_pull_backoff.values()) - self._clock.time_msec()
+                ),
             )

         if not missing_prevs:
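The `retry_after_ms` passed above is the worst-case wait: the latest backoff end time among the prev events being ignored, minus the current time. A small illustrative computation with made-up timestamps:

prevs_with_pull_backoff = {
    "$prev_a": 1_700_000_000_000,  # backoff ends sooner
    "$prev_b": 1_700_000_120_000,  # latest backoff end; this one wins
}
now_ms = 1_699_999_970_000  # 30 seconds before $prev_a's backoff ends

# Wait long enough for every prev event's backoff to expire.
retry_after_ms = max(prevs_with_pull_backoff.values()) - now_ms
assert retry_after_ms == 150_000  # 120s past now for $prev_b, plus 30s: 2.5 minutes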
35 changes: 21 additions & 14 deletions synapse/storage/databases/main/event_federation.py
@@ -1544,7 +1544,7 @@ async def get_event_ids_to_not_pull_from_backoff(
         self,
         room_id: str,
         event_ids: Collection[str],
-    ) -> List[str]:
+    ) -> Dict[str, int]:
         """
         Filter down the events to ones that we've failed to pull before recently. Uses
         exponential backoff.
@@ -1554,7 +1554,8 @@ async def get_event_ids_to_not_pull_from_backoff(
             event_ids: A list of events to filter down
         Returns:
-            List of event_ids that should not be attempted to be pulled
+            A dictionary of event_ids that should not be attempted to be pulled and the
+            next timestamp at which we may try pulling them again.
         """
         event_failed_pull_attempts = await self.db_pool.simple_select_many_batch(
             table="event_failed_pull_attempts",
@@ -1570,22 +1571,28 @@
         )

         current_time = self._clock.time_msec()
-        return [
-            event_failed_pull_attempt["event_id"]
-            for event_failed_pull_attempt in event_failed_pull_attempts
+
+        event_ids_with_backoff = {}
+        for event_failed_pull_attempt in event_failed_pull_attempts:
+            event_id = event_failed_pull_attempt["event_id"]
             # Exponential back-off (up to the upper bound) so we don't try to
             # pull the same event over and over. ex. 2hr, 4hr, 8hr, 16hr, etc.
-            if current_time
-            < event_failed_pull_attempt["last_attempt_ts"]
-            + (
-                2
-                ** min(
-                    event_failed_pull_attempt["num_attempts"],
-                    BACKFILL_EVENT_EXPONENTIAL_BACKOFF_MAXIMUM_DOUBLING_STEPS,
-                )
-            )
-            * BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_MILLISECONDS
-        ]
+            backoff_end_time = (
+                event_failed_pull_attempt["last_attempt_ts"]
+                + (
+                    2
+                    ** min(
+                        event_failed_pull_attempt["num_attempts"],
+                        BACKFILL_EVENT_EXPONENTIAL_BACKOFF_MAXIMUM_DOUBLING_STEPS,
+                    )
+                )
+                * BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_MILLISECONDS
+            )
+
+            if current_time < backoff_end_time:  # `backoff_end_time` is exclusive
+                event_ids_with_backoff[event_id] = backoff_end_time
+
+        return event_ids_with_backoff

     async def get_missing_events(
         self,
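As a sanity check on the backoff arithmetic, a standalone sketch. The one-hour step is confirmed by the test below (a single failure yields a 2^1 hour backoff); the cap on doubling steps is an assumed illustrative value, not copied from the source:

import datetime

BACKOFF_STEP_MS = int(datetime.timedelta(hours=1).total_seconds() * 1000)
MAXIMUM_DOUBLING_STEPS = 8  # assumption for illustration


def backoff_end_time(last_attempt_ts: int, num_attempts: int) -> int:
    """Timestamp in ms at which the event may be pulled again (exclusive)."""
    backoff_ms = (2 ** min(num_attempts, MAXIMUM_DOUBLING_STEPS)) * BACKOFF_STEP_MS
    return last_attempt_ts + backoff_ms


# One failure at t=0 backs off for 2**1 = 2 hours; a second failure doubles it.
assert backoff_end_time(0, 1) == 2 * 60 * 60 * 1000
assert backoff_end_time(0, 2) == 4 * 60 * 60 * 1000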
13 changes: 9 additions & 4 deletions tests/storage/test_event_federation.py
@@ -1143,19 +1143,24 @@ def test_get_event_ids_to_not_pull_from_backoff(self) -> None:
         tok = self.login("alice", "test")
         room_id = self.helper.create_room_as(room_creator=user_id, tok=tok)

+        failure_time = self.clock.time_msec()
         self.get_success(
             self.store.record_event_failed_pull_attempt(
                 room_id, "$failed_event_id", "fake cause"
             )
         )

-        event_ids_to_backoff = self.get_success(
+        event_ids_with_backoff = self.get_success(
             self.store.get_event_ids_to_not_pull_from_backoff(
                 room_id=room_id, event_ids=["$failed_event_id", "$normal_event_id"]
             )
         )

-        self.assertEqual(event_ids_to_backoff, ["$failed_event_id"])
+        self.assertEqual(
+            event_ids_with_backoff,
+            # We expect a 2^1 hour backoff after a single failed attempt.
+            {"$failed_event_id": failure_time + 2 * 60 * 60 * 1000},
+        )

     def test_get_event_ids_to_not_pull_from_backoff_retry_after_backoff_duration(
         self,
@@ -1179,14 +1184,14 @@ def test_get_event_ids_to_not_pull_from_backoff_retry_after_backoff_duration(
         # attempt (2^1 hours).
         self.reactor.advance(datetime.timedelta(hours=2).total_seconds())

-        event_ids_to_backoff = self.get_success(
+        event_ids_with_backoff = self.get_success(
             self.store.get_event_ids_to_not_pull_from_backoff(
                 room_id=room_id, event_ids=["$failed_event_id", "$normal_event_id"]
             )
         )
         # Since this function only returns events we should backoff from, time has
         # elapsed past the backoff range so there are no events to backoff from.
-        self.assertEqual(event_ids_to_backoff, [])
+        self.assertEqual(event_ids_with_backoff, {})


 @attr.s(auto_attribs=True)
