Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

add some metrics on the federation sender #6160

Merged
merged 1 commit into from Oct 3, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/6160.misc
@@ -0,0 +1 @@
Add some metrics on the federation sender.
11 changes: 6 additions & 5 deletions synapse/federation/sender/__init__.py
Expand Up @@ -38,7 +38,7 @@
events_processed_counter,
)
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.metrics import measure_func
from synapse.util.metrics import Measure, measure_func

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -183,8 +183,8 @@ def handle_event(event):
# Otherwise if the last member on a server in a room is
# banned then it won't receive the event because it won't
# be in the room after the ban.
destinations = yield self.state.get_current_hosts_in_room(
event.room_id, latest_event_ids=event.prev_event_ids()
destinations = yield self.state.get_hosts_in_room_at_events(
event.room_id, event_ids=event.prev_event_ids()
)
except Exception:
logger.exception(
Expand All @@ -207,8 +207,9 @@ def handle_event(event):

@defer.inlineCallbacks
def handle_room_events(events):
for event in events:
yield handle_event(event)
with Measure(self.clock, "handle_room_events"):
for event in events:
yield handle_event(event)

events_by_room = {}
for event in events:
Expand Down
24 changes: 18 additions & 6 deletions synapse/state/__init__.py
Expand Up @@ -33,7 +33,7 @@
from synapse.util.async_helpers import Linearizer
from synapse.util.caches import get_cache_factor_for
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.metrics import Measure
from synapse.util.metrics import Measure, measure_func

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -191,11 +191,22 @@ def get_current_users_in_room(self, room_id, latest_event_ids=None):
return joined_users

@defer.inlineCallbacks
def get_current_hosts_in_room(self, room_id, latest_event_ids=None):
if not latest_event_ids:
latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id)
logger.debug("calling resolve_state_groups from get_current_hosts_in_room")
entry = yield self.resolve_state_groups_for_events(room_id, latest_event_ids)
def get_current_hosts_in_room(self, room_id):
event_ids = yield self.store.get_latest_event_ids_in_room(room_id)
return (yield self.get_hosts_in_room_at_events(room_id, event_ids))

@defer.inlineCallbacks
def get_hosts_in_room_at_events(self, room_id, event_ids):
"""Get the hosts that were in a room at the given event ids

Args:
room_id (str):
event_ids (list[str]):

Returns:
Deferred[list[str]]: the hosts in the room at the given events
"""
entry = yield self.resolve_state_groups_for_events(room_id, event_ids)
joined_hosts = yield self.store.get_joined_hosts(room_id, entry)
return joined_hosts

Expand Down Expand Up @@ -344,6 +355,7 @@ def compute_event_context(self, event, old_state=None):

return context

@measure_func()
@defer.inlineCallbacks
def resolve_state_groups_for_events(self, room_id, event_ids):
""" Given a list of event_ids this method fetches the state at each
Expand Down
21 changes: 15 additions & 6 deletions synapse/storage/roommember.py
Expand Up @@ -33,6 +33,7 @@
from synapse.util.async_helpers import Linearizer
from synapse.util.caches import intern_string
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
from synapse.util.metrics import Measure
from synapse.util.stringutils import to_ascii

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -483,6 +484,7 @@ def get_joined_users_from_context(self, event, context):
)
return result

@defer.inlineCallbacks
def get_joined_users_from_state(self, room_id, state_entry):
state_group = state_entry.state_group
if not state_group:
Expand All @@ -492,9 +494,12 @@ def get_joined_users_from_state(self, room_id, state_entry):
# To do this we set the state_group to a new object as object() != object()
state_group = object()

return self._get_joined_users_from_context(
room_id, state_group, state_entry.state, context=state_entry
)
with Measure(self._clock, "get_joined_users_from_state"):
return (
yield self._get_joined_users_from_context(
room_id, state_group, state_entry.state, context=state_entry
)
)

@cachedInlineCallbacks(
num_args=2, cache_context=True, iterable=True, max_entries=100000
Expand Down Expand Up @@ -669,6 +674,7 @@ def was_host_joined(self, room_id, host):

return True

@defer.inlineCallbacks
def get_joined_hosts(self, room_id, state_entry):
state_group = state_entry.state_group
if not state_group:
Expand All @@ -678,9 +684,12 @@ def get_joined_hosts(self, room_id, state_entry):
# To do this we set the state_group to a new object as object() != object()
state_group = object()

return self._get_joined_hosts(
room_id, state_group, state_entry.state, state_entry=state_entry
)
with Measure(self._clock, "get_joined_hosts"):
return (
yield self._get_joined_hosts(
room_id, state_group, state_entry.state, state_entry=state_entry
)
)

@cachedInlineCallbacks(num_args=2, max_entries=10000, iterable=True)
# @defer.inlineCallbacks
Expand Down
6 changes: 4 additions & 2 deletions synapse/util/metrics.py
Expand Up @@ -60,12 +60,14 @@
)


def measure_func(name):
def measure_func(name=None):
def wrapper(func):
block_name = func.__name__ if name is None else name

@wraps(func)
@defer.inlineCallbacks
def measured_func(self, *args, **kwargs):
with Measure(self.clock, name):
with Measure(self.clock, block_name):
r = yield func(self, *args, **kwargs)
return r

Expand Down