Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Ensure (room_id, next_batch_id) is unique to avoid cross-talk/confl…
Browse files Browse the repository at this point in the history
…icts between batches (MSC2716) (#10877)

Part of [MSC2716](matrix-org/matrix-spec-proposals#2716)

Part of #10737
  • Loading branch information
MadLittleMods committed Sep 29, 2021
1 parent 0f007fe commit 9fd057b
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 4 deletions.
1 change: 1 addition & 0 deletions changelog.d/10877.feature
@@ -0,0 +1 @@
Ensure `(room_id, next_batch_id)` is unique across [MSC2716](https://github.com/matrix-org/matrix-doc/pull/2716) insertion events in rooms to avoid cross-talk/conflicts between batches.
34 changes: 34 additions & 0 deletions synapse/handlers/message.py
Expand Up @@ -16,6 +16,7 @@
# limitations under the License.
import logging
import random
from http import HTTPStatus
from typing import TYPE_CHECKING, Any, Dict, List, Mapping, Optional, Tuple

from canonicaljson import encode_canonical_json
Expand Down Expand Up @@ -1461,6 +1462,39 @@ async def persist_and_notify_client_event(
if prev_state_ids:
raise AuthError(403, "Changing the room create event is forbidden")

if event.type == EventTypes.MSC2716_INSERTION:
room_version = await self.store.get_room_version_id(event.room_id)
room_version_obj = KNOWN_ROOM_VERSIONS[room_version]

create_event = await self.store.get_create_event_for_room(event.room_id)
room_creator = create_event.content.get(EventContentFields.ROOM_CREATOR)

# Only check an insertion event if the room version
# supports it or the event is from the room creator.
if room_version_obj.msc2716_historical or (
self.config.experimental.msc2716_enabled
and event.sender == room_creator
):
next_batch_id = event.content.get(
EventContentFields.MSC2716_NEXT_BATCH_ID
)
conflicting_insertion_event_id = (
await self.store.get_insertion_event_by_batch_id(
event.room_id, next_batch_id
)
)
if conflicting_insertion_event_id is not None:
# The current insertion event that we're processing is invalid
# because an insertion event already exists in the room with the
# same next_batch_id. We can't allow multiple because the batch
# pointing will get weird, e.g. we can't determine which insertion
# event the batch event is pointing to.
raise SynapseError(
HTTPStatus.BAD_REQUEST,
"Another insertion event already exists with the same next_batch_id",
errcode=Codes.INVALID_PARAM,
)

# Mark any `m.historical` messages as backfilled so they don't appear
# in `/sync` and have the proper decrementing `stream_ordering` as we import
backfilled = False
Expand Down
6 changes: 4 additions & 2 deletions synapse/rest/client/room_batch.py
Expand Up @@ -306,11 +306,13 @@ async def on_POST(
# Verify the batch_id_from_query corresponds to an actual insertion event
# and have the batch connected.
corresponding_insertion_event_id = (
await self.store.get_insertion_event_by_batch_id(batch_id_from_query)
await self.store.get_insertion_event_by_batch_id(
room_id, batch_id_from_query
)
)
if corresponding_insertion_event_id is None:
raise SynapseError(
400,
HTTPStatus.BAD_REQUEST,
"No insertion event corresponds to the given ?batch_id",
errcode=Codes.INVALID_PARAM,
)
Expand Down
6 changes: 4 additions & 2 deletions synapse/storage/databases/main/room_batch.py
Expand Up @@ -18,7 +18,9 @@


class RoomBatchStore(SQLBaseStore):
async def get_insertion_event_by_batch_id(self, batch_id: str) -> Optional[str]:
async def get_insertion_event_by_batch_id(
self, room_id: str, batch_id: str
) -> Optional[str]:
"""Retrieve a insertion event ID.
Args:
Expand All @@ -30,7 +32,7 @@ async def get_insertion_event_by_batch_id(self, batch_id: str) -> Optional[str]:
"""
return await self.db_pool.simple_select_one_onecol(
table="insertion_events",
keyvalues={"next_batch_id": batch_id},
keyvalues={"room_id": room_id, "next_batch_id": batch_id},
retcol="event_id",
allow_none=True,
)

0 comments on commit 9fd057b

Please sign in to comment.