Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#68 Events in Rate Limiters #97

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions hyx/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,14 @@ async def _get_or_init_listeners(self) -> List[ListenerT]:
return self._inited_listeners


class NoOpEventDispatcher(EventDispatcher):
def __getattr__(self, event_handler_name: str) -> Callable:
async def handle_event(*args, **kwargs) -> None:
pass

return handle_event


def get_default_name(func: Optional[Callable] = None) -> str:
"""
Get the default name of the component based on code context where it's being used
Expand Down
26 changes: 23 additions & 3 deletions hyx/ratelimit/api.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import functools
from types import TracebackType
from typing import Any, Optional, Type, cast
from typing import Any, Optional, Sequence, Type, cast

from hyx.events import EventDispatcher, EventManager, get_default_name
from hyx.ratelimit.events import _RATELIMITER_LISTENERS, RateLimiterListener
from hyx.ratelimit.managers import RateLimiter, TokenBucketLimiter
from hyx.typing import FuncT

Expand Down Expand Up @@ -49,7 +51,7 @@ class tokenbucket:
**Parameters**

* **max_executions** *(float)* - How many executions are permitted?
* **per_time_secs** *(float)* - Per what time span? (in seconds)
* **per_time_secs** *(float)* - Per what time period? (in seconds)
* **bucket_size** *(None | float)* - The token bucket size. Defines the max number of executions
that are permitted to happen during bursts.
The burst is when no executions have happened for a long time, and then you are receiving a
Expand All @@ -58,13 +60,31 @@ class tokenbucket:

__slots__ = ("_limiter",)

def __init__(self, max_executions: float, per_time_secs: float, bucket_size: Optional[float] = None) -> None:
def __init__(
self,
max_executions: float,
per_time_secs: float,
bucket_size: Optional[float] = None,
name: Optional[str] = None,
listeners: Optional[Sequence[RateLimiterListener]] = None,
event_manager: Optional[EventManager] = None,
) -> None:
event_dispatcher = EventDispatcher[RateLimiter, RateLimiterListener](
listeners,
_RATELIMITER_LISTENERS,
event_manager=event_manager,
)

self._limiter = TokenBucketLimiter(
name=name or get_default_name(),
max_executions=max_executions,
per_time_secs=per_time_secs,
bucket_size=bucket_size,
event_dispatcher=event_dispatcher.as_listener,
)

event_dispatcher.set_component(self._limiter)

async def __aenter__(self) -> "tokenbucket":
await self._limiter.acquire()

Expand Down
Empty file added hyx/ratelimit/buckets.py
Empty file.
21 changes: 21 additions & 0 deletions hyx/ratelimit/events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from typing import TYPE_CHECKING, Union

from hyx.events import ListenerFactoryT, ListenerRegistry

if TYPE_CHECKING:
from hyx.ratelimit.managers import RateLimiter

_RATELIMITER_LISTENERS: ListenerRegistry["RateLimiter", "RateLimiterListener"] = ListenerRegistry()


class RateLimiterListener:
...


def register_ratelimiter_listener(listener: Union[RateLimiterListener, ListenerFactoryT]) -> None:
"""
Register a listener that will listen to all rate limiter components in the system
"""
global _RATELIMITER_LISTENERS

_RATELIMITER_LISTENERS.register(listener)
10 changes: 9 additions & 1 deletion hyx/ratelimit/managers.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio
from typing import Optional

from hyx.ratelimit.events import RateLimiterListener
from hyx.ratelimit.exceptions import RateLimitExceeded


Expand All @@ -26,7 +27,14 @@ class TokenBucketLimiter(RateLimiter):
"_next_replenish_at",
)

def __init__(self, max_executions: float, per_time_secs: float, bucket_size: Optional[float] = None) -> None:
def __init__(
self,
name: str,
max_executions: float,
per_time_secs: float,
event_dispatcher: RateLimiterListener,
bucket_size: Optional[float] = None,
) -> None:
self._max_executions = max_executions
self._per_time_secs = per_time_secs

Expand Down
1 change: 1 addition & 0 deletions hyx/retry/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ def bucket_retry(

def _decorator(func: FuncT) -> FuncT:
limiter = TokenBucketLimiter(attempts, per_time_secs, bucket_size) if attempts and per_time_secs else None

event_dispatcher = EventDispatcher[RetryManager, RetryListener](
listeners,
_RETRY_LISTENERS,
Expand Down
41 changes: 37 additions & 4 deletions tests/test_ratelimiter/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,21 @@

import pytest

from hyx.events import NoOpEventDispatcher
from hyx.ratelimit import TokenBucketLimiter, ratelimiter, tokenbucket
from hyx.ratelimit.exceptions import RateLimitExceeded


async def test__ratelimiter__decorator() -> None:
@ratelimiter(limiter=TokenBucketLimiter(max_executions=4, per_time_secs=1, bucket_size=4))
limiter = TokenBucketLimiter(
name="hyx.tests.decorator",
max_executions=4,
per_time_secs=1,
bucket_size=4,
event_dispatcher=NoOpEventDispatcher().as_listener,
)

@ratelimiter(limiter=limiter)
async def calc() -> float:
return 42

Expand All @@ -25,7 +34,15 @@ async def calc() -> float:


async def test__ratelimiter__context_manager() -> None:
limiter = ratelimiter(limiter=TokenBucketLimiter(max_executions=4, per_time_secs=1, bucket_size=4))
limiter = ratelimiter(
limiter=TokenBucketLimiter(
name="hyx.tests.ctxmgr",
max_executions=4,
per_time_secs=1,
bucket_size=4,
event_dispatcher=NoOpEventDispatcher().as_listener,
)
)

for _ in range(4):
async with limiter:
Expand All @@ -41,7 +58,15 @@ async def test__ratelimiter__token_bucket_context_manager() -> None:


async def test__ratelimiter__limit_exceeded() -> None:
@ratelimiter(limiter=TokenBucketLimiter(max_executions=3, per_time_secs=1, bucket_size=3))
limiter = TokenBucketLimiter(
name="hyx.tests.limiter",
max_executions=3,
per_time_secs=1,
bucket_size=3,
event_dispatcher=NoOpEventDispatcher().as_listener,
)

@ratelimiter(limiter=limiter)
async def calc() -> float:
return 42

Expand All @@ -51,7 +76,15 @@ async def calc() -> float:


async def test__ratelimiter__replenish_after_full_bucket() -> None:
@ratelimiter(limiter=TokenBucketLimiter(max_executions=3, per_time_secs=1, bucket_size=3))
limiter = TokenBucketLimiter(
name="hyx.tests.limiter",
max_executions=3,
per_time_secs=1,
bucket_size=3,
event_dispatcher=NoOpEventDispatcher().as_listener,
)

@ratelimiter(limiter=limiter)
async def calc() -> float:
return 42

Expand Down
Loading