高性能 PHP 限流器,支持令牌桶、滑动窗口算法,可用于本地和分布式限流。
- 令牌桶算法:支持突发流量,按固定速率补充令牌
- 滑动窗口算法:基于时间窗口的精确限流
- 本地限流:使用内存存储,适用于单进程
- 分布式限流:使用 Redis,支持单机/Sentinel/Cluster 模式
- 并发控制:支持任务、进程、Fiber 协程的本地和分布式限流
- 原子操作:Lua 脚本保证分布式环境下的原子性
- 高可用:支持 Redis Sentinel 和 Cluster 自动故障转移
- PHP 8.2+:使用 readonly、enum 等新特性优化性能
- PHP >= 8.2
- 可选:Redis 扩展(用于分布式部署)
composer require kode/limiting使用 Limiter 类可以更简洁地创建各种限流器:
use Kode\Limiting\Limiter;
// 令牌桶限流器(默认)
$limiter = Limiter::tokenBucket(100, 10.0);
$limiter->allow('api:user:123');
// 滑动窗口限流器
$limiter = Limiter::slidingWindow(100, 60.0);
// 漏桶限流器
$limiter = Limiter::leakyBucket(100, 1.0);
// 计数器限流器
$limiter = Limiter::counter(1000, 60);
// Redis 分布式限流器
$limiter = Limiter::redis(
\Kode\Limiting\Enum\LimiterType::TOKEN_BUCKET,
100, 10.0,
'127.0.0.1', 6379
);
// Memcached 分布式限流器
$limiter = Limiter::memcached(
\Kode\Limiting\Enum\LimiterType::TOKEN_BUCKET,
100, 10.0,
'127.0.0.1', 11211
);
// PDO 分布式限流器
$limiter = Limiter::pdo(
\Kode\Limiting\Enum\LimiterType::TOKEN_BUCKET,
100, 10.0,
'mysql:host=127.0.0.1;dbname=limiting', 'root', 'password'
);
// 并发控制器
$taskLimiter = Limiter::task(10); // 任务并发控制
$processLimiter = Limiter::process(10); // 进程并发控制
$fiberLimiter = Limiter::fiber(10); // Fiber 并发控制
// 中间件
$middleware = Limiter::middleware(\Kode\Limiting\Enum\LimiterType::TOKEN_BUCKET, 100, 10.0);use Kode\Limiting\Algorithm\TokenBucket;
use Kode\Limiting\Store\MemoryStore;
$store = new MemoryStore();
$bucket = new TokenBucket($store, 100, 10.0);
if ($bucket->allow('user:123', 1)) {
echo "请求通过";
} else {
echo "被限流";
}
echo "剩余令牌: " . $bucket->getRemaining('user:123');
echo "等待时间: " . $bucket->getWaitTime('user:123') . "秒";use Kode\Limiting\Algorithm\SlidingWindow;
use Kode\Limiting\Store\MemoryStore;
$window = new SlidingWindow($store, 100, 1.0);
if ($window->allow('api:v1', 1)) {
echo "请求通过";
}use Kode\Limiting\Distributed\DistributedLimiter;
// 单机模式
$limiter = DistributedLimiter::create(
'127.0.0.1', 6379,
1000, // 容量
100.0 // 每秒补充100个令牌
);
// Sentinel 高可用模式
$limiter = DistributedLimiter::createSentinel(
['192.168.1.1:26379', '192.168.1.2:26379'],
'mymaster',
1000,
100.0
);
// Cluster 分片模式
$limiter = DistributedLimiter::createCluster(
['192.168.1.1:6379', '192.168.1.2:6379', '192.168.1.3:6379'],
1000,
100.0
);
if ($limiter->allow('global:api', 1)) {
echo "请求通过";
}use Kode\Limiting\Concurrency\TaskLimiter;
$limiter = TaskLimiter::create(10, 10, 1.0);
// 手动获取和释放
if ($limiter->tryAcquire('task:1')) {
try {
do_something();
} finally {
$limiter->release('task:1');
}
}
// 自动释放
$result = $limiter->run('task:2', fn() => do_something());
// 阻塞等待
$limiter->acquire('task:3', 30.0);src/
├── Limiter.php # 统一入口类
├── DTO/ # 数据传输对象
│ ├── LimiterConfig.php # 限流器配置(readonly)
│ └── LimiterResult.php # 限流结果(readonly)
├── Enum/ # 枚举类型
│ ├── LimiterType.php # 限流器类型
│ ├── StoreType.php # 存储类型
│ └── RedisMode.php # Redis 模式
├── Store/ # 存储层
│ ├── StoreInterface.php # 存储接口
│ ├── MemoryStore.php # 内存存储
│ ├── RedisStore.php # Redis 存储(支持 Sentinel/Cluster)
│ ├── MemcachedStore.php # Memcached 存储
│ └── PdoStore.php # PDO 存储(MySQL/SQLite/PostgreSQL)
├── Algorithm/ # 限流算法
│ ├── RateLimiterInterface.php # 限流器接口
│ ├── TokenBucket.php # 令牌桶
│ ├── SlidingWindow.php # 滑动窗口
│ ├── LeakyBucket.php # 漏桶
│ └── Counter.php # 计数器
├── Distributed/ # 分布式限流
│ ├── DistributedLimiter.php # 分布式令牌桶
│ ├── DistributedTaskLimiter.php # 分布式任务限流
│ ├── DistributedProcessLimiter.php # 分布式进程限流
│ └── DistributedFiberLimiter.php # 分布式 Fiber 限流
├── Concurrency/ # 本地并发控制
│ ├── TaskLimiter.php # 任务限流
│ ├── ProcessLimiter.php # 进程限流
│ └── FiberLimiter.php # Fiber 限流
└── Middleware/ # 中间件
├── LimiterMiddleware.php # 限流中间件
└── LimiterMiddlewareInterface.php # 中间件接口
存储接口,所有存储实现必须实现此接口。
public function get(string $key): ?string; // 获取值
public function set(string $key, string $value, int $ttl = 0): void; // 设置值
public function delete(string $key): void; // 删除键
public function incr(string $key, int $step = 1): int; // 原子递增
public function ttl(string $key): int; // 获取 TTL限流器接口。
public function allow(string $key, int $tokens = 1): bool; // 检查是否允许
public function getRemaining(string $key): float; // 获取剩余数量
public function getWaitTime(string $key): float; // 获取等待时间
public function reset(string $key): void; // 重置统一入口类,提供简洁的 API 创建限流器。
// 工厂方法
Limiter::tokenBucket(int $capacity, float $refillRate, $store); // 令牌桶
Limiter::slidingWindow(int $capacity, float $windowSize, $store); // 滑动窗口
Limiter::leakyBucket(int $capacity, float $leakRate, $store); // 漏桶
Limiter::counter(int $maxRequests, int $windowSeconds, $store); // 计数器
Limiter::redis(LimiterType, int $capacity, float $refillRate, ...); // Redis 限流
Limiter::memcached(LimiterType, int $capacity, float $refillRate, ...); // Memcached 限流
Limiter::pdo(LimiterType, int $capacity, float $refillRate, ...); // PDO 限流
Limiter::task(int $maxConcurrency); // 任务并发控制器
Limiter::process(int $maxConcurrency); // 进程并发控制器
Limiter::fiber(int $maxConcurrency); // Fiber 并发控制器
Limiter::middleware(LimiterType, int $capacity, float $refillRate); // 中间件
// 实例方法
$limiter->allow(string $key, int $tokens); // 检查是否允许
$limiter->getRemaining(string $key); // 获取剩余数量
$limiter->getWaitTime(string $key); // 获取等待时间
$limiter->reset(string $key); // 重置
$limiter->build(); // 获取底层限流器
$limiter->getStore(); // 获取存储
$limiter->getConfig(); // 获取配置令牌桶限流算法。
$bucket = new TokenBucket($store, $capacity, $refillRate, $ttl = 3600, $prefix = 'bucket:');
$bucket->allow('key', 1); // 检查是否允许
$bucket->getRemaining('key'); // 获取剩余令牌数
$bucket->getWaitTime('key'); // 获取等待时间
$bucket->reset('key'); // 重置
$bucket->getCapacity(); // 获取容量
$bucket->getRefillRate(); // 获取补充速率
$bucket->getTtl(); // 获取 TTL
$bucket->check('key', 1); // 检查并返回 LimiterResult滑动窗口限流算法。
$window = new SlidingWindow($store, $capacity, $windowSize = 1.0, $ttl = 3600, $prefix = 'sw:');
$window->allow('key', 1); // 检查是否允许
$window->getRemaining('key'); // 获取剩余请求数
$window->getWaitTime('key'); // 获取等待时间
$window->reset('key'); // 重置
$window->getCapacity(); // 获取容量
$window->getWindowSize(); // 获取窗口大小
$window->check('key', 1); // 检查并返回 LimiterResult分布式限流器(基于 Redis)。
// 单机模式
$limiter = DistributedLimiter::create($host, $port, $capacity, $refillRate, $password, $database);
// Sentinel 模式
$limiter = DistributedLimiter::createSentinel($sentinels, $masterName, $capacity, $refillRate);
// Cluster 模式
$limiter = DistributedLimiter::createCluster($nodes, $capacity, $refillRate);
// 方法
$limiter->allow('key', 1); // 检查是否允许(原子操作)
$limiter->tryAcquire('key', 1); // 尝试获取(别名)
$limiter->getRemaining('key'); // 获取剩余令牌数
$limiter->getWaitTime('key'); // 获取等待时间
$limiter->reset('key'); // 重置
$limiter->allowBatch(['key1', 'key2'], 1); // 批量检查
$limiter->check('key', 1); // 检查并返回 LimiterResultuse Kode\Limiting\Distributed\DistributedTaskLimiter;
use Kode\Limiting\Distributed\DistributedProcessLimiter;
use Kode\Limiting\Distributed\DistributedFiberLimiter;
$taskLimiter = DistributedTaskLimiter::create('127.0.0.1', 6379, 10);
$processLimiter = DistributedProcessLimiter::create('127.0.0.1', 6379, 10);
$fiberLimiter = DistributedFiberLimiter::create('127.0.0.1', 6379, 100);
$taskLimiter->tryAcquire('task:1'); // 非阻塞获取
$taskLimiter->acquire('task:1', 30.0); // 阻塞等待
$taskLimiter->run('task:1', fn() => null); // 执行任务
$taskLimiter->release('task:1'); // 释放
$taskLimiter->getActiveCount(); // 当前活跃数use Kode\Limiting\Concurrency\TaskLimiter;
use Kode\Limiting\Concurrency\ProcessLimiter;
use Kode\Limiting\Concurrency\FiberLimiter;
$taskLimiter = TaskLimiter::create($maxConcurrency, $capacity, $refillRate);
$processLimiter = ProcessLimiter::getInstance($maxProcesses, $capacity, $refillRate);
$fiberLimiter = new FiberLimiter($maxFibers, $capacity, $refillRate, $store);
$taskLimiter->tryAcquire('task:1');
$taskLimiter->acquire('task:1', 30.0);
$taskLimiter->run('task:1', fn() => null);
$taskLimiter->release('task:1');
$taskLimiter->getMaxConcurrency();限流器配置(不可变对象)。
$config = new LimiterConfig($capacity, $refillRate, $ttl = 3600, $prefix = 'limiter:');
$config->capacity; // 容量
$config->refillRate; // 补充速率
$config->ttl; // TTL
$config->prefix; // 前缀
// 链式调用创建新实例
$newConfig = $config->withCapacity(200)->withRefillRate(20.0);限流结果(不可变对象)。
$result = $bucket->check('key', 1);
$result->allowed; // 是否允许
$result->remaining; // 剩余数量
$result->waitTime; // 等待时间
$result->timestamp; // 时间戳
$result->isAllowed(); // 是否允许(布尔值)
$result->toArray(); // 转换为数组use Kode\Limiting\Enum\LimiterType;
use Kode\Limiting\Enum\StoreType;
use Kode\Limiting\Enum\RedisMode;
LimiterType::TOKEN_BUCKET->label(); // 令牌桶
LimiterType::SLIDING_WINDOW->label(); // 滑动窗口
StoreType::MEMORY->label(); // 内存存储
StoreType::REDIS->label(); // Redis 存储
RedisMode::STANDALONE->label(); // 单机模式
RedisMode::SENTINEL->label(); // Sentinel 高可用
RedisMode::CLUSTER->label(); // Cluster 分片测试覆盖所有核心功能,共计 35 个测试用例。
Token Bucket (令牌桶)
✔ 允许首次请求
✔ 允许多次请求
✔ 超出容量拒绝
✔ 获取剩余令牌数
✔ 新键返回完整容量
✔ 获取等待时间
✔ 重置限流器
✔ 时间推移后补充令牌
Sliding Window (滑动窗口)
✔ 容量内允许通过
✔ 超出容量拒绝
✔ 获取剩余请求数
✔ 请求后剩余数正确
✔ 重置限流器
✔ 可用时等待时间为0
✔ 耗尽时等待时间大于0
Memory Store (内存存储)
✔ 设置和获取
✔ 获取不存在的键
✔ 删除键
✔ 递增操作
✔ 新键递增
✔ TTL 计算
✔ 清空存储
Task Limiter (任务限流)
✔ 尝试获取许可
✔ 多次获取
✔ 释放许可
✔ 执行任务
✔ 获取最大并发数
Process Limiter (进程限流)
✔ 尝试获取许可
✔ 释放许可
✔ 获取最大进程数
✔ 单例模式
Fiber Limiter (Fiber 限流)
✔ 支持检查
✔ 尝试获取许可
✔ 释放许可
✔ 获取最大 Fiber 数
# 运行所有测试
./vendor/bin/phpunit
# 显示详细输出
./vendor/bin/phpunit --testdox
# 运行指定测试类
./vendor/bin/phpunit tests/TokenBucketTest.php
# 生成覆盖率报告
./vendor/bin/phpunit --coverage-html coverage实现 StoreInterface 接口即可:
use Kode\Limiting\Store\StoreInterface;
class MyStore implements StoreInterface
{
public function get(string $key): ?string { /* ... */ }
public function set(string $key, string $value, int $ttl = 0): void { /* ... */ }
public function delete(string $key): void { /* ... */ }
public function incr(string $key, int $step = 1): int { /* ... */ }
public function ttl(string $key): int { /* ... */ }
}$limiter = DistributedLimiter::create(
'127.0.0.1', // 主机
6379, // 端口
1000, // 容量
100.0, // 补充速率
'password', // 密码(可选)
0 // 数据库编号
);$limiter = DistributedLimiter::createSentinel(
['192.168.1.1:26379', '192.168.1.2:26379', '192.168.1.3:26379'],
'mymaster', // 主节点名称
1000,
100.0,
'sentinel_pass'
);$limiter = DistributedLimiter::createCluster(
['192.168.1.1:6379', '192.168.1.2:6379', '192.168.1.3:6379'],
1000,
100.0,
'cluster_pass'
);$limiter = DistributedLimiter::create('127.0.0.1', 6379, 100, 10.0);
$response = $limiter->allow('api:user:' . $userId, 1)
? processRequest()
: new Response('Rate limited', 429);$limiter = DistributedLimiter::create('127.0.0.1', 6379, 1000, 0);
if (!$limiter->allow('seckill:' . $productId, 1)) {
throw new Exception('活动已结束');
}$limiter = DistributedTaskLimiter::create('127.0.0.1', 6379, 10);
foreach ($tasks as $task) {
$limiter->run('task:' . $task['id'], function () use ($task) {
processTask($task);
});
}# 安装依赖
composer install
# 运行测试
./vendor/bin/phpunit
# 语法检查
find src -name "*.php" -exec php -l {} \;kode/limiting/
├── src/
│ ├── DTO/ # 数据传输对象
│ ├── Enum/ # 枚举类型
│ ├── Store/ # 存储层
│ ├── Algorithm/ # 限流算法
│ ├── Distributed/ # 分布式限流
│ └── Concurrency/ # 并发控制
├── tests/ # 单元测试
├── composer.json
├── phpunit.xml
├── LICENSE # Apache 2.0
└── README.md
Apache License 2.0 - 参见 LICENSE 文件