Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Rename pagination&purge locks and add comments explaining them (#16112)
Browse files Browse the repository at this point in the history
  • Loading branch information
Mathieu Velten authored and H-Shay committed Aug 16, 2023
1 parent 98ec525 commit a432ae0
Show file tree
Hide file tree
Showing 8 changed files with 29 additions and 19 deletions.
1 change: 1 addition & 0 deletions changelog.d/16112.misc
@@ -0,0 +1 @@
Rename pagination and purge locks and add comments to explain why they exist and how they work.
4 changes: 2 additions & 2 deletions synapse/federation/federation_server.py
Expand Up @@ -63,7 +63,7 @@
)
from synapse.federation.persistence import TransactionActions
from synapse.federation.units import Edu, Transaction
from synapse.handlers.worker_lock import DELETE_ROOM_LOCK_NAME
from synapse.handlers.worker_lock import NEW_EVENT_DURING_PURGE_LOCK_NAME
from synapse.http.servlet import assert_params_in_dict
from synapse.logging.context import (
make_deferred_yieldable,
Expand Down Expand Up @@ -1245,7 +1245,7 @@ async def _process_incoming_pdus_in_room_inner(
# while holding the `_INBOUND_EVENT_HANDLING_LOCK_NAME`
# lock.
async with self._worker_lock_handler.acquire_read_write_lock(
DELETE_ROOM_LOCK_NAME, room_id, write=False
NEW_EVENT_DURING_PURGE_LOCK_NAME, room_id, write=False
):
await self._federation_event_handler.on_receive_pdu(
origin, event
Expand Down
6 changes: 3 additions & 3 deletions synapse/handlers/message.py
Expand Up @@ -53,7 +53,7 @@
from synapse.events.utils import SerializeEventConfig, maybe_upsert_event_field
from synapse.events.validator import EventValidator
from synapse.handlers.directory import DirectoryHandler
from synapse.handlers.worker_lock import DELETE_ROOM_LOCK_NAME
from synapse.handlers.worker_lock import NEW_EVENT_DURING_PURGE_LOCK_NAME
from synapse.logging import opentracing
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.metrics.background_process_metrics import run_as_background_process
Expand Down Expand Up @@ -1034,7 +1034,7 @@ async def create_and_send_nonmember_event(
)

async with self._worker_lock_handler.acquire_read_write_lock(
DELETE_ROOM_LOCK_NAME, room_id, write=False
NEW_EVENT_DURING_PURGE_LOCK_NAME, room_id, write=False
):
return await self._create_and_send_nonmember_event_locked(
requester=requester,
Expand Down Expand Up @@ -1978,7 +1978,7 @@ async def _send_dummy_events_to_fill_extremities(self) -> None:

for room_id in room_ids:
async with self._worker_lock_handler.acquire_read_write_lock(
DELETE_ROOM_LOCK_NAME, room_id, write=False
NEW_EVENT_DURING_PURGE_LOCK_NAME, room_id, write=False
):
dummy_event_sent = await self._send_dummy_event_for_room(room_id)

Expand Down
19 changes: 12 additions & 7 deletions synapse/handlers/pagination.py
Expand Up @@ -24,6 +24,7 @@
from synapse.api.filtering import Filter
from synapse.events.utils import SerializeEventConfig
from synapse.handlers.room import ShutdownRoomResponse
from synapse.handlers.worker_lock import NEW_EVENT_DURING_PURGE_LOCK_NAME
from synapse.logging.opentracing import trace
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.rest.admin._base import assert_user_is_admin
Expand All @@ -46,9 +47,10 @@
BACKFILL_BECAUSE_TOO_MANY_GAPS_THRESHOLD = 3


PURGE_HISTORY_LOCK_NAME = "purge_history_lock"

DELETE_ROOM_LOCK_NAME = "delete_room_lock"
# This is used to avoid purging a room several time at the same moment,
# and also paginating during a purge. Pagination can trigger backfill,
# which would create old events locally, and would potentially clash with the room delete.
PURGE_PAGINATION_LOCK_NAME = "purge_pagination_lock"


@attr.s(slots=True, auto_attribs=True)
Expand Down Expand Up @@ -363,7 +365,7 @@ async def _purge_history(
self._purges_in_progress_by_room.add(room_id)
try:
async with self._worker_locks.acquire_read_write_lock(
PURGE_HISTORY_LOCK_NAME, room_id, write=True
PURGE_PAGINATION_LOCK_NAME, room_id, write=True
):
await self._storage_controllers.purge_events.purge_history(
room_id, token, delete_local_events
Expand Down Expand Up @@ -421,7 +423,10 @@ async def purge_room(self, room_id: str, force: bool = False) -> None:
force: set true to skip checking for joined users.
"""
async with self._worker_locks.acquire_multi_read_write_lock(
[(PURGE_HISTORY_LOCK_NAME, room_id), (DELETE_ROOM_LOCK_NAME, room_id)],
[
(PURGE_PAGINATION_LOCK_NAME, room_id),
(NEW_EVENT_DURING_PURGE_LOCK_NAME, room_id),
],
write=True,
):
# first check that we have no users in this room
Expand Down Expand Up @@ -483,7 +488,7 @@ async def get_messages(
room_token = from_token.room_key

async with self._worker_locks.acquire_read_write_lock(
PURGE_HISTORY_LOCK_NAME, room_id, write=False
PURGE_PAGINATION_LOCK_NAME, room_id, write=False
):
(membership, member_event_id) = (None, None)
if not use_admin_priviledge:
Expand Down Expand Up @@ -761,7 +766,7 @@ async def _shutdown_and_purge_room(
self._purges_in_progress_by_room.add(room_id)
try:
async with self._worker_locks.acquire_read_write_lock(
PURGE_HISTORY_LOCK_NAME, room_id, write=True
PURGE_PAGINATION_LOCK_NAME, room_id, write=True
):
self._delete_by_id[delete_id].status = DeleteStatus.STATUS_SHUTTING_DOWN
self._delete_by_id[
Expand Down
4 changes: 2 additions & 2 deletions synapse/handlers/room_member.py
Expand Up @@ -39,7 +39,7 @@
from synapse.events.snapshot import EventContext
from synapse.handlers.profile import MAX_AVATAR_URL_LEN, MAX_DISPLAYNAME_LEN
from synapse.handlers.state_deltas import MatchChange, StateDeltasHandler
from synapse.handlers.worker_lock import DELETE_ROOM_LOCK_NAME
from synapse.handlers.worker_lock import NEW_EVENT_DURING_PURGE_LOCK_NAME
from synapse.logging import opentracing
from synapse.metrics import event_processing_positions
from synapse.metrics.background_process_metrics import run_as_background_process
Expand Down Expand Up @@ -621,7 +621,7 @@ async def update_membership(
async with self.member_as_limiter.queue(as_id):
async with self.member_linearizer.queue(key):
async with self._worker_lock_handler.acquire_read_write_lock(
DELETE_ROOM_LOCK_NAME, room_id, write=False
NEW_EVENT_DURING_PURGE_LOCK_NAME, room_id, write=False
):
with opentracing.start_active_span("update_membership_locked"):
result = await self.update_membership_locked(
Expand Down
6 changes: 5 additions & 1 deletion synapse/handlers/worker_lock.py
Expand Up @@ -42,7 +42,11 @@
from synapse.server import HomeServer


DELETE_ROOM_LOCK_NAME = "delete_room_lock"
# This lock is used to avoid creating an event while we are purging the room.
# We take a read lock when creating an event, and a write one when purging a room.
# This is because it is fine to create several events concurrently, since referenced events
# will not disappear under our feet as long as we don't delete the room.
NEW_EVENT_DURING_PURGE_LOCK_NAME = "new_event_during_purge_lock"


class WorkerLocksHandler:
Expand Down
4 changes: 2 additions & 2 deletions synapse/rest/client/room_upgrade_rest_servlet.py
Expand Up @@ -17,7 +17,7 @@

from synapse.api.errors import Codes, ShadowBanError, SynapseError
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.handlers.worker_lock import DELETE_ROOM_LOCK_NAME
from synapse.handlers.worker_lock import NEW_EVENT_DURING_PURGE_LOCK_NAME
from synapse.http.server import HttpServer
from synapse.http.servlet import (
RestServlet,
Expand Down Expand Up @@ -81,7 +81,7 @@ async def on_POST(

try:
async with self._worker_lock_handler.acquire_read_write_lock(
DELETE_ROOM_LOCK_NAME, room_id, write=False
NEW_EVENT_DURING_PURGE_LOCK_NAME, room_id, write=False
):
new_room_id = await self._room_creation_handler.upgrade_room(
requester, room_id, new_version
Expand Down
4 changes: 2 additions & 2 deletions synapse/storage/controllers/persist_events.py
Expand Up @@ -45,7 +45,7 @@
from synapse.api.constants import EventTypes, Membership
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
from synapse.handlers.worker_lock import DELETE_ROOM_LOCK_NAME
from synapse.handlers.worker_lock import NEW_EVENT_DURING_PURGE_LOCK_NAME
from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
from synapse.logging.opentracing import (
SynapseTags,
Expand Down Expand Up @@ -357,7 +357,7 @@ async def _process_event_persist_queue_task(
# it. We might already have taken out the lock, but since this is just a
# "read" lock its inherently reentrant.
async with self.hs.get_worker_locks_handler().acquire_read_write_lock(
DELETE_ROOM_LOCK_NAME, room_id, write=False
NEW_EVENT_DURING_PURGE_LOCK_NAME, room_id, write=False
):
if isinstance(task, _PersistEventsTask):
return await self._persist_event_batch(room_id, task)
Expand Down

0 comments on commit a432ae0

Please sign in to comment.