-
-
Notifications
You must be signed in to change notification settings - Fork 44
/
redis.py
148 lines (125 loc) · 5.81 KB
/
redis.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
import typing as _t
from cachelib.base import BaseCache
from cachelib.serializers import RedisSerializer
class RedisCache(BaseCache):
"""Uses the Redis key-value store as a cache backend.
The first argument can be either a string denoting address of the Redis
server or an object resembling an instance of a redis.Redis class.
Note: Python Redis API already takes care of encoding unicode strings on
the fly.
:param host: address of the Redis server or an object which API is
compatible with the official Python Redis client (redis-py).
:param port: port number on which Redis server listens for connections.
:param password: password authentication for the Redis server.
:param db: db (zero-based numeric index) on Redis Server to connect.
:param default_timeout: the default timeout that is used if no timeout is
specified on :meth:`~BaseCache.set`. A timeout of
0 indicates that the cache never expires.
:param key_prefix: A prefix that should be added to all keys.
Any additional keyword arguments will be passed to ``redis.Redis``.
"""
_read_client: _t.Any = None
_write_client: _t.Any = None
serializer = RedisSerializer()
def __init__(
self,
host: _t.Any = "localhost",
port: int = 6379,
password: _t.Optional[str] = None,
db: int = 0,
default_timeout: int = 300,
key_prefix: _t.Optional[str] = None,
**kwargs: _t.Any
):
BaseCache.__init__(self, default_timeout)
if host is None:
raise ValueError("RedisCache host parameter may not be None")
if isinstance(host, str):
try:
import redis
except ImportError as err:
raise RuntimeError("no redis module found") from err
if kwargs.get("decode_responses", None):
raise ValueError("decode_responses is not supported by RedisCache.")
self._write_client = self._read_client = redis.Redis(
host=host, port=port, password=password, db=db, **kwargs
)
else:
self._read_client = self._write_client = host
self.key_prefix = key_prefix or ""
def _normalize_timeout(self, timeout: _t.Optional[int]) -> int:
"""Normalize timeout by setting it to default of 300 if
not defined (None) or -1 if explicitly set to zero.
:param timeout: timeout to normalize.
"""
timeout = BaseCache._normalize_timeout(self, timeout)
if timeout == 0:
timeout = -1
return timeout
def get(self, key: str) -> _t.Any:
return self.serializer.loads(self._read_client.get(self.key_prefix + key))
def get_many(self, *keys: str) -> _t.List[_t.Any]:
if self.key_prefix:
prefixed_keys = [self.key_prefix + key for key in keys]
else:
prefixed_keys = list(keys)
return [self.serializer.loads(x) for x in self._read_client.mget(prefixed_keys)]
def set(self, key: str, value: _t.Any, timeout: _t.Optional[int] = None) -> _t.Any:
timeout = self._normalize_timeout(timeout)
dump = self.serializer.dumps(value)
if timeout == -1:
result = self._write_client.set(name=self.key_prefix + key, value=dump)
else:
result = self._write_client.setex(
name=self.key_prefix + key, value=dump, time=timeout
)
return result
def add(self, key: str, value: _t.Any, timeout: _t.Optional[int] = None) -> _t.Any:
timeout = self._normalize_timeout(timeout)
dump = self.serializer.dumps(value)
created = self._write_client.setnx(name=self.key_prefix + key, value=dump)
# handle case where timeout is explicitly set to zero
if created and timeout != -1:
self._write_client.expire(name=self.key_prefix + key, time=timeout)
return created
def set_many(
self, mapping: _t.Dict[str, _t.Any], timeout: _t.Optional[int] = None
) -> _t.List[_t.Any]:
timeout = self._normalize_timeout(timeout)
# Use transaction=False to batch without calling redis MULTI
# which is not supported by twemproxy
pipe = self._write_client.pipeline(transaction=False)
for key, value in mapping.items():
dump = self.serializer.dumps(value)
if timeout == -1:
pipe.set(name=self.key_prefix + key, value=dump)
else:
pipe.setex(name=self.key_prefix + key, value=dump, time=timeout)
results = pipe.execute()
return [k for k, was_set in zip(mapping.keys(), results) if was_set]
def delete(self, key: str) -> bool:
return bool(self._write_client.delete(self.key_prefix + key))
def delete_many(self, *keys: str) -> _t.List[_t.Any]:
if not keys:
return []
if self.key_prefix:
prefixed_keys = [self.key_prefix + key for key in keys]
else:
prefixed_keys = [k for k in keys]
self._write_client.delete(*prefixed_keys)
return [k for k in prefixed_keys if not self.has(k)]
def has(self, key: str) -> bool:
return bool(self._read_client.exists(self.key_prefix + key))
def clear(self) -> bool:
status = 0
if self.key_prefix:
keys = self._read_client.keys(self.key_prefix + "*")
if keys:
status = self._write_client.delete(*keys)
else:
status = self._write_client.flushdb()
return bool(status)
def inc(self, key: str, delta: int = 1) -> _t.Any:
return self._write_client.incr(name=self.key_prefix + key, amount=delta)
def dec(self, key: str, delta: int = 1) -> _t.Any:
return self._write_client.incr(name=self.key_prefix + key, amount=-delta)