Skip to content

Commit 481d89e

Browse files
authored
Added support for Lag-Aware Healthcheck and OSS Cluster API (#3768)
* Extract additional interfaces and abstract classes * Added base async components * Added command executor * Added recurring background tasks with event loop only * Added MultiDBClient * Added scenario and config tests * Added pipeline and transaction support for MultiDBClient * Added pub/sub support for MultiDBClient * Added check for couroutines methods for pub/sub * Added OSS Cluster API support for MultiDBCLient * Added support for Lag-Aware Healthcheck and OSS Cluster API * Increased timeouts between tests * Fixed space
1 parent 4817a26 commit 481d89e

File tree

14 files changed

+627
-124
lines changed

14 files changed

+627
-124
lines changed

redis/asyncio/cluster.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -404,6 +404,7 @@ def __init__(
404404
else:
405405
self._event_dispatcher = event_dispatcher
406406

407+
self.startup_nodes = startup_nodes
407408
self.nodes_manager = NodesManager(
408409
startup_nodes,
409410
require_full_coverage,
@@ -2199,7 +2200,8 @@ async def _reinitialize_on_error(self, error):
21992200
await self._pipe.cluster_client.nodes_manager.initialize()
22002201
self.reinitialize_counter = 0
22012202
else:
2202-
self._pipe.cluster_client.nodes_manager.update_moved_exception(error)
2203+
if type(error) == MovedError:
2204+
self._pipe.cluster_client.nodes_manager.update_moved_exception(error)
22032205

22042206
self._executing = False
22052207

redis/asyncio/http/__init__.py

Whitespace-only changes.

redis/asyncio/http/http_client.py

Lines changed: 216 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,216 @@
1+
import asyncio
2+
from abc import ABC, abstractmethod
3+
from concurrent.futures import ThreadPoolExecutor
4+
from typing import Optional, Mapping, Union, Any
5+
from redis.http.http_client import HttpResponse, HttpClient
6+
7+
DEFAULT_USER_AGENT = "HttpClient/1.0 (+https://example.invalid)"
8+
DEFAULT_TIMEOUT = 30.0
9+
RETRY_STATUS_CODES = {429, 500, 502, 503, 504}
10+
11+
class AsyncHTTPClient(ABC):
12+
@abstractmethod
13+
async def get(
14+
self,
15+
path: str,
16+
params: Optional[Mapping[str, Union[None, str, int, float, bool, list, tuple]]] = None,
17+
headers: Optional[Mapping[str, str]] = None,
18+
timeout: Optional[float] = None,
19+
expect_json: bool = True
20+
) -> Union[HttpResponse, Any]:
21+
"""
22+
Invoke HTTP GET request."""
23+
pass
24+
25+
@abstractmethod
26+
async def delete(
27+
self,
28+
path: str,
29+
params: Optional[Mapping[str, Union[None, str, int, float, bool, list, tuple]]] = None,
30+
headers: Optional[Mapping[str, str]] = None,
31+
timeout: Optional[float] = None,
32+
expect_json: bool = True
33+
) -> Union[HttpResponse, Any]:
34+
"""
35+
Invoke HTTP DELETE request."""
36+
pass
37+
38+
@abstractmethod
39+
async def post(
40+
self,
41+
path: str,
42+
json_body: Optional[Any] = None,
43+
data: Optional[Union[bytes, str]] = None,
44+
params: Optional[Mapping[str, Union[None, str, int, float, bool, list, tuple]]] = None,
45+
headers: Optional[Mapping[str, str]] = None,
46+
timeout: Optional[float] = None,
47+
expect_json: bool = True
48+
) -> Union[HttpResponse, Any]:
49+
"""
50+
Invoke HTTP POST request."""
51+
pass
52+
53+
@abstractmethod
54+
async def put(
55+
self,
56+
path: str,
57+
json_body: Optional[Any] = None,
58+
data: Optional[Union[bytes, str]] = None,
59+
params: Optional[Mapping[str, Union[None, str, int, float, bool, list, tuple]]] = None,
60+
headers: Optional[Mapping[str, str]] = None,
61+
timeout: Optional[float] = None,
62+
expect_json: bool = True
63+
) -> Union[HttpResponse, Any]:
64+
"""
65+
Invoke HTTP PUT request."""
66+
pass
67+
68+
@abstractmethod
69+
async def patch(
70+
self,
71+
path: str,
72+
json_body: Optional[Any] = None,
73+
data: Optional[Union[bytes, str]] = None,
74+
params: Optional[Mapping[str, Union[None, str, int, float, bool, list, tuple]]] = None,
75+
headers: Optional[Mapping[str, str]] = None,
76+
timeout: Optional[float] = None,
77+
expect_json: bool = True
78+
) -> Union[HttpResponse, Any]:
79+
"""
80+
Invoke HTTP PATCH request."""
81+
pass
82+
83+
@abstractmethod
84+
async def request(
85+
self,
86+
method: str,
87+
path: str,
88+
params: Optional[Mapping[str, Union[None, str, int, float, bool, list, tuple]]] = None,
89+
headers: Optional[Mapping[str, str]] = None,
90+
body: Optional[Union[bytes, str]] = None,
91+
timeout: Optional[float] = None,
92+
) -> HttpResponse:
93+
"""
94+
Invoke HTTP request with given method."""
95+
pass
96+
97+
class AsyncHTTPClientWrapper(AsyncHTTPClient):
98+
"""
99+
An async wrapper around sync HTTP client with thread pool execution.
100+
"""
101+
def __init__(
102+
self,
103+
client: HttpClient,
104+
max_workers: int = 10
105+
) -> None:
106+
"""
107+
Initialize a new HTTP client instance.
108+
109+
Args:
110+
client: Sync HTTP client instance.
111+
max_workers: Maximum number of concurrent requests.
112+
113+
The client supports both regular HTTPS with server verification and mutual TLS
114+
authentication. For server verification, provide CA certificate information via
115+
ca_file, ca_path or ca_data. For mutual TLS, additionally provide a client
116+
certificate and key via client_cert_file and client_key_file.
117+
"""
118+
self.client = client
119+
self._executor = ThreadPoolExecutor(max_workers=max_workers)
120+
121+
async def get(
122+
self,
123+
path: str,
124+
params: Optional[Mapping[str, Union[None, str, int, float, bool, list, tuple]]] = None,
125+
headers: Optional[Mapping[str, str]] = None,
126+
timeout: Optional[float] = None,
127+
expect_json: bool = True
128+
) -> Union[HttpResponse, Any]:
129+
loop = asyncio.get_event_loop()
130+
return await loop.run_in_executor(
131+
self._executor,
132+
self.client.get,
133+
path, params, headers, timeout, expect_json
134+
)
135+
136+
async def delete(
137+
self,
138+
path: str,
139+
params: Optional[Mapping[str, Union[None, str, int, float, bool, list, tuple]]] = None,
140+
headers: Optional[Mapping[str, str]] = None,
141+
timeout: Optional[float] = None,
142+
expect_json: bool = True
143+
) -> Union[HttpResponse, Any]:
144+
loop = asyncio.get_event_loop()
145+
return await loop.run_in_executor(
146+
self._executor,
147+
self.client.delete,
148+
path, params, headers, timeout, expect_json
149+
)
150+
151+
async def post(
152+
self,
153+
path: str,
154+
json_body: Optional[Any] = None,
155+
data: Optional[Union[bytes, str]] = None,
156+
params: Optional[Mapping[str, Union[None, str, int, float, bool, list, tuple]]] = None,
157+
headers: Optional[Mapping[str, str]] = None,
158+
timeout: Optional[float] = None,
159+
expect_json: bool = True
160+
) -> Union[HttpResponse, Any]:
161+
loop = asyncio.get_event_loop()
162+
return await loop.run_in_executor(
163+
self._executor,
164+
self.client.post,
165+
path, json_body, data, params, headers, timeout, expect_json
166+
)
167+
168+
async def put(
169+
self,
170+
path: str,
171+
json_body: Optional[Any] = None,
172+
data: Optional[Union[bytes, str]] = None,
173+
params: Optional[Mapping[str, Union[None, str, int, float, bool, list, tuple]]] = None,
174+
headers: Optional[Mapping[str, str]] = None,
175+
timeout: Optional[float] = None,
176+
expect_json: bool = True
177+
) -> Union[HttpResponse, Any]:
178+
loop = asyncio.get_event_loop()
179+
return await loop.run_in_executor(
180+
self._executor,
181+
self.client.put,
182+
path, json_body, data, params, headers, timeout, expect_json
183+
)
184+
185+
async def patch(
186+
self,
187+
path: str,
188+
json_body: Optional[Any] = None,
189+
data: Optional[Union[bytes, str]] = None,
190+
params: Optional[Mapping[str, Union[None, str, int, float, bool, list, tuple]]] = None,
191+
headers: Optional[Mapping[str, str]] = None,
192+
timeout: Optional[float] = None,
193+
expect_json: bool = True
194+
) -> Union[HttpResponse, Any]:
195+
loop = asyncio.get_event_loop()
196+
return await loop.run_in_executor(
197+
self._executor,
198+
self.client.patch,
199+
path, json_body, data, params, headers, timeout, expect_json
200+
)
201+
202+
async def request(
203+
self,
204+
method: str,
205+
path: str,
206+
params: Optional[Mapping[str, Union[None, str, int, float, bool, list, tuple]]] = None,
207+
headers: Optional[Mapping[str, str]] = None,
208+
body: Optional[Union[bytes, str]] = None,
209+
timeout: Optional[float] = None,
210+
) -> HttpResponse:
211+
loop = asyncio.get_event_loop()
212+
return await loop.run_in_executor(
213+
self._executor,
214+
self.client.request,
215+
method, path, params, headers, body, timeout
216+
)

redis/asyncio/multidb/client.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import asyncio
2+
import logging
23
from typing import Callable, Optional, Coroutine, Any, List, Union, Awaitable
34

45
from redis.asyncio.client import PubSubHandler
@@ -13,6 +14,7 @@
1314
from redis.multidb.exception import NoValidDatabaseException
1415
from redis.typing import KeyT, EncodableT, ChannelT
1516

17+
logger = logging.getLogger(__name__)
1618

1719
class MultiDBClient(AsyncRedisModuleCommands, AsyncCoreCommands):
1820
"""
@@ -274,6 +276,8 @@ async def _check_db_health(
274276
database.circuit.state = CBState.OPEN
275277
is_healthy = False
276278

279+
logger.exception('Health check failed, due to exception', exc_info=e)
280+
277281
if on_error:
278282
await on_error(e)
279283

redis/asyncio/multidb/command_executor.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from datetime import datetime
44
from typing import List, Optional, Callable, Any, Union, Awaitable
55

6+
from redis.asyncio import RedisCluster
67
from redis.asyncio.client import PubSub, Pipeline
78
from redis.asyncio.multidb.database import Databases, AsyncDatabase, Database
89
from redis.asyncio.multidb.event import AsyncActiveDatabaseChanged, RegisterCommandFailure, \
@@ -181,6 +182,9 @@ def command_retry(self) -> Retry:
181182

182183
def pubsub(self, **kwargs):
183184
if self._active_pubsub is None:
185+
if isinstance(self._active_database.client, RedisCluster):
186+
raise ValueError("PubSub is not supported for RedisCluster")
187+
184188
self._active_pubsub = self._active_database.client.pubsub(**kwargs)
185189
self._active_pubsub_kwargs = kwargs
186190

0 commit comments

Comments
 (0)