-
Notifications
You must be signed in to change notification settings - Fork 2.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix scan iter command issued to different replicas #3220
base: master
Are you sure you want to change the base?
Changes from all commits
698176d
b6e40b9
7696071
1c9e140
d4bfe2a
f78f270
7e03d33
19050fe
241033d
391f233
ee12e86
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,16 @@ | ||
import asyncio | ||
import random | ||
import weakref | ||
from typing import AsyncIterator, Iterable, Mapping, Optional, Sequence, Tuple, Type | ||
from typing import ( | ||
Any, | ||
AsyncIterator, | ||
Iterable, | ||
Mapping, | ||
Optional, | ||
Sequence, | ||
Tuple, | ||
Type, | ||
) | ||
|
||
from redis.asyncio.client import Redis | ||
from redis.asyncio.connection import ( | ||
|
@@ -66,6 +75,22 @@ async def connect(self): | |
lambda error: asyncio.sleep(0), | ||
) | ||
|
||
async def _connect_to_address_retry(self, host: str, port: int) -> None: | ||
if self._reader: | ||
return # already connected | ||
try: | ||
return await self.connect_to((host, port)) | ||
except ConnectionError: | ||
raise SlaveNotFoundError | ||
|
||
async def connect_to_address(self, host: str, port: int) -> None: | ||
# Connect to the specified host and port | ||
# instead of connecting to the master / rotated slaves | ||
return await self.retry.call_with_retry( | ||
lambda: self._connect_to_address_retry(host, port), | ||
lambda error: asyncio.sleep(0), | ||
) | ||
|
||
async def read_response( | ||
self, | ||
disable_decoding: bool = False, | ||
|
@@ -122,6 +147,7 @@ def __init__(self, service_name, sentinel_manager, **kwargs): | |
self.sentinel_manager = sentinel_manager | ||
self.master_address = None | ||
self.slave_rr_counter = None | ||
self._request_id_to_replica_address = {} | ||
|
||
def __repr__(self): | ||
return ( | ||
|
@@ -167,6 +193,92 @@ async def rotate_slaves(self) -> AsyncIterator: | |
pass | ||
raise SlaveNotFoundError(f"No slave found for {self.service_name!r}") | ||
|
||
async def get_connection( | ||
self, command_name: str, *keys: Any, **options: Any | ||
) -> SentinelManagedConnection: | ||
""" | ||
Get a connection from the pool. | ||
'xxxscan_iter' ('scan_iter', 'hscan_iter', 'sscan_iter', 'zscan_iter') | ||
commands needs to be handled specially. | ||
If the client is created using a connection pool, in replica mode, | ||
all 'scan' command-equivalent of the 'xxx_scan_iter' commands needs | ||
to be issued to the same Redis replica. | ||
|
||
The way each server positions each key is different with one another, | ||
and the cursor acts as the offset of the scan. | ||
Hence, all scans coming from a single 'xxx_scan_iter_channel' command | ||
should go to the same replica. | ||
""" | ||
# If not an iter command or in master mode, call superclass' implementation | ||
if not (iter_req_id := options.get("_iter_req_id", None)) or self.is_master: | ||
return await super().get_connection(command_name, *keys, **options) | ||
|
||
# Check if this iter request has already been directed to a particular server | ||
( | ||
server_host, | ||
server_port, | ||
) = self._request_id_to_replica_address.get(iter_req_id, (None, None)) | ||
connection = None | ||
# If this is the first scan request of the iter command, | ||
# get a connection from the pool | ||
if server_host is None or server_port is None: | ||
try: | ||
connection = self._available_connections.pop() | ||
except IndexError: | ||
connection = self.make_connection() | ||
# If this is not the first scan request of the iter command | ||
else: | ||
# Check from the available connections, if any of the connection | ||
# is connected to the host and port that we want | ||
for available_connection in self._available_connections.copy(): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What if there are many connections? This linear search might slow down things. |
||
# if yes, use that connection | ||
if ( | ||
available_connection.host == server_host | ||
and available_connection.port == server_port | ||
): | ||
self._available_connections.remove(available_connection) | ||
connection = available_connection | ||
# If not, make a new dummy connection object, and set its host and port | ||
# to the one that we want later in the call to ``connect_to_address`` | ||
if not connection: | ||
connection = self.make_connection() | ||
assert connection | ||
self._in_use_connections.add(connection) | ||
try: | ||
# Ensure this connection is connected to Redis | ||
# If this is the first scan request, it will | ||
# call rotate_slaves and connect to a random replica | ||
if server_port is None or server_port is None: | ||
await connection.connect() | ||
# If this is not the first scan request, | ||
# connect to the previous replica. | ||
# This will connect to the host and port of the replica | ||
else: | ||
await connection.connect_to_address(server_host, server_port) | ||
# Connections that the pool provides should be ready to send | ||
# a command. If not, the connection was either returned to the | ||
# pool before all data has been read or the socket has been | ||
# closed. Either way, reconnect and verify everything is good. | ||
try: | ||
if await connection.can_read_destructive(): | ||
raise ConnectionError("Connection has data") from None | ||
except (ConnectionError, OSError): | ||
await connection.disconnect() | ||
await connection.connect() | ||
if await connection.can_read_destructive(): | ||
raise ConnectionError("Connection not ready") from None | ||
except BaseException: | ||
# Release the connection back to the pool so that we don't | ||
# leak it | ||
await self.release(connection) | ||
raise | ||
# Store the connection to the dictionary | ||
self._request_id_to_replica_address[iter_req_id] = ( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When would these entries be removed from the dict? So that it does not grow indefinitely. |
||
connection.host, | ||
connection.port, | ||
) | ||
return connection | ||
|
||
|
||
class Sentinel(AsyncSentinelCommands): | ||
""" | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it make sense to use a more general name, e.g. instead of
request_id
usecontext_id
? Would it express better the fact that you are basically trying to run several commands in the same context?