Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Only lock when we're backfilling #16159

Merged
merged 3 commits into from Aug 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/16159.misc
@@ -0,0 +1 @@
Reduce scope of locks when paginating to alleviate DB contention.
35 changes: 25 additions & 10 deletions synapse/handlers/federation.py
Expand Up @@ -60,6 +60,7 @@
from synapse.events.snapshot import EventContext, UnpersistedEventContextBase
from synapse.events.validator import EventValidator
from synapse.federation.federation_client import InvalidResponseError
from synapse.handlers.pagination import PURGE_PAGINATION_LOCK_NAME
from synapse.http.servlet import assert_params_in_dict
from synapse.logging.context import nested_logging_context
from synapse.logging.opentracing import SynapseTags, set_tag, tag_args, trace
Expand Down Expand Up @@ -152,6 +153,7 @@ def __init__(self, hs: "HomeServer"):
self._device_handler = hs.get_device_handler()
self._bulk_push_rule_evaluator = hs.get_bulk_push_rule_evaluator()
self._notifier = hs.get_notifier()
self._worker_locks = hs.get_worker_locks_handler()

self._clean_room_for_join_client = ReplicationCleanRoomRestServlet.make_client(
hs
Expand Down Expand Up @@ -200,7 +202,7 @@ def __init__(self, hs: "HomeServer"):
@trace
@tag_args
async def maybe_backfill(
self, room_id: str, current_depth: int, limit: int
self, room_id: str, current_depth: int, limit: int, record_time: bool = True
) -> bool:
"""Checks the database to see if we should backfill before paginating,
and if so do.
Expand All @@ -213,21 +215,25 @@ async def maybe_backfill(
limit: The number of events that the pagination request will
return. This is used as part of the heuristic to decide if we
should back paginate.
record_time: Whether to record the time it takes to backfill.

Returns:
True if we actually tried to backfill something, otherwise False.
"""
# Starting the processing time here so we can include the room backfill
# linearizer lock queue in the timing
processing_start_time = self.clock.time_msec()
processing_start_time = self.clock.time_msec() if record_time else 0

async with self._room_backfill.queue(room_id):
return await self._maybe_backfill_inner(
room_id,
current_depth,
limit,
processing_start_time=processing_start_time,
)
async with self._worker_locks.acquire_read_write_lock(
PURGE_PAGINATION_LOCK_NAME, room_id, write=False
):
return await self._maybe_backfill_inner(
room_id,
current_depth,
limit,
processing_start_time=processing_start_time,
)

@trace
@tag_args
Expand Down Expand Up @@ -305,12 +311,21 @@ async def _maybe_backfill_inner(
# of history that extends all the way back to where we are currently paginating
# and it's within the 100 events that are returned from `/backfill`.
if not sorted_backfill_points and current_depth != MAX_DEPTH:
# Check that we actually have later backfill points, if not just return.
have_later_backfill_points = await self.store.get_backfill_points_in_room(
room_id=room_id,
current_depth=MAX_DEPTH,
limit=1,
)
if not have_later_backfill_points:
return False

logger.debug(
"_maybe_backfill_inner: all backfill points are *after* current depth. Trying again with later backfill points."
)
run_as_background_process(
"_maybe_backfill_inner_anyway_with_max_depth",
self._maybe_backfill_inner,
self.maybe_backfill,
room_id=room_id,
# We use `MAX_DEPTH` so that we find all backfill points next
# time (all events are below the `MAX_DEPTH`)
Expand All @@ -319,7 +334,7 @@ async def _maybe_backfill_inner(
# We don't want to start another timing observation from this
# nested recursive call. The top-most call can record the time
# overall otherwise the smaller one will throw off the results.
processing_start_time=None,
record_time=False,
)
# We return `False` because we're backfilling in the background and there is
# no new events immediately for the caller to know about yet.
Expand Down
267 changes: 131 additions & 136 deletions synapse/handlers/pagination.py
Expand Up @@ -487,155 +487,150 @@ async def get_messages(

room_token = from_token.room_key

async with self._worker_locks.acquire_read_write_lock(
PURGE_PAGINATION_LOCK_NAME, room_id, write=False
):
(membership, member_event_id) = (None, None)
if not use_admin_priviledge:
(
membership,
member_event_id,
) = await self.auth.check_user_in_room_or_world_readable(
room_id, requester, allow_departed_users=True
(membership, member_event_id) = (None, None)
if not use_admin_priviledge:
(
membership,
member_event_id,
) = await self.auth.check_user_in_room_or_world_readable(
room_id, requester, allow_departed_users=True
)

if pagin_config.direction == Direction.BACKWARDS:
# if we're going backwards, we might need to backfill. This
# requires that we have a topo token.
if room_token.topological:
curr_topo = room_token.topological
else:
curr_topo = await self.store.get_current_topological_token(
room_id, room_token.stream
)

if pagin_config.direction == Direction.BACKWARDS:
# if we're going backwards, we might need to backfill. This
# requires that we have a topo token.
if room_token.topological:
curr_topo = room_token.topological
else:
curr_topo = await self.store.get_current_topological_token(
room_id, room_token.stream
)
# If they have left the room then clamp the token to be before
# they left the room, to save the effort of loading from the
# database.
if (
pagin_config.direction == Direction.BACKWARDS
and not use_admin_priviledge
and membership == Membership.LEAVE
):
# This is only None if the room is world_readable, in which case
# "Membership.JOIN" would have been returned and we should never hit
# this branch.
assert member_event_id

# If they have left the room then clamp the token to be before
# they left the room, to save the effort of loading from the
# database.
if (
pagin_config.direction == Direction.BACKWARDS
and not use_admin_priviledge
and membership == Membership.LEAVE
):
# This is only None if the room is world_readable, in which case
# "Membership.JOIN" would have been returned and we should never hit
# this branch.
assert member_event_id
leave_token = await self.store.get_topological_token_for_event(
member_event_id
)
assert leave_token.topological is not None

leave_token = await self.store.get_topological_token_for_event(
member_event_id
if leave_token.topological < curr_topo:
from_token = from_token.copy_and_replace(
StreamKeyType.ROOM, leave_token
)
assert leave_token.topological is not None

if leave_token.topological < curr_topo:
from_token = from_token.copy_and_replace(
StreamKeyType.ROOM, leave_token
)
to_room_key = None
if pagin_config.to_token:
to_room_key = pagin_config.to_token.room_key

# Initially fetch the events from the database. With any luck, we can return
# these without blocking on backfill (handled below).
events, next_key = await self.store.paginate_room_events(
room_id=room_id,
from_key=from_token.room_key,
to_key=to_room_key,
direction=pagin_config.direction,
limit=pagin_config.limit,
event_filter=event_filter,
)

to_room_key = None
if pagin_config.to_token:
to_room_key = pagin_config.to_token.room_key

# Initially fetch the events from the database. With any luck, we can return
# these without blocking on backfill (handled below).
events, next_key = await self.store.paginate_room_events(
room_id=room_id,
from_key=from_token.room_key,
to_key=to_room_key,
direction=pagin_config.direction,
limit=pagin_config.limit,
event_filter=event_filter,
if pagin_config.direction == Direction.BACKWARDS:
# We use a `Set` because there can be multiple events at a given depth
# and we only care about looking at the unique continum of depths to
# find gaps.
event_depths: Set[int] = {event.depth for event in events}
sorted_event_depths = sorted(event_depths)

# Inspect the depths of the returned events to see if there are any gaps
found_big_gap = False
number_of_gaps = 0
previous_event_depth = (
sorted_event_depths[0] if len(sorted_event_depths) > 0 else 0
)

if pagin_config.direction == Direction.BACKWARDS:
# We use a `Set` because there can be multiple events at a given depth
# and we only care about looking at the unique continum of depths to
# find gaps.
event_depths: Set[int] = {event.depth for event in events}
sorted_event_depths = sorted(event_depths)

# Inspect the depths of the returned events to see if there are any gaps
found_big_gap = False
number_of_gaps = 0
previous_event_depth = (
sorted_event_depths[0] if len(sorted_event_depths) > 0 else 0
)
for event_depth in sorted_event_depths:
# We don't expect a negative depth but we'll just deal with it in
# any case by taking the absolute value to get the true gap between
# any two integers.
depth_gap = abs(event_depth - previous_event_depth)
# A `depth_gap` of 1 is a normal continuous chain to the next event
# (1 <-- 2 <-- 3) so anything larger indicates a missing event (it's
# also possible there is no event at a given depth but we can't ever
# know that for sure)
if depth_gap > 1:
number_of_gaps += 1

# We only tolerate a small number single-event long gaps in the
# returned events because those are most likely just events we've
# failed to pull in the past. Anything longer than that is probably
# a sign that we're missing a decent chunk of history and we should
# try to backfill it.
#
# XXX: It's possible we could tolerate longer gaps if we checked
# that a given events `prev_events` is one that has failed pull
# attempts and we could just treat it like a dead branch of history
# for now or at least something that we don't need the block the
# client on to try pulling.
#
# XXX: If we had something like MSC3871 to indicate gaps in the
# timeline to the client, we could also get away with any sized gap
# and just have the client refetch the holes as they see fit.
if depth_gap > 2:
found_big_gap = True
break
previous_event_depth = event_depth

# Backfill in the foreground if we found a big gap, have too many holes,
# or we don't have enough events to fill the limit that the client asked
# for.
missing_too_many_events = (
number_of_gaps > BACKFILL_BECAUSE_TOO_MANY_GAPS_THRESHOLD
for event_depth in sorted_event_depths:
# We don't expect a negative depth but we'll just deal with it in
# any case by taking the absolute value to get the true gap between
# any two integers.
depth_gap = abs(event_depth - previous_event_depth)
# A `depth_gap` of 1 is a normal continuous chain to the next event
# (1 <-- 2 <-- 3) so anything larger indicates a missing event (it's
# also possible there is no event at a given depth but we can't ever
# know that for sure)
if depth_gap > 1:
number_of_gaps += 1

# We only tolerate a small number single-event long gaps in the
# returned events because those are most likely just events we've
# failed to pull in the past. Anything longer than that is probably
# a sign that we're missing a decent chunk of history and we should
# try to backfill it.
#
# XXX: It's possible we could tolerate longer gaps if we checked
# that a given events `prev_events` is one that has failed pull
# attempts and we could just treat it like a dead branch of history
# for now or at least something that we don't need the block the
# client on to try pulling.
#
# XXX: If we had something like MSC3871 to indicate gaps in the
# timeline to the client, we could also get away with any sized gap
# and just have the client refetch the holes as they see fit.
if depth_gap > 2:
found_big_gap = True
break
previous_event_depth = event_depth

# Backfill in the foreground if we found a big gap, have too many holes,
# or we don't have enough events to fill the limit that the client asked
# for.
missing_too_many_events = (
number_of_gaps > BACKFILL_BECAUSE_TOO_MANY_GAPS_THRESHOLD
)
not_enough_events_to_fill_response = len(events) < pagin_config.limit
if (
found_big_gap
or missing_too_many_events
or not_enough_events_to_fill_response
):
did_backfill = await self.hs.get_federation_handler().maybe_backfill(
room_id,
curr_topo,
limit=pagin_config.limit,
)
not_enough_events_to_fill_response = len(events) < pagin_config.limit
if (
found_big_gap
or missing_too_many_events
or not_enough_events_to_fill_response
):
did_backfill = (
await self.hs.get_federation_handler().maybe_backfill(
room_id,
curr_topo,
limit=pagin_config.limit,
)
)

# If we did backfill something, refetch the events from the database to
# catch anything new that might have been added since we last fetched.
if did_backfill:
events, next_key = await self.store.paginate_room_events(
room_id=room_id,
from_key=from_token.room_key,
to_key=to_room_key,
direction=pagin_config.direction,
limit=pagin_config.limit,
event_filter=event_filter,
)
else:
# Otherwise, we can backfill in the background for eventual
# consistency's sake but we don't need to block the client waiting
# for a costly federation call and processing.
run_as_background_process(
"maybe_backfill_in_the_background",
self.hs.get_federation_handler().maybe_backfill,
room_id,
curr_topo,
# If we did backfill something, refetch the events from the database to
# catch anything new that might have been added since we last fetched.
if did_backfill:
events, next_key = await self.store.paginate_room_events(
room_id=room_id,
from_key=from_token.room_key,
to_key=to_room_key,
direction=pagin_config.direction,
limit=pagin_config.limit,
event_filter=event_filter,
)
else:
# Otherwise, we can backfill in the background for eventual
# consistency's sake but we don't need to block the client waiting
# for a costly federation call and processing.
run_as_background_process(
"maybe_backfill_in_the_background",
self.hs.get_federation_handler().maybe_backfill,
room_id,
curr_topo,
limit=pagin_config.limit,
)

next_token = from_token.copy_and_replace(StreamKeyType.ROOM, next_key)
next_token = from_token.copy_and_replace(StreamKeyType.ROOM, next_key)

# if no events are returned from pagination, that implies
# we have reached the end of the available events.
Expand Down