-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathutils.py
36 lines (23 loc) · 1.17 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
from uuid import UUID
import redis.asyncio as aioredis
from passlib.context import CryptContext
from src.models import User
password_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def get_hashed_password(password: str) -> str:
return password_context.hash(password)
def verify_password(plain_password: str, hashed_password: str) -> bool:
return password_context.verify(plain_password, hashed_password)
async def clear_cache_for_get_messages(cache: aioredis.Redis, chat_guid: UUID):
pattern_for_get_messages = f"messages_{chat_guid}_*"
keys_found = cache.scan_iter(match=pattern_for_get_messages)
async for key in keys_found:
await cache.delete(key)
async def clear_cache_for_get_direct_chats(cache: aioredis.Redis, user: User):
pattern_for_get_direct_chats = f"direct_chats_{user.guid}"
keys_found = cache.scan_iter(match=pattern_for_get_direct_chats)
async for key in keys_found:
await cache.delete(key)
async def clear_cache_for_all_users(cache: aioredis.Redis):
keys_found = cache.scan_iter(match="*all_users")
async for key in keys_found:
await cache.delete(key)