Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add metrics to track how the rate limiter is affecting requests (sleep/reject) #13534

Merged
merged 6 commits into from
Aug 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/13534.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add metrics to track how the rate limiter is affecting requests (sleep/reject).
37 changes: 29 additions & 8 deletions synapse/util/ratelimitutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import typing
from typing import Any, DefaultDict, Iterator, List, Set

from prometheus_client.core import Counter

from twisted.internet import defer

from synapse.api.errors import LimitExceededError
Expand All @@ -37,6 +39,9 @@
logger = logging.getLogger(__name__)


# Track how much the ratelimiter is affecting requests
rate_limit_sleep_counter = Counter("synapse_rate_limit_sleep", "")
rate_limit_reject_counter = Counter("synapse_rate_limit_reject", "")
queue_wait_timer = Histogram(
"synapse_rate_limit_queue_wait_time_seconds",
"sec",
Expand Down Expand Up @@ -84,7 +89,7 @@ def ratelimit(self, host: str) -> "_GeneratorContextManager[defer.Deferred[None]
Returns:
context manager which returns a deferred.
"""
return self.ratelimiters[host].ratelimit()
return self.ratelimiters[host].ratelimit(host)
Copy link
Contributor Author

@MadLittleMods MadLittleMods Aug 16, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better way to pass the host string to the _PerHostRateLimiter?

There doesn't seem to be a good way to make DefaultDict pass the string key host on, https://stackoverflow.com/questions/2912231/is-there-a-clever-way-to-pass-the-key-to-defaultdicts-default-factory



class _PerHostRatelimiter:
Expand Down Expand Up @@ -119,12 +124,14 @@ def __init__(self, clock: Clock, config: FederationRatelimitSettings):
self.request_times: List[int] = []

@contextlib.contextmanager
def ratelimit(self) -> "Iterator[defer.Deferred[None]]":
def ratelimit(self, host: str) -> "Iterator[defer.Deferred[None]]":
# `contextlib.contextmanager` takes a generator and turns it into a
# context manager. The generator should only yield once with a value
# to be returned by manager.
# Exceptions will be reraised at the yield.

self.host = host

request_id = object()
ret = self._on_enter(request_id)
try:
Expand All @@ -144,6 +151,8 @@ def _on_enter(self, request_id: object) -> "defer.Deferred[None]":
# sleeping or in the ready queue).
queue_size = len(self.ready_request_queue) + len(self.sleeping_requests)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be good to track queue_size?

if queue_size > self.reject_limit:
logger.debug("Ratelimiter(%s): rejecting request", self.host)
rate_limit_reject_counter.inc()
raise LimitExceededError(
retry_after_ms=int(self.window_size / self.sleep_limit)
)
Expand All @@ -155,7 +164,8 @@ def queue_request() -> "defer.Deferred[None]":
queue_defer: defer.Deferred[None] = defer.Deferred()
self.ready_request_queue[request_id] = queue_defer
logger.info(
"Ratelimiter: queueing request (queue now %i items)",
"Ratelimiter(%s): queueing request (queue now %i items)",
self.host,
len(self.ready_request_queue),
)

Expand All @@ -164,19 +174,28 @@ def queue_request() -> "defer.Deferred[None]":
return defer.succeed(None)

logger.debug(
"Ratelimit [%s]: len(self.request_times)=%d",
"Ratelimit(%s) [%s]: len(self.request_times)=%d",
self.host,
id(request_id),
len(self.request_times),
)

if len(self.request_times) > self.sleep_limit:
logger.debug("Ratelimiter: sleeping request for %f sec", self.sleep_sec)
logger.debug(
"Ratelimiter(%s) [%s]: sleeping request for %f sec",
self.host,
id(request_id),
self.sleep_sec,
)
rate_limit_sleep_counter.inc()
ret_defer = run_in_background(self.clock.sleep, self.sleep_sec)

self.sleeping_requests.add(request_id)

def on_wait_finished(_: Any) -> "defer.Deferred[None]":
logger.debug("Ratelimit [%s]: Finished sleeping", id(request_id))
logger.debug(
"Ratelimit(%s) [%s]: Finished sleeping", self.host, id(request_id)
)
self.sleeping_requests.discard(request_id)
queue_defer = queue_request()
return queue_defer
Expand All @@ -186,7 +205,9 @@ def on_wait_finished(_: Any) -> "defer.Deferred[None]":
ret_defer = queue_request()

def on_start(r: object) -> object:
logger.debug("Ratelimit [%s]: Processing req", id(request_id))
logger.debug(
"Ratelimit(%s) [%s]: Processing req", self.host, id(request_id)
)
self.current_processing.add(request_id)
return r

Expand Down Expand Up @@ -217,7 +238,7 @@ def on_both(r: object) -> object:
return make_deferred_yieldable(ret_defer)

def _on_exit(self, request_id: object) -> None:
logger.debug("Ratelimit [%s]: Processed req", id(request_id))
logger.debug("Ratelimit(%s) [%s]: Processed req", self.host, id(request_id))
self.current_processing.discard(request_id)
try:
# start processing the next item on the queue.
Expand Down