Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Convert state delta processing from a dict to attrs. #16469

Merged
merged 6 commits into from Oct 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/16469.misc
@@ -0,0 +1 @@
Improve type hints.
32 changes: 16 additions & 16 deletions synapse/handlers/presence.py
Expand Up @@ -110,6 +110,7 @@
from synapse.replication.tcp.commands import ClearUserSyncsCommand
from synapse.replication.tcp.streams import PresenceFederationStream, PresenceStream
from synapse.storage.databases.main import DataStore
from synapse.storage.databases.main.state_deltas import StateDelta
from synapse.streams import EventSource
from synapse.types import (
JsonDict,
Expand Down Expand Up @@ -1499,9 +1500,9 @@ async def _unsafe_process(self) -> None:
# We may get multiple deltas for different rooms, but we want to
# handle them on a room by room basis, so we batch them up by
# room.
deltas_by_room: Dict[str, List[JsonDict]] = {}
deltas_by_room: Dict[str, List[StateDelta]] = {}
for delta in deltas:
deltas_by_room.setdefault(delta["room_id"], []).append(delta)
deltas_by_room.setdefault(delta.room_id, []).append(delta)

for room_id, deltas_for_room in deltas_by_room.items():
await self._handle_state_delta(room_id, deltas_for_room)
Expand All @@ -1513,7 +1514,7 @@ async def _unsafe_process(self) -> None:
max_pos
)

async def _handle_state_delta(self, room_id: str, deltas: List[JsonDict]) -> None:
async def _handle_state_delta(self, room_id: str, deltas: List[StateDelta]) -> None:
"""Process current state deltas for the room to find new joins that need
to be handled.
"""
Expand All @@ -1524,39 +1525,38 @@ async def _handle_state_delta(self, room_id: str, deltas: List[JsonDict]) -> Non
newly_joined_users = set()

for delta in deltas:
assert room_id == delta["room_id"]
assert room_id == delta.room_id

typ = delta["type"]
state_key = delta["state_key"]
event_id = delta["event_id"]
prev_event_id = delta["prev_event_id"]

logger.debug("Handling: %r %r, %s", typ, state_key, event_id)
logger.debug(
"Handling: %r %r, %s", delta.event_type, delta.state_key, delta.event_id
)

# Drop any event that isn't a membership join
if typ != EventTypes.Member:
if delta.event_type != EventTypes.Member:
continue

if event_id is None:
if delta.event_id is None:
# state has been deleted, so this is not a join. We only care about
# joins.
continue

event = await self.store.get_event(event_id, allow_none=True)
event = await self.store.get_event(delta.event_id, allow_none=True)
if not event or event.content.get("membership") != Membership.JOIN:
# We only care about joins
continue

if prev_event_id:
prev_event = await self.store.get_event(prev_event_id, allow_none=True)
if delta.prev_event_id:
prev_event = await self.store.get_event(
delta.prev_event_id, allow_none=True
)
if (
prev_event
and prev_event.content.get("membership") == Membership.JOIN
):
# Ignore changes to join events.
continue

newly_joined_users.add(state_key)
newly_joined_users.add(delta.state_key)

if not newly_joined_users:
# If nobody has joined then there's nothing to do.
Expand Down
21 changes: 8 additions & 13 deletions synapse/handlers/room_member.py
Expand Up @@ -16,7 +16,7 @@
import logging
import random
from http import HTTPStatus
from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Set, Tuple
from typing import TYPE_CHECKING, Iterable, List, Optional, Set, Tuple

from synapse import types
from synapse.api.constants import (
Expand Down Expand Up @@ -44,6 +44,7 @@
from synapse.logging import opentracing
from synapse.metrics import event_processing_positions
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.databases.main.state_deltas import StateDelta
from synapse.types import (
JsonDict,
Requester,
Expand Down Expand Up @@ -2146,24 +2147,18 @@ async def _unsafe_process(self) -> None:

await self._store.update_room_forgetter_stream_pos(max_pos)

async def _handle_deltas(self, deltas: List[Dict[str, Any]]) -> None:
async def _handle_deltas(self, deltas: List[StateDelta]) -> None:
"""Called with the state deltas to process"""
for delta in deltas:
typ = delta["type"]
state_key = delta["state_key"]
room_id = delta["room_id"]
event_id = delta["event_id"]
prev_event_id = delta["prev_event_id"]

if typ != EventTypes.Member:
if delta.event_type != EventTypes.Member:
continue

if not self._hs.is_mine_id(state_key):
if not self._hs.is_mine_id(delta.state_key):
continue

change = await self._get_key_change(
prev_event_id,
event_id,
delta.prev_event_id,
delta.event_id,
key_name="membership",
public_value=Membership.JOIN,
)
Expand All @@ -2172,7 +2167,7 @@ async def _handle_deltas(self, deltas: List[Dict[str, Any]]) -> None:
if is_leave:
try:
await self._room_member_handler.forget(
UserID.from_string(state_key), room_id
UserID.from_string(delta.state_key), delta.room_id
)
except SynapseError as e:
if e.code == 400:
Expand Down
64 changes: 32 additions & 32 deletions synapse/handlers/stats.py
Expand Up @@ -27,6 +27,7 @@
from synapse.api.constants import EventContentFields, EventTypes, Membership
from synapse.metrics import event_processing_positions
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.databases.main.state_deltas import StateDelta
from synapse.types import JsonDict

if TYPE_CHECKING:
Expand Down Expand Up @@ -142,7 +143,7 @@ async def _unsafe_process(self) -> None:
self.pos = max_pos

async def _handle_deltas(
self, deltas: Iterable[JsonDict]
self, deltas: Iterable[StateDelta]
) -> Tuple[Dict[str, CounterType[str]], Dict[str, CounterType[str]]]:
"""Called with the state deltas to process

Expand All @@ -157,51 +158,50 @@ async def _handle_deltas(
room_to_state_updates: Dict[str, Dict[str, Any]] = {}

for delta in deltas:
typ = delta["type"]
state_key = delta["state_key"]
room_id = delta["room_id"]
event_id = delta["event_id"]
stream_id = delta["stream_id"]
prev_event_id = delta["prev_event_id"]

logger.debug("Handling: %r, %r %r, %s", room_id, typ, state_key, event_id)
logger.debug(
"Handling: %r, %r %r, %s",
delta.room_id,
delta.event_type,
delta.state_key,
delta.event_id,
)

token = await self.store.get_earliest_token_for_stats("room", room_id)
token = await self.store.get_earliest_token_for_stats("room", delta.room_id)

# If the earliest token to begin from is larger than our current
# stream ID, skip processing this delta.
if token is not None and token >= stream_id:
if token is not None and token >= delta.stream_id:
logger.debug(
"Ignoring: %s as earlier than this room's initial ingestion event",
event_id,
delta.event_id,
)
continue

if event_id is None and prev_event_id is None:
if delta.event_id is None and delta.prev_event_id is None:
logger.error(
"event ID is None and so is the previous event ID. stream_id: %s",
stream_id,
delta.stream_id,
)
continue

event_content: JsonDict = {}

if event_id is not None:
event = await self.store.get_event(event_id, allow_none=True)
if delta.event_id is not None:
event = await self.store.get_event(delta.event_id, allow_none=True)
if event:
event_content = event.content or {}

# All the values in this dict are deltas (RELATIVE changes)
room_stats_delta = room_to_stats_deltas.setdefault(room_id, Counter())
room_stats_delta = room_to_stats_deltas.setdefault(delta.room_id, Counter())

room_state = room_to_state_updates.setdefault(room_id, {})
room_state = room_to_state_updates.setdefault(delta.room_id, {})

if prev_event_id is None:
if delta.prev_event_id is None:
# this state event doesn't overwrite another,
# so it is a new effective/current state event
room_stats_delta["current_state_events"] += 1

if typ == EventTypes.Member:
if delta.event_type == EventTypes.Member:
# we could use StateDeltasHandler._get_key_change here but it's
# a bit inefficient given we're not testing for a specific
# result; might as well just grab the prev_membership and
Expand All @@ -210,9 +210,9 @@ async def _handle_deltas(
# in the absence of a previous event because we do not want to
# reduce the leave count when a new-to-the-room user joins.
prev_membership = None
if prev_event_id is not None:
if delta.prev_event_id is not None:
prev_event = await self.store.get_event(
prev_event_id, allow_none=True
delta.prev_event_id, allow_none=True
)
if prev_event:
prev_event_content = prev_event.content
Expand Down Expand Up @@ -256,7 +256,7 @@ async def _handle_deltas(
else:
raise ValueError("%r is not a valid membership" % (membership,))

user_id = state_key
user_id = delta.state_key
if self.is_mine_id(user_id):
# this accounts for transitions like leave → ban and so on.
has_changed_joinedness = (prev_membership == Membership.JOIN) != (
Expand All @@ -272,30 +272,30 @@ async def _handle_deltas(

room_stats_delta["local_users_in_room"] += membership_delta

elif typ == EventTypes.Create:
elif delta.event_type == EventTypes.Create:
room_state["is_federatable"] = (
event_content.get(EventContentFields.FEDERATE, True) is True
)
room_type = event_content.get(EventContentFields.ROOM_TYPE)
if isinstance(room_type, str):
room_state["room_type"] = room_type
elif typ == EventTypes.JoinRules:
elif delta.event_type == EventTypes.JoinRules:
room_state["join_rules"] = event_content.get("join_rule")
elif typ == EventTypes.RoomHistoryVisibility:
elif delta.event_type == EventTypes.RoomHistoryVisibility:
room_state["history_visibility"] = event_content.get(
"history_visibility"
)
elif typ == EventTypes.RoomEncryption:
elif delta.event_type == EventTypes.RoomEncryption:
room_state["encryption"] = event_content.get("algorithm")
elif typ == EventTypes.Name:
elif delta.event_type == EventTypes.Name:
room_state["name"] = event_content.get("name")
elif typ == EventTypes.Topic:
elif delta.event_type == EventTypes.Topic:
room_state["topic"] = event_content.get("topic")
elif typ == EventTypes.RoomAvatar:
elif delta.event_type == EventTypes.RoomAvatar:
room_state["avatar"] = event_content.get("url")
elif typ == EventTypes.CanonicalAlias:
elif delta.event_type == EventTypes.CanonicalAlias:
room_state["canonical_alias"] = event_content.get("alias")
elif typ == EventTypes.GuestAccess:
elif delta.event_type == EventTypes.GuestAccess:
room_state["guest_access"] = event_content.get(
EventContentFields.GUEST_ACCESS
)
Expand Down
34 changes: 17 additions & 17 deletions synapse/handlers/user_directory.py
Expand Up @@ -14,7 +14,7 @@

import logging
from http import HTTPStatus
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Tuple
from typing import TYPE_CHECKING, List, Optional, Set, Tuple

from twisted.internet.interfaces import IDelayedCall

Expand All @@ -23,6 +23,7 @@
from synapse.api.errors import Codes, SynapseError
from synapse.handlers.state_deltas import MatchChange, StateDeltasHandler
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.databases.main.state_deltas import StateDelta
from synapse.storage.databases.main.user_directory import SearchResult
from synapse.storage.roommember import ProfileInfo
from synapse.types import UserID
Expand Down Expand Up @@ -247,32 +248,31 @@ async def _unsafe_process(self) -> None:

await self.store.update_user_directory_stream_pos(max_pos)

async def _handle_deltas(self, deltas: List[Dict[str, Any]]) -> None:
async def _handle_deltas(self, deltas: List[StateDelta]) -> None:
"""Called with the state deltas to process"""
for delta in deltas:
typ = delta["type"]
state_key = delta["state_key"]
room_id = delta["room_id"]
event_id: Optional[str] = delta["event_id"]
prev_event_id: Optional[str] = delta["prev_event_id"]

logger.debug("Handling: %r %r, %s", typ, state_key, event_id)
logger.debug(
"Handling: %r %r, %s", delta.event_type, delta.state_key, delta.event_id
)

# For join rule and visibility changes we need to check if the room
# may have become public or not and add/remove the users in said room
if typ in (EventTypes.RoomHistoryVisibility, EventTypes.JoinRules):
if delta.event_type in (
EventTypes.RoomHistoryVisibility,
EventTypes.JoinRules,
):
await self._handle_room_publicity_change(
room_id, prev_event_id, event_id, typ
delta.room_id, delta.prev_event_id, delta.event_id, delta.event_type
)
elif typ == EventTypes.Member:
elif delta.event_type == EventTypes.Member:
await self._handle_room_membership_event(
room_id,
prev_event_id,
event_id,
state_key,
delta.room_id,
delta.prev_event_id,
delta.event_id,
delta.state_key,
)
else:
logger.debug("Ignoring irrelevant type: %r", typ)
logger.debug("Ignoring irrelevant type: %r", delta.event_type)

async def _handle_room_publicity_change(
self,
Expand Down
14 changes: 2 additions & 12 deletions synapse/storage/controllers/state.py
Expand Up @@ -16,7 +16,6 @@
from typing import (
TYPE_CHECKING,
AbstractSet,
Any,
Callable,
Collection,
Dict,
Expand All @@ -32,6 +31,7 @@
from synapse.api.constants import EventTypes, Membership
from synapse.events import EventBase
from synapse.logging.opentracing import tag_args, trace
from synapse.storage.databases.main.state_deltas import StateDelta
from synapse.storage.roommember import ProfileInfo
from synapse.storage.util.partial_state_events_tracker import (
PartialCurrentStateTracker,
Expand Down Expand Up @@ -531,19 +531,9 @@ async def get_server_acl_for_room(
@tag_args
async def get_current_state_deltas(
self, prev_stream_id: int, max_stream_id: int
) -> Tuple[int, List[Dict[str, Any]]]:
) -> Tuple[int, List[StateDelta]]:
"""Fetch a list of room state changes since the given stream id

Each entry in the result contains the following fields:
- stream_id (int)
- room_id (str)
- type (str): event type
- state_key (str):
- event_id (str|None): new event_id for this state key. None if the
state has been deleted.
- prev_event_id (str|None): previous event_id for this state key. None
if it's new state.

Args:
prev_stream_id: point to get changes since (exclusive)
max_stream_id: the point that we know has been correctly persisted
Expand Down