5252 WatchError ,
5353)
5454from redis .lock import Lock
55- from redis .maint_notifications import MaintNotificationsConfig
55+ from redis .maint_notifications import (
56+ MaintNotificationsConfig ,
57+ OSSMaintNotificationsHandler ,
58+ )
5659from redis .retry import Retry
5760from redis .utils import (
61+ check_protocol_version ,
5862 deprecated_args ,
5963 dict_merge ,
6064 list_keys_to_dict ,
@@ -214,6 +218,67 @@ def cleanup_kwargs(**kwargs):
214218 return connection_kwargs
215219
216220
221+ class MaintNotificationsAbstractRedisCluster :
222+ """
223+ Abstract class for handling maintenance notifications logic.
224+ This class is expected to be used as base class together with RedisCluster.
225+
226+ This class is intended to be used with multiple inheritance!
227+
228+ All logic related to maintenance notifications is encapsulated in this class.
229+ """
230+
231+ def __init__ (
232+ self ,
233+ maint_notifications_config : Optional [MaintNotificationsConfig ],
234+ ** kwargs ,
235+ ):
236+ # Initialize maintenance notifications
237+ is_protocol_supported = check_protocol_version (kwargs .get ("protocol" ), 3 )
238+
239+ if (
240+ maint_notifications_config
241+ and maint_notifications_config .enabled
242+ and not is_protocol_supported
243+ ):
244+ raise RedisError (
245+ "Maintenance notifications handlers on connection are only supported with RESP version 3"
246+ )
247+ if maint_notifications_config is None and is_protocol_supported :
248+ maint_notifications_config = MaintNotificationsConfig ()
249+
250+ self .maint_notifications_config = maint_notifications_config
251+
252+ if self .maint_notifications_config and self .maint_notifications_config .enabled :
253+ self ._oss_cluster_maint_notifications_handler = (
254+ OSSMaintNotificationsHandler (self , self .maint_notifications_config )
255+ )
256+ # Update connection kwargs for all future nodes connections
257+ self ._update_connection_kwargs_for_maint_notifications (
258+ self ._oss_cluster_maint_notifications_handler
259+ )
260+ # Update existing nodes connections - they are created as part of the RedisCluster constructor
261+ for node in self .get_nodes ():
262+ node .redis_connection .connection_pool .update_maint_notifications_config (
263+ self .maint_notifications_config ,
264+ oss_cluster_maint_notifications_handler = self ._oss_cluster_maint_notifications_handler ,
265+ )
266+ else :
267+ self ._oss_cluster_maint_notifications_handler = None
268+
269+ def _update_connection_kwargs_for_maint_notifications (
270+ self , oss_cluster_maint_notifications_handler : OSSMaintNotificationsHandler
271+ ):
272+ """
273+ Update the connection kwargs for all future connections.
274+ """
275+ self .nodes_manager .connection_kwargs .update (
276+ {
277+ "oss_cluster_maint_notifications_handler" : oss_cluster_maint_notifications_handler ,
278+ }
279+ )
280+
281+
217282class AbstractRedisCluster :
218283 RedisClusterRequestTTL = 16
219284
@@ -461,7 +526,9 @@ def replace_default_node(self, target_node: "ClusterNode" = None) -> None:
461526 self .nodes_manager .default_node = random .choice (replicas )
462527
463528
464- class RedisCluster (AbstractRedisCluster , RedisClusterCommands ):
529+ class RedisCluster (
530+ AbstractRedisCluster , MaintNotificationsAbstractRedisCluster , RedisClusterCommands
531+ ):
465532 @classmethod
466533 def from_url (cls , url : str , ** kwargs : Any ) -> "RedisCluster" :
467534 """
@@ -612,8 +679,7 @@ def __init__(
612679 `redis.maint_notifications.MaintNotificationsConfig` for details.
613680 Only supported with RESP3.
614681 If not provided and protocol is RESP3, the maintenance notifications
615- will be enabled by default (logic is included in the NodesManager
616- initialization).
682+ will be enabled by default.
617683 :**kwargs:
618684 Extra arguments that will be sent into Redis instance when created
619685 (See Official redis-py doc for supported kwargs - the only limitation
@@ -695,9 +761,16 @@ def __init__(
695761 kwargs .get ("decode_responses" , False ),
696762 )
697763 protocol = kwargs .get ("protocol" , None )
698- if (cache_config or cache ) and protocol not in [ 3 , "3" ] :
764+ if (cache_config or cache ) and not check_protocol_version ( protocol , 3 ) :
699765 raise RedisError ("Client caching is only supported with RESP version 3" )
700766
767+ if maint_notifications_config and not check_protocol_version (protocol , 3 ):
768+ raise RedisError (
769+ "Maintenance notifications are only supported with RESP version 3"
770+ )
771+ if check_protocol_version (protocol , 3 ) and maint_notifications_config is None :
772+ maint_notifications_config = MaintNotificationsConfig ()
773+
701774 self .command_flags = self .__class__ .COMMAND_FLAGS .copy ()
702775 self .node_flags = self .__class__ .NODE_FLAGS .copy ()
703776 self .read_from_replicas = read_from_replicas
@@ -709,6 +782,7 @@ def __init__(
709782 else :
710783 self ._event_dispatcher = event_dispatcher
711784 self .startup_nodes = startup_nodes
785+
712786 self .nodes_manager = NodesManager (
713787 startup_nodes = startup_nodes ,
714788 from_url = from_url ,
@@ -763,6 +837,10 @@ def __init__(
763837 self ._aggregate_nodes = None
764838 self ._lock = threading .RLock ()
765839
840+ MaintNotificationsAbstractRedisCluster .__init__ (
841+ self , maint_notifications_config , ** kwargs
842+ )
843+
766844 def __enter__ (self ):
767845 return self
768846
@@ -1638,9 +1716,7 @@ def __init__(
16381716 cache_config : Optional [CacheConfig ] = None ,
16391717 cache_factory : Optional [CacheFactoryInterface ] = None ,
16401718 event_dispatcher : Optional [EventDispatcher ] = None ,
1641- maint_notifications_config : Optional [
1642- MaintNotificationsConfig
1643- ] = MaintNotificationsConfig (),
1719+ maint_notifications_config : Optional [MaintNotificationsConfig ] = None ,
16441720 ** kwargs ,
16451721 ):
16461722 self .nodes_cache : Dict [str , Redis ] = {}
@@ -1885,11 +1961,29 @@ def _get_or_create_cluster_node(self, host, port, role, tmp_nodes_cache):
18851961
18861962 return target_node
18871963
1888- def initialize (self ):
1964+ def initialize (
1965+ self ,
1966+ additional_startup_nodes_info : List [Tuple [str , int ]] = [],
1967+ disconnect_startup_nodes_pools : bool = True ,
1968+ ):
18891969 """
18901970 Initializes the nodes cache, slots cache and redis connections.
18911971 :startup_nodes:
18921972 Responsible for discovering other nodes in the cluster
1973+ :disconnect_startup_nodes_pools:
1974+ Whether to disconnect the connection pool of the startup nodes
1975+ after the initialization is complete. This is useful when the
1976+ startup nodes are not part of the cluster and we want to avoid
1977+ keeping the connection open.
1978+ :additional_startup_nodes_info:
1979+ Additional nodes to add temporarily to the startup nodes.
1980+ The additional nodes will be used just in the process of extraction of the slots
1981+ and nodes information from the cluster.
1982+ This is useful when we want to add new nodes to the cluster
1983+ and initialize the client
1984+ with them.
1985+ The format of the list is a list of tuples, where each tuple contains
1986+ the host and port of the node.
18931987 """
18941988 self .reset ()
18951989 tmp_nodes_cache = {}
@@ -1899,9 +1993,25 @@ def initialize(self):
18991993 fully_covered = False
19001994 kwargs = self .connection_kwargs
19011995 exception = None
1996+
1997+ # Create cache if it's not provided and cache config is set
1998+ # should be done before initializing the first connection
1999+ # so that it will be applied to all connections
2000+ if self ._cache is None and self ._cache_config is not None :
2001+ if self ._cache_factory is None :
2002+ self ._cache = CacheFactory (self ._cache_config ).get_cache ()
2003+ else :
2004+ self ._cache = self ._cache_factory .get_cache ()
2005+
2006+ additional_startup_nodes = [
2007+ ClusterNode (host , port ) for host , port in additional_startup_nodes_info
2008+ ]
19022009 # Convert to tuple to prevent RuntimeError if self.startup_nodes
19032010 # is modified during iteration
1904- for startup_node in tuple (self .startup_nodes .values ()):
2011+ for startup_node in (
2012+ * self .startup_nodes .values (),
2013+ * additional_startup_nodes ,
2014+ ):
19052015 try :
19062016 if startup_node .redis_connection :
19072017 r = startup_node .redis_connection
@@ -1917,7 +2027,11 @@ def initialize(self):
19172027 # Make sure cluster mode is enabled on this node
19182028 try :
19192029 cluster_slots = str_if_bytes (r .execute_command ("CLUSTER SLOTS" ))
1920- r .connection_pool .disconnect ()
2030+ if disconnect_startup_nodes_pools :
2031+ # Disconnect the connection pool to avoid keeping the connection open
2032+ # For some cases we might not want to disconnect current pool and
2033+ # lose in flight commands responses
2034+ r .connection_pool .disconnect ()
19212035 except ResponseError :
19222036 raise RedisClusterException (
19232037 "Cluster mode is not enabled on this node"
@@ -1998,12 +2112,6 @@ def initialize(self):
19982112 f"one reachable node: { str (exception )} "
19992113 ) from exception
20002114
2001- if self ._cache is None and self ._cache_config is not None :
2002- if self ._cache_factory is None :
2003- self ._cache = CacheFactory (self ._cache_config ).get_cache ()
2004- else :
2005- self ._cache = self ._cache_factory .get_cache ()
2006-
20072115 # Create Redis connections to all nodes
20082116 self .create_redis_connections (list (tmp_nodes_cache .values ()))
20092117
0 commit comments