diff --git a/.gitignore b/.gitignore index 7184ad4e20..aaef11682a 100644 --- a/.gitignore +++ b/.gitignore @@ -27,3 +27,4 @@ docker/stunnel/keys /dockers/replica/ /dockers/sentinel/ /dockers/redis-stack/ +/experimenting/ diff --git a/docker-compose.yml b/docker-compose.yml index 1699cf61af..c0d5acf0ef 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,11 +1,14 @@ --- # image tag 8.0-RC2-pre is the one matching the 8.0 GA release x-client-libs-stack-image: &client-libs-stack-image - image: "redislabs/client-libs-test:${CLIENT_LIBS_TEST_STACK_IMAGE_TAG:-8.4-RC1-pre.2}" + image: "redislabs/client-libs-test:${CLIENT_LIBS_TEST_STACK_IMAGE_TAG:-8.4-GA-pre.2}" x-client-libs-image: &client-libs-image - image: "redislabs/client-libs-test:${CLIENT_LIBS_TEST_IMAGE_TAG:-8.4-RC1-pre.2}" + image: "redislabs/client-libs-test:${CLIENT_LIBS_TEST_IMAGE_TAG:-8.4-GA-pre.2}" +networks: + redis-net: + driver: bridge services: redis: @@ -106,3 +109,42 @@ services: - standalone - all-stack - all + + redis-proxied: + <<: *client-libs-image + container_name: redis-proxied + ports: + - "3000:3000" + networks: + - redis-net + healthcheck: + test: ["CMD", "redis-cli", "-p", "3000", "PING"] + interval: 10s + timeout: 3s + retries: 5 + + resp-proxy: + image: redislabs/client-resp-proxy + container_name: resp-proxy + environment: + LISTEN_HOST: "0.0.0.0" + LISTEN_PORT: "15379,15380,15381" + TARGET_HOST: "redis-proxied" + TARGET_PORT: "3000" + API_PORT: "4000" + ENABLE_LOGGING: true + SIMULATE_CLUSTER: true + ports: + - "15379:15379" + - "15380:15380" + - "15381:15381" + - "4000:4000" + depends_on: + - redis-proxied + networks: + - redis-net + healthcheck: + test: ["CMD", "sh", "-c", "wget -qO- http://localhost:4000/stats || exit 1"] + interval: 10s + timeout: 3s + retries: 5 diff --git a/redis/client.py b/redis/client.py index d3ab3cfcfe..e2712fc3f8 100755 --- a/redis/client.py +++ b/redis/client.py @@ -281,7 +281,7 @@ def __init__( if `True`, the response will be decoded to utf-8. Argument is ignored when connection_pool is provided. maint_notifications_config: - configuration the pool to support maintenance notifications - see + configures the pool to support maintenance notifications - see `redis.maint_notifications.MaintNotificationsConfig` for details. Only supported with RESP3 If not provided and protocol is RESP3, the maintenance notifications diff --git a/redis/cluster.py b/redis/cluster.py index 33b54b1bed..f06000563a 100644 --- a/redis/cluster.py +++ b/redis/cluster.py @@ -196,6 +196,7 @@ def parse_cluster_myshardid(resp, **options): "username", "cache", "cache_config", + "maint_notifications_config", ) KWARGS_DISABLED_KEYS = ("host", "port", "retry") @@ -535,6 +536,7 @@ def __init__( cache_config: Optional[CacheConfig] = None, event_dispatcher: Optional[EventDispatcher] = None, policy_resolver: PolicyResolver = StaticPolicyResolver(), + maint_notifications_config: Optional[MaintNotificationsConfig] = None, **kwargs, ): """ @@ -605,6 +607,13 @@ def __init__( which the nodes _think_ they are, to addresses at which a client may reach them, such as when they sit behind a proxy. + :param maint_notifications_config: + Configures the nodes connections to support maintenance notifications - see + `redis.maint_notifications.MaintNotificationsConfig` for details. + Only supported with RESP3. + If not provided and protocol is RESP3, the maintenance notifications + will be enabled by default (logic is included in the NodesManager + initialization). :**kwargs: Extra arguments that will be sent into Redis instance when created (See Official redis-py doc for supported kwargs - the only limitation @@ -709,6 +718,7 @@ def __init__( cache=cache, cache_config=cache_config, event_dispatcher=self._event_dispatcher, + maint_notifications_config=maint_notifications_config, **kwargs, ) @@ -1622,6 +1632,9 @@ def __init__( cache_config: Optional[CacheConfig] = None, cache_factory: Optional[CacheFactoryInterface] = None, event_dispatcher: Optional[EventDispatcher] = None, + maint_notifications_config: Optional[ + MaintNotificationsConfig + ] = MaintNotificationsConfig(), **kwargs, ): self.nodes_cache: Dict[str, Redis] = {} @@ -1650,6 +1663,7 @@ def __init__( self._credential_provider = self.connection_kwargs.get( "credential_provider", None ) + self.maint_notifications_config = maint_notifications_config self.initialize() def get_node(self, host=None, port=None, node_name=None): @@ -1797,7 +1811,10 @@ def create_redis_connections(self, nodes): for node in nodes: if node.redis_connection is None: node.redis_connection = self.create_redis_node( - host=node.host, port=node.port, **self.connection_kwargs + host=node.host, + port=node.port, + maint_notifications_config=self.maint_notifications_config, + **self.connection_kwargs, ) connection_pools.append(node.redis_connection.connection_pool) @@ -1807,7 +1824,12 @@ def create_redis_connections(self, nodes): ) ) - def create_redis_node(self, host, port, **kwargs): + def create_redis_node( + self, + host, + port, + **kwargs, + ): # We are configuring the connection pool not to retry # connections on lower level clients to avoid retrying # connections to nodes that are not reachable @@ -1821,13 +1843,8 @@ def create_redis_node(self, host, port, **kwargs): backoff=NoBackoff(), retries=0, supported_errors=(ConnectionError,) ) - protocol = kwargs.get("protocol", None) - if protocol in [3, "3"]: - kwargs.update( - {"maint_notifications_config": MaintNotificationsConfig(enabled=False)} - ) if self.from_url: - # Create a redis node with a costumed connection pool + # Create a redis node with a custom connection pool kwargs.update({"host": host}) kwargs.update({"port": port}) kwargs.update({"cache": self._cache}) @@ -1885,7 +1902,10 @@ def initialize(self): else: # Create a new Redis connection r = self.create_redis_node( - startup_node.host, startup_node.port, **kwargs + startup_node.host, + startup_node.port, + maint_notifications_config=self.maint_notifications_config, + **kwargs, ) self.startup_nodes[startup_node.name].redis_connection = r # Make sure cluster mode is enabled on this node diff --git a/tests/maint_notifications/proxy_server_helpers.py b/tests/maint_notifications/proxy_server_helpers.py new file mode 100644 index 0000000000..f0276312f9 --- /dev/null +++ b/tests/maint_notifications/proxy_server_helpers.py @@ -0,0 +1,249 @@ +import base64 +import logging +from typing import Union + +from redis.http.http_client import HttpClient, HttpError +# from urllib.request import Request, urlopen +# from urllib.error import URLError + + +class RespTranslator: + """Helper class to translate between RESP and other encodings.""" + + @staticmethod + def cluster_slots_to_resp(resp: str) -> str: + """Convert query to RESP format.""" + return ( + f"*{len(resp.split())}\r\n" + + "\r\n".join(f"${len(x)}\r\n{x}" for x in resp.split()) + + "\r\n" + ) + + @staticmethod + def smigrating_to_resp(resp: str) -> str: + """Convert query to RESP format.""" + return ( + f">{len(resp.split())}\r\n" + + "\r\n".join(f"${len(x)}\r\n{x}" for x in resp.split()) + + "\r\n" + ) + + +class ProxyInterceptorHelper: + """Helper class for intercepting socket calls and managing interceptor server.""" + + def __init__(self, server_url: str = "http://localhost:4000"): + self.server_url = server_url + self._resp_translator = RespTranslator() + self.http_client = HttpClient() + + def cleanup_interceptors(self, *names: str): + """ + Resets all the interceptors by providing empty pattern and returned response. + + Args: + names: Names of the interceptors to reset + """ + for name in names: + self._reset_interceptor(name) + + def set_cluster_nodes(self, name: str, nodes: list[tuple[str, int]]) -> str: + """ + Set cluster nodes by intercepting CLUSTER SLOTS command. + + This method creates an interceptor that intercepts CLUSTER SLOTS commands + and returns a modified topology with the provided nodes. + + Args: + name: Name of the interceptor + nodes: List of (host, port) tuples representing the cluster nodes + + Returns: + The interceptor name that was created + + Example: + interceptor = ProxyInterceptorHelper(None, "http://localhost:4000") + interceptor_name = interceptor.set_cluster_nodes( + "test_topology", + [("127.0.0.1", 6379), ("127.0.0.1", 6380), ("127.0.0.1", 6381)] + ) + """ + # Build RESP response for CLUSTER SLOTS + # Format: * for each range: *3 :start :end *3 $ : $ + resp_parts = [f"*{len(nodes)}"] + + # For simplicity, distribute slots evenly across nodes + total_slots = 16384 + slots_per_node = total_slots // len(nodes) + + for i, (host, port) in enumerate(nodes): + start_slot = i * slots_per_node + end_slot = ( + (i + 1) * slots_per_node - 1 if i < len(nodes) - 1 else total_slots - 1 + ) + + # Node info: *3 for (host, port, id) + resp_parts.append("*3") + resp_parts.append(f":{start_slot}") + resp_parts.append(f":{end_slot}") + + # Node details: *3 for (host, port, id) + resp_parts.append("*3") + resp_parts.append(f"${len(host)}") + resp_parts.append(host) + resp_parts.append(f":{port}") + resp_parts.append("$13") + resp_parts.append(f"proxy-id-{port}") + + response = "\r\n".join(resp_parts) + "\r\n" + + # Add the interceptor + self._add_interceptor( + name=name, + match="*2\r\n$7\r\ncluster\r\n$5\r\nslots\r\n", + response=response, + encoding="raw", + ) + + return name + + def get_stats(self) -> dict: + """ + Get statistics from the interceptor server. + + Returns: + Statistics dictionary containing connection information + """ + url = f"{self.server_url}/stats" + + try: + response = self.http_client.get(url) + return response.json() + + except HttpError as e: + raise RuntimeError(f"Failed to get stats from interceptor server: {e}") + + def get_connections(self) -> dict: + """ + Get all active connections from the server. + + Returns: + Response from the server as a dictionary + """ + url = f"{self.server_url}/connections" + + try: + response = self.http_client.get(url) + return response.json() + except HttpError as e: + raise RuntimeError(f"Failed to get connections: {e}") + + def send_notification( + self, + connected_to_port: Union[int, str], + notification: str, + ) -> dict: + """ + Send a notification to all connections connected to + a specific node(identified by port number). + + This method: + 1. Fetches stats from the interceptor server + 2. Finds all connection IDs connected to the specified node + 3. Sends the notification to each connection + + Args: + connected_to_port: Port number of the node to send the notification to + notification: The notification message to send (in RESP format) + + Returns: + Response from the server as a dictionary + + Example: + interceptor = ProxyInterceptorHelper(None, "http://localhost:4000") + result = interceptor.send_notification( + "6379", + "KjENCiQ0DQpQSU5HDQo=" # PING command in base64 + ) + """ + # Get stats to find connection IDs for the node + stats = self.get_stats() + + # Extract connection IDs for the specified node + conn_ids = [] + for node_key, node_info in stats.items(): + node_port = node_key.split("@")[1] + if int(node_port) == int(connected_to_port): + for conn in node_info.get("connections", []): + conn_ids.append(conn["id"]) + + if not conn_ids: + raise RuntimeError( + f"No connections found for node {node_port}. " + f"Available nodes: {list(set(c.get('node') for c in stats.get('connections', {}).values()))}" + ) + + # Send notification to each connection + results = {} + logging.info(f"Sending notification to {len(conn_ids)} connections: {conn_ids}") + connections_query = f"connectionIds={','.join(conn_ids)}" + url = f"{self.server_url}/send-to-clients?{connections_query}&encoding=base64" + # Encode notification to base64 + data = base64.b64encode(notification.encode("utf-8")) + + try: + response = self.http_client.post(url, json_body=data) + results = response.json() + except HttpError as e: + results = {"error": str(e)} + + return { + "node_address": node_port, + "connection_ids": conn_ids, + "results": results, + } + + def _add_interceptor( + self, + name: str, + match: str, + response: str, + encoding: str = "raw", + ) -> dict: + """ + Add an interceptor to the server. + + Args: + name: Name of the interceptor + match: Pattern to match (RESP format) + response: Response to return when matched (RESP format) + encoding: Encoding type - "base64" or "raw" + + Returns: + Response from the server as a dictionary + """ + url = f"{self.server_url}/interceptors" + payload = { + "name": name, + "match": match, + "response": response, + "encoding": encoding, + } + headers = {"Content-Type": "application/json"} + + try: + proxy_response = self.http_client.post( + url, json_body=payload, headers=headers + ) + return proxy_response.json() + except HttpError as e: + raise RuntimeError(f"Failed to add interceptor: {e}") + + def _reset_interceptor(self, name: str): + """ + Reset an interceptor by providing empty pattern and returned response. + + Args: + name: Name of the interceptor to reset + """ + self._add_interceptor(name, "", "") diff --git a/tests/maint_notifications/test_cluster_maint_notifications_handling.py b/tests/maint_notifications/test_cluster_maint_notifications_handling.py new file mode 100644 index 0000000000..6d81224603 --- /dev/null +++ b/tests/maint_notifications/test_cluster_maint_notifications_handling.py @@ -0,0 +1,343 @@ +from typing import cast + +from redis import ConnectionPool, RedisCluster +from redis.cluster import ClusterNode +from redis.connection import ( + BlockingConnectionPool, +) +from redis.maint_notifications import MaintNotificationsConfig +from redis.cache import CacheConfig +from tests.maint_notifications.proxy_server_helpers import ( + ProxyInterceptorHelper, + RespTranslator, +) + +# Initial cluster node configuration for proxy-based tests +PROXY_CLUSTER_NODES = [ + ClusterNode("127.0.0.1", 15379), + ClusterNode("127.0.0.1", 15380), + ClusterNode("127.0.0.1", 15381), +] + + +class TestClusterMaintNotificationsConfig: + """Test the maint_notifications_config parameter of RedisCluster.""" + + # Helper methods + def _create_cluster_client( + self, + maint_config=None, + connection_pool_class=None, + cache_config=None, + skip_full_coverage_check=True, + ): + """Create a RedisCluster instance with real cluster nodes.""" + kwargs = { + "startup_nodes": PROXY_CLUSTER_NODES, + "protocol": 3, + "skip_full_coverage_check": skip_full_coverage_check, + } + if maint_config is not None: + kwargs["maint_notifications_config"] = maint_config + if connection_pool_class is not None: + kwargs["connection_pool_class"] = connection_pool_class + if cache_config is not None: + kwargs["cache_config"] = cache_config + + return RedisCluster(**kwargs) + + def _validate_maint_config_on_nodes_manager( + self, + cluster: RedisCluster, + expected_enabled: bool, + expected_proactive_reconnect: bool, + expected_relaxed_timeout: int, + ) -> None: + """Validate maint_notifications_config on NodesManager.""" + assert cluster.nodes_manager.maint_notifications_config is not None + assert ( + cluster.nodes_manager.maint_notifications_config.enabled == expected_enabled + ) + assert ( + cluster.nodes_manager.maint_notifications_config.proactive_reconnect + == expected_proactive_reconnect + ) + assert ( + cluster.nodes_manager.maint_notifications_config.relaxed_timeout + == expected_relaxed_timeout + ) + + def _validate_maint_config_on_nodes( + self, + cluster: RedisCluster, + expected_enabled: bool, + expected_proactive_reconnect: bool, + expected_relaxed_timeout: int, + should_have_handler: bool = True, + ) -> None: + """Validate maint_notifications_config on individual nodes.""" + nodes = list(cluster.nodes_manager.nodes_cache.values()) + assert len(nodes) > 0, "Cluster should have at least one node" + + for node in nodes: + cluster_node = cast(ClusterNode, node) + assert cluster_node.redis_connection is not None + connection_pool = cluster_node.redis_connection.connection_pool + assert connection_pool is not None + + if should_have_handler: + if hasattr(connection_pool, "_maint_notifications_pool_handler"): + handler = connection_pool._maint_notifications_pool_handler + if handler is not None: + assert handler.config.enabled == expected_enabled + assert ( + handler.config.proactive_reconnect + == expected_proactive_reconnect + ) + assert ( + handler.config.relaxed_timeout == expected_relaxed_timeout + ) + + def test_maint_notifications_config(self): + """ + Test that maint_notifications_config is passed to NodesManager and nodes. + + Creates a RedisCluster instance with 3 real startup nodes and validates + that the maint_notifications_config is properly set on both the NodesManager + and the individual nodes. + """ + maint_config = MaintNotificationsConfig( + enabled=False, proactive_reconnect=True, relaxed_timeout=30 + ) + + cluster = self._create_cluster_client(maint_config=maint_config) + + try: + self._validate_maint_config_on_nodes_manager(cluster, False, True, 30) + self._validate_maint_config_on_nodes(cluster, False, True, 30) + + # Verify we can execute commands without errors + cluster.set("test", "VAL") + res = cluster.get("test") + assert res == b"VAL" + finally: + cluster.close() + + def test_config_propagation_to_new_nodes(self): + """ + Test that when a new node is discovered/added to the cluster, + it receives the same maint_notifications_config. + """ + maint_config = MaintNotificationsConfig( + enabled=False, proactive_reconnect=True, relaxed_timeout=25 + ) + + cluster = self._create_cluster_client(maint_config=maint_config) + + try: + # Verify initial nodes have the config + initial_node_count = len(cluster.nodes_manager.nodes_cache) + self._validate_maint_config_on_nodes(cluster, False, True, 25) + + # Reinitialize to ensure all nodes are discovered + cluster.nodes_manager.initialize() + + # Verify all nodes have the config + new_node_count = len(cluster.nodes_manager.nodes_cache) + assert new_node_count >= initial_node_count + self._validate_maint_config_on_nodes(cluster, False, True, 25) + finally: + cluster.close() + + def test_config_with_blocking_connection_pool(self): + """ + Test that maint_notifications_config works with BlockingConnectionPool. + """ + maint_config = MaintNotificationsConfig( + enabled=False, proactive_reconnect=True, relaxed_timeout=20 + ) + + cluster = self._create_cluster_client( + maint_config=maint_config, + connection_pool_class=BlockingConnectionPool, + ) + + try: + # Verify config is set on NodesManager + self._validate_maint_config_on_nodes_manager(cluster, False, True, 20) + + # Verify config is set on nodes + self._validate_maint_config_on_nodes(cluster, False, True, 20) + + # Verify we can execute commands without errors + cluster.set("test", "VAL") + res = cluster.get("test") + assert res == b"VAL" + finally: + cluster.close() + + def test_config_with_cache_enabled(self): + """ + Test that maint_notifications_config works with caching enabled. + """ + maint_config = MaintNotificationsConfig( + enabled=False, proactive_reconnect=True, relaxed_timeout=15 + ) + cache_config = CacheConfig() + + cluster = self._create_cluster_client( + maint_config=maint_config, + cache_config=cache_config, + ) + + try: + self._validate_maint_config_on_nodes_manager(cluster, False, True, 15) + self._validate_maint_config_on_nodes(cluster, False, True, 15) + + # Verify we can execute commands without errors + cluster.set("test", "VAL") + res = cluster.get("test") + assert res == b"VAL" + finally: + cluster.close() + + def test_none_config_default_behavior(self): + """ + Test that when maint_notifications_config=None, the system works without errors. + """ + cluster = self._create_cluster_client(maint_config=None) + + try: + # Verify cluster is created successfully + assert cluster.nodes_manager is not None + assert cluster.nodes_manager.maint_notifications_config is None + assert len(cluster.nodes_manager.nodes_cache) > 0 + # Verify we can execute commands without errors + cluster.set("test", "VAL") + res = cluster.get("test") + assert res == b"VAL" + finally: + cluster.close() + + def test_config_with_enabled_false(self): + """ + Test that when enabled=False, maint notifications handlers are not created/initialized. + """ + maint_config = MaintNotificationsConfig( + enabled=False, proactive_reconnect=False, relaxed_timeout=-1 + ) + + cluster = self._create_cluster_client(maint_config=maint_config) + + try: + self._validate_maint_config_on_nodes_manager(cluster, False, False, -1) + # When enabled=False, handlers should not be created + self._validate_maint_config_on_nodes( + cluster, False, False, -1, should_have_handler=False + ) + + # Verify we can execute commands without errors + cluster.set("test", "VAL") + res = cluster.get("test") + assert res == b"VAL" + finally: + cluster.close() + + def test_config_with_pipeline_operations(self): + """ + Test that maint_notifications_config works with pipelined commands. + """ + maint_config = MaintNotificationsConfig( + enabled=False, proactive_reconnect=True, relaxed_timeout=10 + ) + + cluster = self._create_cluster_client(maint_config=maint_config) + + try: + self._validate_maint_config_on_nodes_manager(cluster, False, True, 10) + self._validate_maint_config_on_nodes(cluster, False, True, 10) + + # Verify pipeline operations work without errors + pipe = cluster.pipeline() + pipe.set("pipe_key1", "value1") + pipe.set("pipe_key2", "value2") + pipe.get("pipe_key1") + pipe.get("pipe_key2") + results = pipe.execute() + + # Verify pipeline results + assert results[0] is True or results[0] == b"OK" # SET returns True or OK + assert results[1] is True or results[1] == b"OK" # SET returns True or OK + assert results[2] == b"value1" # GET returns value + assert results[3] == b"value2" # GET returns value + finally: + cluster.close() + + +class TestClusterMaintNotificationsHandlingBase: + """Base class for maintenance notifications handling tests.""" + + def setup_method(self): + """Set up test fixtures with mocked sockets.""" + self.proxy_helper = ProxyInterceptorHelper() + + # Create maintenance notifications config + self.config = MaintNotificationsConfig( + enabled="auto", proactive_reconnect=True, relaxed_timeout=30 + ) + self.cluster = self._create_cluster_client(maint_config=self.config) + + def _create_cluster_client( + self, + pool_class=ConnectionPool, + enable_cache=False, + max_connections=10, + maint_config=None, + ) -> RedisCluster: + """Create a RedisCluster instance with mocked sockets.""" + config = maint_config if maint_config is not None else self.config + kwargs = {} + if enable_cache: + kwargs = {"cache_config": CacheConfig()} + + test_redis_client = RedisCluster( + protocol=3, + startup_nodes=PROXY_CLUSTER_NODES, + maint_notifications_config=config, + connection_pool_class=pool_class, + max_connections=max_connections, + **kwargs, + ) + + return test_redis_client + + def teardown_method(self): + """Clean up test fixtures.""" + self.cluster.close() + self.proxy_helper.cleanup_interceptors() + + +class TestClusterMaintNotificationsHandling(TestClusterMaintNotificationsHandlingBase): + """Test maintenance notifications handling with RedisCluster.""" + + def test_receive_maint_notification(self): + """Test receiving a maintenance notification.""" + self.cluster.set("test", "VAL") + pubsub = self.cluster.pubsub() + pubsub.subscribe("test") + test_msg = pubsub.get_message(ignore_subscribe_messages=True, timeout=10) + print(test_msg) + + # Try to send a push notification to the clients of given server node + # Server node is defined by its port with the local test environment + # The message should be in the format: + # >3\r\n$7\r\nmessage\r\n$3\r\nfoo\r\n$4\r\neeee\r + notification = RespTranslator.smigrating_to_resp( + "TEST_NOTIFICATION 12182 127.0.0.1:15380" + ) + self.proxy_helper.send_notification(pubsub.connection.port, notification) + res = self.proxy_helper.get_connections() + print(res) + + test_msg = pubsub.get_message(timeout=1) + print(test_msg) diff --git a/tests/test_maint_notifications.py b/tests/maint_notifications/test_maint_notifications.py similarity index 100% rename from tests/test_maint_notifications.py rename to tests/maint_notifications/test_maint_notifications.py diff --git a/tests/test_maint_notifications_handling.py b/tests/maint_notifications/test_maint_notifications_handling.py similarity index 99% rename from tests/test_maint_notifications_handling.py rename to tests/maint_notifications/test_maint_notifications_handling.py index 556b63d7e1..a61106dae1 100644 --- a/tests/test_maint_notifications_handling.py +++ b/tests/maint_notifications/test_maint_notifications_handling.py @@ -441,7 +441,7 @@ def _get_client( setup_pool_handler: Whether to set up pool handler for moving notifications (default: False) Returns: - tuple: (test_pool, test_redis_client) + test_redis_client """ config = ( maint_notifications_config