Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Report metrics on expensive rooms for state res #8420

Merged
merged 5 commits into from
Sep 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/8420.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add experimental reporting of metrics on expensive rooms for state-resolution.
13 changes: 8 additions & 5 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import logging
from collections.abc import Container
from http import HTTPStatus
from typing import Dict, Iterable, List, Optional, Sequence, Tuple, Union
from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Sequence, Tuple, Union

import attr
from signedjson.key import decode_verify_key_bytes
Expand Down Expand Up @@ -69,7 +69,7 @@
ReplicationFederationSendEventsRestServlet,
ReplicationStoreRoomOnInviteRestServlet,
)
from synapse.state import StateResolutionStore, resolve_events_with_store
from synapse.state import StateResolutionStore
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.types import (
JsonDict,
Expand All @@ -85,6 +85,9 @@
from synapse.util.stringutils import shortstr
from synapse.visibility import filter_events_for_server

if TYPE_CHECKING:
from synapse.server import HomeServer

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -116,7 +119,7 @@ class FederationHandler(BaseHandler):
rooms.
"""

def __init__(self, hs):
def __init__(self, hs: "HomeServer"):
super().__init__(hs)

self.hs = hs
Expand All @@ -126,6 +129,7 @@ def __init__(self, hs):
self.state_store = self.storage.state
self.federation_client = hs.get_federation_client()
self.state_handler = hs.get_state_handler()
self._state_resolution_handler = hs.get_state_resolution_handler()
self.server_name = hs.hostname
self.keyring = hs.get_keyring()
self.action_generator = hs.get_action_generator()
Expand Down Expand Up @@ -381,8 +385,7 @@ async def on_receive_pdu(self, origin, pdu, sent_to_us_directly=False) -> None:
event_map[x.event_id] = x

room_version = await self.store.get_room_version_id(room_id)
state_map = await resolve_events_with_store(
self.clock,
state_map = await self._state_resolution_handler.resolve_events_with_store(
room_id,
room_version,
state_maps,
Expand Down
233 changes: 169 additions & 64 deletions synapse/state/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,42 +13,46 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import heapq
import logging
from collections import namedtuple
from collections import defaultdict, namedtuple
from typing import (
Any,
Awaitable,
Callable,
DefaultDict,
Dict,
Iterable,
List,
Optional,
Sequence,
Set,
Tuple,
Union,
overload,
)

import attr
from frozendict import frozendict
from prometheus_client import Histogram
from prometheus_client import Counter, Histogram
from typing_extensions import Literal

from synapse.api.constants import EventTypes
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, StateResolutionVersions
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
from synapse.logging.context import ContextResourceUsage
from synapse.logging.utils import log_function
from synapse.state import v1, v2
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.storage.roommember import ProfileInfo
from synapse.types import Collection, StateMap
from synapse.util import Clock
from synapse.util.async_helpers import Linearizer
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.metrics import Measure, measure_func

logger = logging.getLogger(__name__)

metrics_logger = logging.getLogger("synapse.state.metrics")

# Metrics for number of state groups involved in a resolution.
state_groups_histogram = Histogram(
Expand Down Expand Up @@ -448,19 +452,44 @@ async def resolve_events(

state_map = {ev.event_id: ev for st in state_sets for ev in st}

with Measure(self.clock, "state._resolve_events"):
new_state = await resolve_events_with_store(
self.clock,
event.room_id,
room_version,
state_set_ids,
event_map=state_map,
state_res_store=StateResolutionStore(self.store),
)
new_state = await self._state_resolution_handler.resolve_events_with_store(
event.room_id,
room_version,
state_set_ids,
event_map=state_map,
state_res_store=StateResolutionStore(self.store),
)

return {key: state_map[ev_id] for key, ev_id in new_state.items()}


@attr.s(slots=True)
class _StateResMetrics:
"""Keeps track of some usage metrics about state res."""

# System and User CPU time, in seconds
cpu_time = attr.ib(type=float, default=0.0)

# time spent on database transactions (excluding scheduling time). This roughly
# corresponds to the amount of work done on the db server, excluding event fetches.
db_time = attr.ib(type=float, default=0.0)

# number of events fetched from the db.
db_events = attr.ib(type=int, default=0)


_biggest_room_by_cpu_counter = Counter(
"synapse_state_res_cpu_for_biggest_room_seconds",
"CPU time spent performing state resolution for the single most expensive "
"room for state resolution",
)
_biggest_room_by_db_counter = Counter(
"synapse_state_res_db_for_biggest_room_seconds",
"Database time spent performing state resolution for the single most "
"expensive room for state resolution",
)


class StateResolutionHandler:
"""Responsible for doing state conflict resolution.

Expand All @@ -483,6 +512,17 @@ def __init__(self, hs):
reset_expiry_on_get=True,
)

#
# stuff for tracking time spent on state-res by room
#

# tracks the amount of work done on state res per room
self._state_res_metrics = defaultdict(
_StateResMetrics
) # type: DefaultDict[str, _StateResMetrics]

self.clock.looping_call(self._report_metrics, 120 * 1000)

@log_function
async def resolve_state_groups(
self,
Expand Down Expand Up @@ -530,15 +570,13 @@ async def resolve_state_groups(

state_groups_histogram.observe(len(state_groups_ids))

with Measure(self.clock, "state._resolve_events"):
new_state = await resolve_events_with_store(
self.clock,
room_id,
room_version,
list(state_groups_ids.values()),
event_map=event_map,
state_res_store=state_res_store,
)
new_state = await self.resolve_events_with_store(
room_id,
room_version,
list(state_groups_ids.values()),
event_map=event_map,
state_res_store=state_res_store,
)

# if the new state matches any of the input state groups, we can
# use that state group again. Otherwise we will generate a state_id
Expand All @@ -552,6 +590,114 @@ async def resolve_state_groups(

return cache

async def resolve_events_with_store(
self,
room_id: str,
room_version: str,
state_sets: Sequence[StateMap[str]],
event_map: Optional[Dict[str, EventBase]],
state_res_store: "StateResolutionStore",
) -> StateMap[str]:
"""
Args:
room_id: the room we are working in

room_version: Version of the room

state_sets: List of dicts of (type, state_key) -> event_id,
which are the different state groups to resolve.

event_map:
a dict from event_id to event, for any events that we happen to
have in flight (eg, those currently being persisted). This will be
used as a starting point fof finding the state we need; any missing
events will be requested via state_map_factory.

If None, all events will be fetched via state_res_store.

state_res_store: a place to fetch events from

Returns:
a map from (type, state_key) to event_id.
"""
try:
with Measure(self.clock, "state._resolve_events") as m:
v = KNOWN_ROOM_VERSIONS[room_version]
if v.state_res == StateResolutionVersions.V1:
return await v1.resolve_events_with_store(
room_id, state_sets, event_map, state_res_store.get_events
)
else:
return await v2.resolve_events_with_store(
self.clock,
room_id,
room_version,
state_sets,
event_map,
state_res_store,
)
finally:
self._record_state_res_metrics(room_id, m.get_resource_usage())

def _record_state_res_metrics(self, room_id: str, rusage: ContextResourceUsage):
room_metrics = self._state_res_metrics[room_id]
room_metrics.cpu_time += rusage.ru_utime + rusage.ru_stime
room_metrics.db_time += rusage.db_txn_duration_sec
room_metrics.db_events += rusage.evt_db_fetch_count

def _report_metrics(self):
if not self._state_res_metrics:
# no state res has happened since the last iteration: don't bother logging.
return

self._report_biggest(
lambda i: i.cpu_time, "CPU time", _biggest_room_by_cpu_counter,
)

self._report_biggest(
lambda i: i.db_time, "DB time", _biggest_room_by_db_counter,
)

self._state_res_metrics.clear()

def _report_biggest(
self,
extract_key: Callable[[_StateResMetrics], Any],
metric_name: str,
prometheus_counter_metric: Counter,
) -> None:
"""Report metrics on the biggest rooms for state res

Args:
extract_key: a callable which, given a _StateResMetrics, extracts a single
metric to sort by.
metric_name: the name of the metric we have extracted, for the log line
prometheus_counter_metric: a prometheus metric recording the sum of the
the extracted metric
"""
n_to_log = 10
if not metrics_logger.isEnabledFor(logging.DEBUG):
# only need the most expensive if we don't have debug logging, which
# allows nlargest() to degrade to max()
n_to_log = 1

items = self._state_res_metrics.items()

# log the N biggest rooms
biggest = heapq.nlargest(
n_to_log, items, key=lambda i: extract_key(i[1])
) # type: List[Tuple[str, _StateResMetrics]]
metrics_logger.debug(
"%i biggest rooms for state-res by %s: %s",
len(biggest),
metric_name,
["%s (%gs)" % (r, extract_key(m)) for (r, m) in biggest],
)

# report info on the single biggest to prometheus
_, biggest_metrics = biggest[0]
prometheus_counter_metric.inc(extract_key(biggest_metrics))


def _make_state_cache_entry(
new_state: StateMap[str], state_groups_ids: Dict[int, StateMap[str]]
Expand Down Expand Up @@ -605,47 +751,6 @@ def _make_state_cache_entry(
)


def resolve_events_with_store(
clock: Clock,
room_id: str,
room_version: str,
state_sets: Sequence[StateMap[str]],
event_map: Optional[Dict[str, EventBase]],
state_res_store: "StateResolutionStore",
) -> Awaitable[StateMap[str]]:
"""
Args:
room_id: the room we are working in

room_version: Version of the room

state_sets: List of dicts of (type, state_key) -> event_id,
which are the different state groups to resolve.

event_map:
a dict from event_id to event, for any events that we happen to
have in flight (eg, those currently being persisted). This will be
used as a starting point fof finding the state we need; any missing
events will be requested via state_map_factory.

If None, all events will be fetched via state_res_store.

state_res_store: a place to fetch events from

Returns:
a map from (type, state_key) to event_id.
"""
v = KNOWN_ROOM_VERSIONS[room_version]
if v.state_res == StateResolutionVersions.V1:
return v1.resolve_events_with_store(
room_id, state_sets, event_map, state_res_store.get_events
)
else:
return v2.resolve_events_with_store(
clock, room_id, room_version, state_sets, event_map, state_res_store
)


@attr.s(slots=True)
class StateResolutionStore:
"""Interface that allows state resolution algorithms to access the database
Expand Down