A collection of helpful utilities for coredis.
- Caching decorator with thundering herd protection and error caching
- Idempotency keys
- Fixed-window rate limiting
$ pip install coredis-utils

First, create a CoredisUtils object wrapping a coredis.Redis instance:
from coredis import Redis
from coredis_utils import CoredisUtils
client = Redis(...)
utils = CoredisUtils(client)

Caching is implemented with a decorator:
@utils.cached(ttl=60)
async def my_task() -> int: ...

Idempotency uses a simple check:
if await utils.idempotent("my-key", ttl=60):
    ...  # code in this block can only run once

Rate limiting is similar:
for _ in range(10):
    if await utils.limit("my-ip-addr", 5, 1):  # limit to 5/second
        print("success")

success
success
success
success
success
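To make the fixed-window behavior above concrete, here is a minimal in-memory sketch. It is not the coredis-utils implementation: the `FixedWindowLimiter` class and its `clock` parameter are hypothetical, and the argument order (key, maximum calls, window length in seconds) is only inferred from the `# limit to 5/second` comment. A Redis-backed version would more plausibly keep one counter key per window and let it expire.

```python
import time
from collections import defaultdict
from typing import Callable, Dict, Tuple


class FixedWindowLimiter:
    """In-memory illustration of fixed-window rate limiting (hypothetical)."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        # Maps (key, window index) to the number of calls seen in that window.
        # Old windows are never evicted here; a real store would expire them.
        self._counts: Dict[Tuple[str, int], int] = defaultdict(int)

    def limit(self, key: str, max_calls: int, window_seconds: float) -> bool:
        # Every timestamp inside the same window maps to the same index.
        window = int(self._clock() // window_seconds)
        self._counts[(key, window)] += 1
        # Allow the call only while the window's count is within the limit.
        return self._counts[(key, window)] <= max_calls
```

With a limit of 5 per window, the first five calls in a window are allowed and later ones are rejected until the next window starts, which matches the five `success` lines printed by the loop of ten calls.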
Cache keys are generated using a SHA256 hash of pickled arguments. You can exclude non-serializable arguments from cache key construction:
from sqlalchemy.ext.asyncio import AsyncSession
@utils.cached(ttl=60, exclude={"session"})
async def my_task(session: AsyncSession) -> int: ...

You can also customize which parts of arguments get hashed:
@utils.cached(
    ttl=60,
    key_fns={
        # hash just the ID, not the entire model
        "user": lambda u: u.id,
        # hash a couple relevant fields
        "message": lambda m: (m.type, m.timestamp),
    },
)
async def my_task(user: User, message: Message) -> int: ...

Errors can be cached and propagated just like normal responses:
@utils.cached(ttl=60, error_ttl=5)
async def my_task() -> int:
    raise Exception("Oh no!")

You can easily invalidate keys by passing the same arguments:
@utils.cached(ttl=60)
async def my_task(time: int) -> int: ...
await my_task.invalidate(3)
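Invalidation by arguments works because the same arguments always produce the same cache key. The sketch below shows one way a key could be derived from a SHA256 hash of pickled arguments while honoring `exclude` and `key_fns`. The helper `make_cache_key`, its signature, and the key layout are assumptions made for illustration, not the library's actual code.

```python
import hashlib
import inspect
import pickle
from typing import Any, Callable, Dict, FrozenSet, Optional, Tuple


def make_cache_key(
    fn: Callable[..., Any],
    args: Tuple[Any, ...],
    kwargs: Dict[str, Any],
    exclude: FrozenSet[str] = frozenset(),
    key_fns: Optional[Dict[str, Callable[[Any], Any]]] = None,
) -> str:
    """Hypothetical helper: build a deterministic key from call arguments."""
    # Bind positional and keyword arguments to parameter names so that
    # my_task(3) and my_task(time=3) produce the same key.
    bound = inspect.signature(fn).bind(*args, **kwargs)
    bound.apply_defaults()
    key_fns = key_fns or {}

    parts = []
    for name, value in sorted(bound.arguments.items()):
        if name in exclude:
            continue  # e.g. a database session that cannot be pickled
        if name in key_fns:
            value = key_fns[name](value)  # hash only the relevant part
        parts.append((name, value))

    digest = hashlib.sha256(pickle.dumps(parts)).hexdigest()
    return f"{fn.__module__}.{fn.__qualname__}:{digest}"
```

Under this scheme, an excluded argument never affects the key, and an argument with an entry in `key_fns` contributes only the value that function returns.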