Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for cluster myshardid #2704

Merged
merged 11 commits into from
May 8, 2023
112 changes: 60 additions & 52 deletions redis/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def parse_pubsub_numsub(command, res, **options):


def parse_cluster_slots(
resp: Any, **options: Any
resp: Any, **options: Any
) -> Dict[Tuple[int, int], Dict[str, Any]]:
current_host = options.get("current_host", "")

Expand Down Expand Up @@ -113,6 +113,13 @@ def parse_cluster_shards(resp, **options):
return shards


def parse_cluster_myshardid(resp, **options):
"""
Parse CLUSTER MYSHARDID response.
"""
return resp.decode('utf-8')


PRIMARY = "primary"
REPLICA = "replica"
SLOT_ID = "slot-id"
Expand Down Expand Up @@ -341,6 +348,7 @@ class AbstractRedisCluster:
CLUSTER_COMMANDS_RESPONSE_CALLBACKS = {
"CLUSTER SLOTS": parse_cluster_slots,
"CLUSTER SHARDS": parse_cluster_shards,
"CLUSTER MYSHARDID": parse_cluster_myshardid
}

RESULT_CALLBACKS = dict_merge(
Expand Down Expand Up @@ -455,18 +463,18 @@ class initializer. In the case of conflicting arguments, querystring
return cls(url=url, **kwargs)

def __init__(
self,
host: Optional[str] = None,
port: int = 6379,
startup_nodes: Optional[List["ClusterNode"]] = None,
cluster_error_retry_attempts: int = 3,
retry: Optional["Retry"] = None,
require_full_coverage: bool = False,
reinitialize_steps: int = 5,
read_from_replicas: bool = False,
dynamic_startup_nodes: bool = True,
url: Optional[str] = None,
**kwargs,
self,
host: Optional[str] = None,
port: int = 6379,
startup_nodes: Optional[List["ClusterNode"]] = None,
cluster_error_retry_attempts: int = 3,
retry: Optional["Retry"] = None,
require_full_coverage: bool = False,
reinitialize_steps: int = 5,
read_from_replicas: bool = False,
dynamic_startup_nodes: bool = True,
url: Optional[str] = None,
**kwargs,
):
"""
Initialize a new RedisCluster client.
Expand Down Expand Up @@ -762,14 +770,14 @@ def pipeline(self, transaction=None, shard_hint=None):
)

def lock(
self,
name,
timeout=None,
sleep=0.1,
blocking=True,
blocking_timeout=None,
lock_class=None,
thread_local=True,
self,
name,
timeout=None,
sleep=0.1,
blocking=True,
blocking_timeout=None,
lock_class=None,
thread_local=True,
):
"""
Return a new Lock object using key ``name`` that mimics
Expand Down Expand Up @@ -935,7 +943,7 @@ def determine_slot(self, *args):
if len(args) <= 2:
raise RedisClusterException(f"Invalid args in command: {args}")
num_actual_keys = args[2]
eval_keys = args[3 : 3 + num_actual_keys]
eval_keys = args[3: 3 + num_actual_keys]
# if there are 0 keys, that means the script can be run on any node
# so we can just return a random slot
if len(eval_keys) == 0:
Expand Down Expand Up @@ -1052,8 +1060,8 @@ def execute_command(self, *args, **kwargs):
f"No targets were found to execute {args} command on"
)
if (
len(target_nodes) == 1
and target_nodes[0] == self.get_default_node()
len(target_nodes) == 1
and target_nodes[0] == self.get_default_node()
):
is_default_node = True
for node in target_nodes:
Expand Down Expand Up @@ -1262,14 +1270,14 @@ def reset(self) -> None:

class NodesManager:
def __init__(
self,
startup_nodes,
from_url=False,
require_full_coverage=False,
lock=None,
dynamic_startup_nodes=True,
connection_pool_class=ConnectionPool,
**kwargs,
self,
startup_nodes,
from_url=False,
require_full_coverage=False,
lock=None,
dynamic_startup_nodes=True,
connection_pool_class=ConnectionPool,
**kwargs,
):
self.nodes_cache = {}
self.slots_cache = {}
Expand Down Expand Up @@ -1368,9 +1376,9 @@ def get_node_from_slot(self, slot, read_from_replicas=False, server_type=None):
primary_name, len(self.slots_cache[slot])
)
elif (
server_type is None
or server_type == PRIMARY
or len(self.slots_cache[slot]) == 1
server_type is None
or server_type == PRIMARY
or len(self.slots_cache[slot]) == 1
):
# return a primary
node_idx = 0
Expand Down Expand Up @@ -1490,9 +1498,9 @@ def initialize(self):
# If there's only one server in the cluster, its ``host`` is ''
# Fix it to the host in startup_nodes
if (
len(cluster_slots) == 1
and len(cluster_slots[0][2][0]) == 0
and len(self.startup_nodes) == 1
len(cluster_slots) == 1
and len(cluster_slots[0][2][0]) == 0
and len(self.startup_nodes) == 1
):
cluster_slots[0][2][0] = startup_node.host

Expand Down Expand Up @@ -1734,25 +1742,25 @@ class ClusterPipeline(RedisCluster):
)

def __init__(
self,
nodes_manager: "NodesManager",
commands_parser: "CommandsParser",
result_callbacks: Optional[Dict[str, Callable]] = None,
cluster_response_callbacks: Optional[Dict[str, Callable]] = None,
startup_nodes: Optional[List["ClusterNode"]] = None,
read_from_replicas: bool = False,
cluster_error_retry_attempts: int = 3,
reinitialize_steps: int = 5,
lock=None,
**kwargs,
self,
nodes_manager: "NodesManager",
commands_parser: "CommandsParser",
result_callbacks: Optional[Dict[str, Callable]] = None,
cluster_response_callbacks: Optional[Dict[str, Callable]] = None,
startup_nodes: Optional[List["ClusterNode"]] = None,
read_from_replicas: bool = False,
cluster_error_retry_attempts: int = 3,
reinitialize_steps: int = 5,
lock=None,
**kwargs,
):
""" """
self.command_stack = []
self.nodes_manager = nodes_manager
self.commands_parser = commands_parser
self.refresh_table_asap = False
self.result_callbacks = (
result_callbacks or self.__class__.RESULT_CALLBACKS.copy()
result_callbacks or self.__class__.RESULT_CALLBACKS.copy()
)
self.startup_nodes = startup_nodes if startup_nodes else []
self.read_from_replicas = read_from_replicas
Expand Down Expand Up @@ -1875,7 +1883,7 @@ def reset(self):
# self.connection = None

def send_cluster_commands(
self, stack, raise_on_error=True, allow_redirections=True
self, stack, raise_on_error=True, allow_redirections=True
):
"""
Wrapper for CLUSTERDOWN error handling.
Expand Down Expand Up @@ -1912,7 +1920,7 @@ def send_cluster_commands(
raise e

def _send_cluster_commands(
self, stack, raise_on_error=True, allow_redirections=True
self, stack, raise_on_error=True, allow_redirections=True
):
"""
Send a bunch of cluster commands to the redis cluster.
Expand Down