In [1]:
import redis
import json

# Shared Redis client used by the RateLimiter class below; connects to the host named "redis".
r = redis.Redis(host="redis")

# Rate Limiter

The rate limiter counts requests per IP address by incrementing a Redis key derived from the current time bucket and the IP, and compares that count against the limit. Each time-bucket key is kept for `history_buckets` windows.

In [2]:
import uuid
import time

class RateLimiter:
  """Fixed-window rate limiter that keeps its counters in Redis.

  Time is divided into consecutive buckets of `window` seconds. Every call to
  `check` increments a counter stored under a key built from this limiter's
  id, the current bucket and the caller's ip, and compares it with `limit`.
  """

  def __init__(self, limit, window, history_buckets = 10):
    """Configure a limiter.

    Args:
      limit: Maximum number of accepted calls per ip in one bucket.
      window: Length of one time bucket, in seconds.
      history_buckets: How many windows a bucket key is retained for; the
        key's TTL is `window * history_buckets` seconds.
    """
    # A random id namespaces the keys so separate limiters do not share counters.
    self.id = str(uuid.uuid4())
    self.limit = limit
    self.window = window
    self.history_buckets = history_buckets

  def current_time_bucket(self):
    """Return the start of the current bucket (unix seconds rounded down to a multiple of `window`)."""
    now = int(time.time())
    return now - now % self.window

  def check(self, ip):
    """Record one hit for `ip` and report whether it is within the limit.

    Returns:
      The hit count for the current bucket (an int from 1 to `limit`) while
      the limit is not exceeded, otherwise `False`. Rejected calls are still
      counted.
    """
    bucket = self.current_time_bucket()
    key = f"{self.id}:{bucket}:{ip}"
    ttl = self.window * self.history_buckets
    # INCR must come before EXPIRE: EXPIRE does nothing on a key that does not
    # exist yet, so the old order left the first hit of every bucket without a
    # TTL (a key hit only once was never removed). Both commands are queued on
    # one pipeline so the counter is not left behind without its expiry.
    pipe = r.pipeline()
    pipe.incr(key)
    pipe.expire(key, ttl)
    count = int(pipe.execute()[0])
    return count if count <= self.limit else False

We can now create a rate limiter. Here we want to allow each IP at most 4 requests in every 20-second window:

In [3]:
# At most 4 accepted checks per ip in each 20-second bucket.
limiter = RateLimiter(limit=4, window=20)

In [4]:
limiter.check("126.28.29.1")

1

In [5]:
limiter.check("126.28.29.1")

2

In [6]:
limiter.check("126.28.29.1")

3

In [7]:
limiter.check("126.28.29.1")

4

In [8]:
limiter.check("126.28.29.1")

False

In [9]:
limiter.check("126.28.29.1")

False

Let's wait until the next window:

In [10]:
# Buckets are aligned to multiples of the window, so sleeping one full window
# always lands in a later bucket. Reading the value from the limiter keeps
# this cell correct if the window above is changed.
time.sleep(limiter.window)

In [11]:
limiter.check("126.28.29.1")

1

In [12]:
limiter.check("126.28.29.1")

2

In [13]:
limiter.check("126.28.29.1")

3

In [14]:
limiter.check("126.28.29.1")

4

In [15]:
limiter.check("126.28.29.1")

False

In [16]:
limiter.check("126.28.29.1")

False