RedisXsync 是一个 同步 / 异步统一 Redis 客户端,专为 分布式锁 和 爬虫账号池限流 场景设计,支持多实例、多 DB,线程安全和协程安全。
# 安装
pip install redis-xsync
# 导包
from redis_xsync import register_redis, resolve_redis, redisXsync, RedisXsync| 特性 | 描述 |
|---|---|
| 同步 / 异步统一接口 | 同一套 API 支持 with 和 async with |
| 多实例 / 多 DB | 支持不同 Redis 实例与数据库同时管理 |
| 分布式锁 | 单键锁、组合锁,阻塞 / 非阻塞模式 |
| 账号池限流 | 基于 Lua 原子操作的令牌桶限流,可等待可用 token |
| 线程 & 协程安全 | 避免多线程 / 协程冲突 |
| 自动连接管理 | Context manager 自动释放连接 |
| RedisXsync |
|---|
| Sync / Async Interfaces |
| Distributed Lock Manager |
| Rate Limiter / TokenPool |
- 分布式锁:通过单键或多键原子操作保证并发安全
- 限流/账号池:Lua 脚本保证跨进程/跨机器的原子性,精确到毫秒
pip install redis-xsync
---
## 🚀 快速开始
### 1️⃣ 注册 Redis
```python
from redis_xsync import register_redis, resolve_redis, redisXsync
redis1 = register_redis(
label="redis_xsync",
host="127.0.0.1",
port=6379,
db=0,
password=None,
decode_responses=True
)
# 通过 label 获取
redis2 = resolve_redis("redis_xsync")
# 使用方式说明:
# 1️⃣ 全局默认实例 redisXsync
# - 在首次调用 register_redis 注册任意 Redis 时,首次注册会自动赋值给 redisXsync,非并发安全
# - 后续可以直接使用 redisXsync,无需通过 label 获取
#
# 2️⃣ 通过 label 显式管理 Redis
# - register_redis(label=..., ...) 注册 Redis 实例
# - resolve_redis(label) 根据 label 获取对应 Redis 实例
#
# 3️⃣ 独立实例
# - 可以直接创建 RedisXsync() 实例,不必参与 register_redis 、resolve_redis、 redisXsync 全局管理
# - 适合临时或隔离使用场景with redis1(db=1) as r:
r.setnx("key", "value")
print(r.get("key"))import asyncio
async def main():
async with redis1(db=1) as r:
await r.setnx("key", "value")
print(await r.get("key"))
asyncio.run(main())- 多 key 原子加锁
- 所有 key 释放才算解锁
- 支持阻塞等待
import threading
import asyncio
from redis_xsync import register_redis
cx = 0
redis = register_redis(label="redis_xsync", host="127.0.0.1", port=6379)
Lock = redis.AsyncRedisAtomicMultiLock
def sync_worker():
global cx
for _ in range(5):
with Lock("lock_key", db=0, ttl_ms=10, blocking=True):
with redis(db=2) as r:
r.setnx("db2", str(cx))
cx += 1
print(f"[SYNC] {cx=}")
async def async_worker():
global cx
for _ in range(5):
with Lock("lock_key", db=0, ttl_ms=10, blocking=True):
async with redis(db=3) as r:
await r.setnx("db3", str(cx))
cx += 1
print(f"[ASYNC] {cx=}")
await asyncio.sleep(0.2)
# 启动线程
threading.Thread(target=sync_worker).start()
# 启动协程
asyncio.run(async_worker())基于 RediSearch + Lua,实现分布式限流 + 自动等待可用账号
📦 Limited — RediSearch 容器操作方法对照表
| 方法 | 异步 | 阻塞 | 遵循限流 | 数量 | 描述 |
|---|---|---|---|---|---|
set_available_containers(*containers) |
✅ | ❌ | ✅ | N | 存储/更新容器配置(生产者) |
ask_available_containers(quantity) |
✅ | ❌ | ✅ | N | 获取 N 个可用容器,若无返回 None |
ask_available_container() |
✅ | ❌ | ✅ | 1 | 获取单个可用容器,若无返回 None |
wait_ask_available_containers(quantity, timeout) |
✅ | ✅ | ✅ | N | 阻塞等待 N 个可用容器 |
wait_ask_available_container(timeout) |
✅ | ✅ | ✅ | 1 | 阻塞等待单个可用容器 |
acquire_random_containers(quantity) |
✅ | ❌ | ❌ | N | 随机获取 N 个容器,忽略限流 |
acquire_random_container() |
✅ | ❌ | ❌ | 1 | 随机获取单个容器,忽略限流 |
wait_acquire_random_containers(quantity, timeout) |
✅ | ✅ | ❌ | N | 阻塞等待 N 个随机容器,忽略限流 |
wait_acquire_random_container(timeout) |
✅ | ✅ | ❌ | 1 | 阻塞等待单个随机容器,忽略限流 |
LimitedModel 用于表示受限访问(rate-limited)的容器条目,每条数据包含唯一标识、时间戳、TTL、访问间隔和访问计数等信息。
| 字段 | 类型 | 默认值 | 描述 |
|---|---|---|---|
id |
str |
— | 数据唯一标识符,必须提供 |
ct |
int |
当前时间戳(毫秒) | 记录条目的收集时间,无需设置 |
ttl_ms |
int |
0 |
条目的生存时间(毫秒),0 表示无限期 |
interval_ms |
int |
0 |
访问间隔(毫秒),0 表示不限速 |
next_time_available |
int |
当前时间戳(毫秒) | 下次可访问时间,用于限流控制,无需设置 |
containers |
str |
None |
存储容器数据,可为空 |
usage_count |
int |
0 |
条目被访问的次数,无需设置 |
ct和next_time_available都以毫秒为单位,便于高精度限流。interval_ms > 0时,访问条目会更新next_time_available为当前时间 + 间隔。- 每次访问条目时,都会通过
usage_count记录被访问次数,以统计和调度。 ttl_ms可配合 Redis 等存储设置过期时间,实现自动清理。
import random
from redis_xsync import register_redis
from redis_xsync.rtypes import LimitedModel
redis = register_redis(label="redis_xsync", host="127.0.0.1", port=6379)
Limited = redis.Limited
async def produce_container():
async with Limited(redis_key="crawler:identity:flow_limit") as lt:
for i in range(10):
token = LimitedModel(
id=f"user_{i}",
ttl_ms=random.randint(30000, 60000),
interval_ms=random.randint(1000, 6000),
containers="cookies"
)
await lt.set_available_containers(token)async def consume_token():
async with Limited(redis_key="crawler:identity:flow_limit") as lt:
# 等待可用 token,超时 10 秒
token = await lt.wait_ask_available_containers(quantity=1, timeout=10)
print("获取到账号:", token)def consume_sync():
with Limited(redis_key="crawler:identity:flow_limit") as lt:
token = lt.wait_ask_available_containers(quantity=1, timeout=10)
print("获取到账号:", token)- produce_container 写入 token 到 Redis,带过期时间和间隔限流
- consume_token 或 consume_sync 从 Redis 中获取可用 token,如果当前没有可用,会等待直到 timeout
- 支持异步和同步,且保证跨进程 / 跨机器原子性
- 基于 Redis 原子操作
- 支持多 key(组合锁)
- 防止并发冲突
-
每个 token:
- 独立 TTL
- 独立间隔(interval)
-
Lua 保证原子性
-
支持多进程 / 多机器共享
import asyncio
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())RedisXsync 基于标准 Redis 协议(RESP)实现,理论上兼容大多数 Redis 版本(6.x / 7.x / 8.x)。 但在实际生产环境中,建议使用较新版本以获得更好的稳定性与性能。
推荐版本: Redis ≥ 6.0 最佳体验: Redis 7.x 及以上 不推荐: Redis ≤ 5.x
- 官方支持策略 Redis 官方逐步停止对旧版本的维护(EOL),不再提供安全更新与问题修复。
- 功能与性能差异 Redis 7.x / 8.x 在性能、命令能力以及 Lua 执行方面更加完善。
- Lua 脚本依赖(关键) RedisXsync 在分布式锁与限流中大量依赖 Lua 原子操作, 旧版本 Redis 在脚本执行效率与行为一致性上可能存在问题。
✔ 推荐组合:
- Redis 7.x + RedisXsync ← 最稳定
✔ 可用组合:
- Redis 6.x + RedisXsync ← 兼容良好
❌ 不推荐:
- Redis 5.x 及以下 ← 可能存在兼容或性能问题
- RedisXsync 不依赖 Redis Cluster,单机 Redis 即可运行
- 完整支持以下能力: 1.分布式锁 2.限流(基于 Lua) 3.多实例 / 多 DB 管理
👉 在生产环境中,建议统一使用较新版本 Redis,以避免潜在兼容问题并获得最佳性能。
- 单机非集群
- 爬虫账号池
- 接口限流
- 分布式任务调度
- 多服务并发控制
欢迎 Issue / PR!
MIT License