In [None]:
import asyncio
from pyrate_limiter import Limiter, Duration, Rate, RedisBucket
from redis.asyncio import Redis

import time 

limit_s = 3
max_delay = 6 * Duration.SECOND

rates = [Rate(limit_s, Duration.SECOND * 5)]
redis = Redis(host="localhost")
bucket = await RedisBucket.init(rates, redis, "test")
limiter = Limiter(bucket, max_delay=max_delay, raise_when_fail=True)

start = time.time()

def mapping(*args, **kwargs):
    return "check", 1

async def check_rate_limiter(i):
    try:
        acquired = await limiter.try_acquire("check", 1)
        if not acquired:
            raise RuntimeError(f"{i}: Unable to acquire")
        print(f"{i}: Calling t{round( time.time() - start)}")
        await asyncio.sleep(6)
        print(f"{i}: Done at t{round( time.time() - start)}")
    except Exception as e:
        print(f"{i}: Exception {e}")
async def main():
    await asyncio.gather(*(check_rate_limiter(i) for i in range(10)))

await main()


In [None]:
import asyncio
from pyrate_limiter import Limiter, Duration, Rate, InMemoryBucket
from redis.asyncio import Redis

import time 

limit_s = 3
max_delay = 6 * Duration.SECOND

rates = [Rate(limit_s, Duration.SECOND * 5)]
bucket = InMemoryBucket(rates)
limiter = Limiter(bucket, max_delay=max_delay, raise_when_fail=True)

start = time.time()

def mapping(*args, **kwargs):
    return "check", 1

async def check_rate_limiter(i):
    try:
        acquired = limiter.try_acquire("check", 1)
        if not acquired:
            raise RuntimeError(f"{i}: Unable to acquire")
        print(f"{i}: Calling t{round( time.time() - start)}")
        await asyncio.sleep(6)
        print(f"{i}: Done at t{round( time.time() - start)}")
    except Exception as e:
        print(f"{i}: Exception {e}")
async def main():
    await asyncio.gather(*(check_rate_limiter(i) for i in range(10)))

await main()


In [None]:
import logging
#from benchmarks.stress_limiters import *

logging.basicConfig(
    format='%(asctime)s %(name)s %(levelname)-8s %(message)s',
    level=logging.DEBUG,
    datefmt='%Y-%m-%d %H:%M:%S')

logger = logging.getLogger(__name__)


import time
from multiprocessing.dummy import Pool

from pyrate_limiter import InMemoryBucket
from pyrate_limiter import Limiter, Duration, Rate

limit_s = 3
max_delay_s=5

rates = [Rate(limit_s, Duration.SECOND * 5)]

bucket = InMemoryBucket(rates)

rate_limiter = Limiter(bucket, max_delay=max_delay_s, raise_when_fail=False)


start = time.time()

def mapping(*args, **kwargs):
    return "check", 1

def check_rate_limiter(i):
    rate_limiter.try_acquire(i, 1, None)

    print(f"Calling {i} t{round(time.time() - start)}")
    time.sleep(7)
    print(f"Done {i} at t{round(time.time() - start)}")


if __name__ == "__main__":
    x = list(range(10))

    with Pool(processes=10) as p:
        p.map(check_rate_limiter, x) 

: 