-
Notifications
You must be signed in to change notification settings - Fork 13
/
ratelimiter.py
104 lines (87 loc) · 3.53 KB
/
ratelimiter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
import abc
import time
import typing
import asyncio
import logging
from . import log
class BaseRateLimiter(abc.ABC):
"""
This is the interface that must be implemented to satisfy naz's rate limiting.
User implementations should inherit this class and
implement the :func:`limit <BaseRateLimiter.limit>` methods with the type signatures shown.
It may be important to control the rate at which the client(naz) sends requests to an SMSC/server.
naz lets you do this, by allowing you to specify a custom rate limiter.
"""
@abc.abstractmethod
async def limit(self) -> None:
"""
rate limit sending of messages to SMSC.
"""
raise NotImplementedError("limit method must be implemented.")
class SimpleRateLimiter(BaseRateLimiter):
"""
This is an implementation of BaseRateLimiter.
It does rate limiting using a `token bucket rate limiting algorithm <https://en.wikipedia.org/wiki/Token_bucket>`_
example usage:
.. highlight:: python
.. code-block:: python
rate_limiter = SimpleRateLimiter(send_rate=10)
await rate_limiter.limit()
send_messsages()
"""
def __init__(
self, send_rate: float = 100_000.00, logger: typing.Union[None, logging.Logger] = None
) -> None:
"""
Parameters:
send_rate: the maximum rate, in messages/second, at which naz can send messages to SMSC.
"""
if not isinstance(send_rate, float):
raise ValueError(
"`send_rate` should be of type:: `float` You entered: {0}".format(type(send_rate))
)
if not isinstance(logger, (type(None), logging.Logger)):
raise ValueError(
"`logger` should be of type:: `None` or `logging.Logger` You entered: {0}".format(
type(logger)
)
)
self.send_rate: float = send_rate
self.max_tokens: float = self.send_rate
self.tokens: float = self.max_tokens
self.delay_for_tokens: float = 1.0
self.updated_at: float = time.monotonic()
self.messages_delivered: int = 0
self.effective_send_rate: float = 0.00
if logger is not None:
self.logger = logger
else:
self.logger = log.SimpleLogger("naz.SimpleRateLimiter")
async def limit(self) -> None:
self.logger.log(logging.DEBUG, {"event": "naz.SimpleRateLimiter.limit", "stage": "start"})
while self.tokens < 1:
self._add_new_tokens()
# todo: sleep in an exponetial manner upto a maximum then wrap around.
await asyncio.sleep(self.delay_for_tokens)
self.logger.log(
logging.DEBUG,
{
"event": "naz.SimpleRateLimiter.limit",
"stage": "end",
"state": "limiting rate",
"send_rate": self.send_rate,
"delay": self.delay_for_tokens,
"effective_send_rate": self.effective_send_rate,
},
)
self.messages_delivered += 1
self.tokens -= 1
def _add_new_tokens(self) -> None:
now = time.monotonic()
time_since_update = now - self.updated_at
self.effective_send_rate = self.messages_delivered / time_since_update
new_tokens = time_since_update * self.send_rate
if new_tokens > 1:
self.tokens = min(self.tokens + new_tokens, self.max_tokens)
self.updated_at = now
self.messages_delivered = 0