-
Notifications
You must be signed in to change notification settings - Fork 2.6k
Added support for Lag-Aware Healthcheck and OSS Cluster API #3768
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
15 commits
Select commit
Hold shift + click to select a range
b26a03e
Extract additional interfaces and abstract classes
vladvildanov 6bc4f71
Merge branch 'feat/active-active' of github.com:redis/redis-py into v…
vladvildanov bad9bcc
Added base async components
vladvildanov ae42bea
Added command executor
vladvildanov 8fc74b9
Added recurring background tasks with event loop only
vladvildanov 97c3cde
Added MultiDBClient
vladvildanov e376544
Added scenario and config tests
vladvildanov 57f6d8b
Added pipeline and transaction support for MultiDBClient
vladvildanov 25eebb9
Added pub/sub support for MultiDBClient
vladvildanov a82d8e7
Added check for couroutines methods for pub/sub
vladvildanov d38fb0a
Added OSS Cluster API support for MultiDBCLient
vladvildanov 54db16b
Added support for Lag-Aware Healthcheck and OSS Cluster API
vladvildanov 80e253a
Merge branch 'feat/active-active' of github.com:redis/redis-py into v…
vladvildanov 4431861
Increased timeouts between tests
vladvildanov 8a866b0
Fixed space
vladvildanov File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,216 @@ | ||
import asyncio | ||
from abc import ABC, abstractmethod | ||
from concurrent.futures import ThreadPoolExecutor | ||
from typing import Optional, Mapping, Union, Any | ||
from redis.http.http_client import HttpResponse, HttpClient | ||
|
||
DEFAULT_USER_AGENT = "HttpClient/1.0 (+https://example.invalid)" | ||
DEFAULT_TIMEOUT = 30.0 | ||
RETRY_STATUS_CODES = {429, 500, 502, 503, 504} | ||
|
||
class AsyncHTTPClient(ABC): | ||
@abstractmethod | ||
async def get( | ||
self, | ||
path: str, | ||
params: Optional[Mapping[str, Union[None, str, int, float, bool, list, tuple]]] = None, | ||
headers: Optional[Mapping[str, str]] = None, | ||
timeout: Optional[float] = None, | ||
expect_json: bool = True | ||
) -> Union[HttpResponse, Any]: | ||
""" | ||
Invoke HTTP GET request.""" | ||
pass | ||
|
||
@abstractmethod | ||
async def delete( | ||
self, | ||
path: str, | ||
params: Optional[Mapping[str, Union[None, str, int, float, bool, list, tuple]]] = None, | ||
headers: Optional[Mapping[str, str]] = None, | ||
timeout: Optional[float] = None, | ||
expect_json: bool = True | ||
) -> Union[HttpResponse, Any]: | ||
""" | ||
Invoke HTTP DELETE request.""" | ||
pass | ||
|
||
@abstractmethod | ||
async def post( | ||
self, | ||
path: str, | ||
json_body: Optional[Any] = None, | ||
data: Optional[Union[bytes, str]] = None, | ||
params: Optional[Mapping[str, Union[None, str, int, float, bool, list, tuple]]] = None, | ||
headers: Optional[Mapping[str, str]] = None, | ||
timeout: Optional[float] = None, | ||
expect_json: bool = True | ||
) -> Union[HttpResponse, Any]: | ||
""" | ||
Invoke HTTP POST request.""" | ||
pass | ||
|
||
@abstractmethod | ||
async def put( | ||
self, | ||
path: str, | ||
json_body: Optional[Any] = None, | ||
data: Optional[Union[bytes, str]] = None, | ||
params: Optional[Mapping[str, Union[None, str, int, float, bool, list, tuple]]] = None, | ||
headers: Optional[Mapping[str, str]] = None, | ||
timeout: Optional[float] = None, | ||
expect_json: bool = True | ||
) -> Union[HttpResponse, Any]: | ||
""" | ||
Invoke HTTP PUT request.""" | ||
pass | ||
|
||
@abstractmethod | ||
async def patch( | ||
self, | ||
path: str, | ||
json_body: Optional[Any] = None, | ||
data: Optional[Union[bytes, str]] = None, | ||
params: Optional[Mapping[str, Union[None, str, int, float, bool, list, tuple]]] = None, | ||
headers: Optional[Mapping[str, str]] = None, | ||
timeout: Optional[float] = None, | ||
expect_json: bool = True | ||
) -> Union[HttpResponse, Any]: | ||
""" | ||
Invoke HTTP PATCH request.""" | ||
pass | ||
|
||
@abstractmethod | ||
async def request( | ||
self, | ||
method: str, | ||
path: str, | ||
params: Optional[Mapping[str, Union[None, str, int, float, bool, list, tuple]]] = None, | ||
headers: Optional[Mapping[str, str]] = None, | ||
body: Optional[Union[bytes, str]] = None, | ||
timeout: Optional[float] = None, | ||
) -> HttpResponse: | ||
""" | ||
Invoke HTTP request with given method.""" | ||
pass | ||
|
||
class AsyncHTTPClientWrapper(AsyncHTTPClient): | ||
""" | ||
An async wrapper around sync HTTP client with thread pool execution. | ||
""" | ||
def __init__( | ||
self, | ||
client: HttpClient, | ||
max_workers: int = 10 | ||
) -> None: | ||
""" | ||
Initialize a new HTTP client instance. | ||
|
||
Args: | ||
client: Sync HTTP client instance. | ||
max_workers: Maximum number of concurrent requests. | ||
|
||
The client supports both regular HTTPS with server verification and mutual TLS | ||
authentication. For server verification, provide CA certificate information via | ||
ca_file, ca_path or ca_data. For mutual TLS, additionally provide a client | ||
certificate and key via client_cert_file and client_key_file. | ||
""" | ||
self.client = client | ||
self._executor = ThreadPoolExecutor(max_workers=max_workers) | ||
|
||
async def get( | ||
self, | ||
path: str, | ||
params: Optional[Mapping[str, Union[None, str, int, float, bool, list, tuple]]] = None, | ||
headers: Optional[Mapping[str, str]] = None, | ||
timeout: Optional[float] = None, | ||
expect_json: bool = True | ||
) -> Union[HttpResponse, Any]: | ||
loop = asyncio.get_event_loop() | ||
return await loop.run_in_executor( | ||
self._executor, | ||
self.client.get, | ||
path, params, headers, timeout, expect_json | ||
) | ||
|
||
async def delete( | ||
self, | ||
path: str, | ||
params: Optional[Mapping[str, Union[None, str, int, float, bool, list, tuple]]] = None, | ||
headers: Optional[Mapping[str, str]] = None, | ||
timeout: Optional[float] = None, | ||
expect_json: bool = True | ||
) -> Union[HttpResponse, Any]: | ||
loop = asyncio.get_event_loop() | ||
return await loop.run_in_executor( | ||
self._executor, | ||
self.client.delete, | ||
path, params, headers, timeout, expect_json | ||
) | ||
|
||
async def post( | ||
self, | ||
path: str, | ||
json_body: Optional[Any] = None, | ||
data: Optional[Union[bytes, str]] = None, | ||
params: Optional[Mapping[str, Union[None, str, int, float, bool, list, tuple]]] = None, | ||
headers: Optional[Mapping[str, str]] = None, | ||
timeout: Optional[float] = None, | ||
expect_json: bool = True | ||
) -> Union[HttpResponse, Any]: | ||
loop = asyncio.get_event_loop() | ||
return await loop.run_in_executor( | ||
self._executor, | ||
self.client.post, | ||
path, json_body, data, params, headers, timeout, expect_json | ||
) | ||
|
||
async def put( | ||
self, | ||
path: str, | ||
json_body: Optional[Any] = None, | ||
data: Optional[Union[bytes, str]] = None, | ||
params: Optional[Mapping[str, Union[None, str, int, float, bool, list, tuple]]] = None, | ||
headers: Optional[Mapping[str, str]] = None, | ||
timeout: Optional[float] = None, | ||
expect_json: bool = True | ||
) -> Union[HttpResponse, Any]: | ||
loop = asyncio.get_event_loop() | ||
return await loop.run_in_executor( | ||
self._executor, | ||
self.client.put, | ||
path, json_body, data, params, headers, timeout, expect_json | ||
) | ||
|
||
async def patch( | ||
self, | ||
path: str, | ||
json_body: Optional[Any] = None, | ||
data: Optional[Union[bytes, str]] = None, | ||
params: Optional[Mapping[str, Union[None, str, int, float, bool, list, tuple]]] = None, | ||
headers: Optional[Mapping[str, str]] = None, | ||
timeout: Optional[float] = None, | ||
expect_json: bool = True | ||
) -> Union[HttpResponse, Any]: | ||
loop = asyncio.get_event_loop() | ||
return await loop.run_in_executor( | ||
self._executor, | ||
self.client.patch, | ||
path, json_body, data, params, headers, timeout, expect_json | ||
) | ||
|
||
async def request( | ||
self, | ||
method: str, | ||
path: str, | ||
params: Optional[Mapping[str, Union[None, str, int, float, bool, list, tuple]]] = None, | ||
headers: Optional[Mapping[str, str]] = None, | ||
body: Optional[Union[bytes, str]] = None, | ||
timeout: Optional[float] = None, | ||
) -> HttpResponse: | ||
loop = asyncio.get_event_loop() | ||
return await loop.run_in_executor( | ||
self._executor, | ||
self.client.request, | ||
method, path, params, headers, body, timeout | ||
) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.