# 限流

> 从限流粒度看常见的有这几类
>> 1. 接口
>> 2. 访问应用 + 接口

> 从架构上又可以分为：单机限流和集群限流

## 算法

> 1. 固定时间窗口
> 2. 滑动时间窗口
> 3. 令牌桶
> 4. 漏桶

In [2]:
import math
import random
import time


class FixedTimeWindow:
    def __init__(self, permit_per_second):
        assert permit_per_second >= 1

        self.permit = permit_per_second
        self.count = 0
        self.start_time = math.floor(time.time())

    def apply(self):
        current_time = math.floor(time.time())
        if current_time == self.start_time:
            if self.count >= self.permit:
                return False
            else:
                self.count += 1
                return True
        else:
            self.count = 1
            self.start_time = current_time
            return True

rate_limiter = FixedTimeWindow(2)
for i in range(10):
    print(f'time:{time.ctime()}, result:{rate_limiter.apply()}')
    time.sleep(random.random())


time:Fri Jun 20 16:07:54 2025, result:True
time:Fri Jun 20 16:07:54 2025, result:True
time:Fri Jun 20 16:07:55 2025, result:True
time:Fri Jun 20 16:07:56 2025, result:True
time:Fri Jun 20 16:07:56 2025, result:True
time:Fri Jun 20 16:07:57 2025, result:True
time:Fri Jun 20 16:07:58 2025, result:True
time:Fri Jun 20 16:07:59 2025, result:True
time:Fri Jun 20 16:07:59 2025, result:True
time:Fri Jun 20 16:08:00 2025, result:True


In [6]:
from datetime import datetime


class RolledTimeWindow:
    '''
    1. 使用大小为 permit_per_second 的循环数组存储访问时的时间，
    2. 清理超过1秒的数据，更新 head，size
    3. 如果数组满了则返回失败，否则插入数据，更新 tail，size
    '''
    def __init__(self, permit_per_second):
        assert permit_per_second >= 1

        self.ring_buffer = [0] * permit_per_second
        self.head = self.tail = 0
        self.size = 0
        self.capacity = permit_per_second
    
    def apply(self):
        current_time = round(time.time(), 3)
        while self.size > 0 and self.ring_buffer[self.head] + 1 < current_time:
            self.head = (self.head + 1) % self.capacity
            self.size -= 1
        
        if self.size == self.capacity:
            return False
        else:
            self.ring_buffer[self.tail] = current_time
            self.tail = (self.tail + 1) % self.capacity
            self.size += 1
            return True

rate_limiter = RolledTimeWindow(2)
for _ in range(10):
    print(f'time:{datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}, result:{rate_limiter.apply()}')
    time.sleep(random.random())

time:2025-06-20 16:09:52.729, result:True
time:2025-06-20 16:09:53.251, result:True
time:2025-06-20 16:09:53.488, result:False
time:2025-06-20 16:09:54.105, result:True
time:2025-06-20 16:09:54.307, result:True
time:2025-06-20 16:09:54.406, result:False
time:2025-06-20 16:09:54.827, result:False
time:2025-06-20 16:09:55.247, result:True
time:2025-06-20 16:09:55.866, result:True
time:2025-06-20 16:09:56.555, result:True


In [None]:
from collections import deque


class RolledTimeWindowV2:
    '''
    1. 使用双端队列存储访问时间
    2. 从队头删除超过1秒的数据，从队尾添加新的访问数据
    '''
    def __init__(self, permit_per_second):
        assert permit_per_second >= 1
        self.capactiy = permit_per_second
        self.window = deque()
    
    def apply(self):
        now = round(time.time(), 3)
        expire_time = now - 1
        while self.window and self.window[0] < expire_time:
            self.window.popleft()
        
        if len(self.window) >= self.capactiy:
            return False
        else:
            self.window.append(now)
            return True

rate_limiter = RolledTimeWindowV2(2)
for _ in range(10):
    print(f'time:{datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}, result:{rate_limiter.apply()}')
    time.sleep(random.random())


time:2025-04-08 15:43:48.825, result:True
time:2025-04-08 15:43:49.093, result:True
time:2025-04-08 15:43:49.443, result:False
time:2025-04-08 15:43:50.379, result:True
time:2025-04-08 15:43:51.147, result:True
time:2025-04-08 15:43:51.568, result:True
time:2025-04-08 15:43:52.054, result:False
time:2025-04-08 15:43:52.550, result:True
time:2025-04-08 15:43:53.499, result:True
time:2025-04-08 15:43:54.397, result:True


> 上面滑动窗口算法存在的问题有
> 1. while 循环在极端情况下性能退化
> 2. 时间窗口内还会存在流量突刺


> 解决方案
> 1. 把时间窗口划分为更小的时间窗口，平衡精度和性能
> 2. 使用令牌桶、漏桶算法

In [14]:
from collections import defaultdict


class SlidingWindow:
    def __init__(self, permit_per_second, bucket_size=10):
        self.limit = permit_per_second
        self.buckets = defaultdict(int)
        self.bucket_size = 1.0 / bucket_size
    
    def apply(self):
        now = time.time()
        cur_key = now // self.bucket_size
        expire_key = (now - 1) // self.bucket_size
        for key in list(self.buckets.keys()):
            if key < expire_key:
                del self.buckets[key]
        
        total = sum(self.buckets.values())
        if total >= self.limit:
            return False
        else:
            self.buckets[cur_key] += 1
            return True

rate_limiter = SlidingWindow(2)
for _ in range(10):
    print(f'time:{datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}, result:{rate_limiter.apply()}')
    time.sleep(random.random())

time:2025-04-08 17:20:16.172, result:True
time:2025-04-08 17:20:16.259, result:True
time:2025-04-08 17:20:16.358, result:False
time:2025-04-08 17:20:16.669, result:False
time:2025-04-08 17:20:17.203, result:True
time:2025-04-08 17:20:17.573, result:True
time:2025-04-08 17:20:17.900, result:False
time:2025-04-08 17:20:18.048, result:False
time:2025-04-08 17:20:18.380, result:True
time:2025-04-08 17:20:18.428, result:False


In [17]:
class TokenBucket:
    def __init__(self, rate, capacity):
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity
        self.last_time = time.time()

    def acquire(self, token=1, wait=0):
        now = time.time()
        generated_tokens = (now - self.last_time) * self.rate
        self.tokens = min(self.capacity, self.tokens + generated_tokens)
        print(f'generate tokens:{generated_tokens}, tokens:{self.tokens}')
        self.last_time = now

        if token <= self.tokens:
            self.tokens -= token
            return True
        else:
            wait_time = (token - self.tokens) / self.rate
            if wait_time <= wait:
                self.tokens = 0
                self.last_time += wait_time
                print(f'wait time:{wait_time}')
                time.sleep(wait_time)
                return True
            else:
                return False

rate_limiter = TokenBucket(1, 3)
for _ in range(10):
    print(f'time:{datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}, result:{rate_limiter.acquire(1, 0.5)}')
    print('======')
    time.sleep(random.random())

generate tokens:0.00012993812561035156, tokens:3
time:2025-06-23 10:22:02.170, result:True
generate tokens:0.4056699275970459, tokens:2.405669927597046
time:2025-06-23 10:22:02.576, result:True
generate tokens:0.20156216621398926, tokens:1.6072320938110352
time:2025-06-23 10:22:02.777, result:True
generate tokens:0.758774995803833, tokens:1.3660070896148682
time:2025-06-23 10:22:03.536, result:True
generate tokens:0.5996408462524414, tokens:0.9656479358673096
wait time:0.03435206413269043
time:2025-06-23 10:22:04.136, result:True
generate tokens:0.8595640659332275, tokens:0.8595640659332275
wait time:0.14043593406677246
time:2025-06-23 10:22:05.029, result:True
generate tokens:0.684577226638794, tokens:0.684577226638794
wait time:0.31542277336120605
time:2025-06-23 10:22:05.854, result:True
generate tokens:0.801800012588501, tokens:0.801800012588501
wait time:0.19819998741149902
time:2025-06-23 10:22:06.972, result:True
generate tokens:0.08492493629455566, tokens:0.08492493629455566
ti

In [19]:
class LeakyBucket:
    def __init__(self, rate, capacity):
        """
        :param rate: 每秒漏出的请求数量（请求处理速度）
        :param capacity: 桶的容量
        """
        self.rate = rate
        self.capacity = capacity
        self.water = 0
        self.last_time = time.time()
        
    def apply(self, num=1):
        now = time.time()
        leaked = (now - self.last_time) * self.rate
        self.water = max(0, self.water - leaked)
        print(f'leaked:{leaked}, water:{self.water}')
        self.last_time = now

        if self.capacity - self.water >= num:
            self.water += num
            return True
        else:
            return False

rate_limiter = LeakyBucket(2, 3)
for _ in range(10):
    print(f'time:{datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}, result:{rate_limiter.apply()}')
    print('======')
    time.sleep(0.1)

leaked:0.00021409988403320312, water:0
time:2025-04-09 14:44:17.362, result:True
leaked:0.20190763473510742, water:0.7980923652648926
time:2025-04-09 14:44:17.463, result:True
leaked:0.20462608337402344, water:1.5934662818908691
time:2025-04-09 14:44:17.566, result:True
leaked:0.20183801651000977, water:2.3916282653808594
time:2025-04-09 14:44:17.666, result:False
leaked:0.2007899284362793, water:2.19083833694458
time:2025-04-09 14:44:17.767, result:False
leaked:0.21054840087890625, water:1.9802899360656738
time:2025-04-09 14:44:17.872, result:True
leaked:0.2102518081665039, water:2.77003812789917
time:2025-04-09 14:44:17.977, result:False
leaked:0.2060837745666504, water:2.5639543533325195
time:2025-04-09 14:44:18.080, result:False
leaked:0.20955801010131836, water:2.354396343231201
time:2025-04-09 14:44:18.185, result:False
leaked:0.21039199829101562, water:2.1440043449401855
time:2025-04-09 14:44:18.290, result:False
