Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix dead weakref in sentinel connection causing ReferenceError (#2767) #2771

Merged
merged 4 commits into from Jun 23, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES
@@ -1,3 +1,4 @@
* Fix dead weakref in sentinel conn (#2767)
* Fix #2754, adding a missing argument to SentinelManagedConnection
* Fix `xadd` command to accept non-negative `maxlen` including 0
* Revert #2104, #2673, add `disconnect_on_error` option to `read_response()` (issues #2506, #2624)
Expand Down
88 changes: 63 additions & 25 deletions redis/sentinel.py
Expand Up @@ -78,6 +78,54 @@ class SentinelManagedSSLConnection(SentinelManagedConnection, SSLConnection):
pass


class SentinelConnectionPoolProxy:
def __init__(
self,
connection_pool,
is_master,
check_connection,
service_name,
sentinel_manager,
):
self.connection_pool_ref = weakref.ref(connection_pool)
self.is_master = is_master
self.check_connection = check_connection
self.service_name = service_name
self.sentinel_manager = sentinel_manager
self.reset()

def reset(self):
self.master_address = None
self.slave_rr_counter = None

def get_master_address(self):
master_address = self.sentinel_manager.discover_master(self.service_name)
if self.is_master and self.master_address != master_address:
self.master_address = master_address
# disconnect any idle connections so that they reconnect
# to the new master the next time that they are used.
connection_pool = self.connection_pool_ref()
if connection_pool is not None:
connection_pool.disconnect(inuse_connections=False)
return master_address

def rotate_slaves(self):
slaves = self.sentinel_manager.discover_slaves(self.service_name)
if slaves:
if self.slave_rr_counter is None:
self.slave_rr_counter = random.randint(0, len(slaves) - 1)
for _ in range(len(slaves)):
self.slave_rr_counter = (self.slave_rr_counter + 1) % len(slaves)
slave = slaves[self.slave_rr_counter]
yield slave
# Fallback to the master connection
try:
yield self.get_master_address()
except MasterNotFoundError:
pass
raise SlaveNotFoundError(f"No slave found for {self.service_name!r}")


class SentinelConnectionPool(ConnectionPool):
"""
Sentinel backed connection pool.
Expand All @@ -95,8 +143,15 @@ def __init__(self, service_name, sentinel_manager, **kwargs):
)
self.is_master = kwargs.pop("is_master", True)
self.check_connection = kwargs.pop("check_connection", False)
self.proxy = SentinelConnectionPoolProxy(
connection_pool=self,
is_master=self.is_master,
check_connection=self.check_connection,
service_name=service_name,
sentinel_manager=sentinel_manager,
)
super().__init__(**kwargs)
self.connection_kwargs["connection_pool"] = weakref.proxy(self)
self.connection_kwargs["connection_pool"] = self.proxy
self.service_name = service_name
self.sentinel_manager = sentinel_manager

Expand All @@ -106,8 +161,11 @@ def __repr__(self):

def reset(self):
super().reset()
self.master_address = None
self.slave_rr_counter = None
self.proxy.reset()

@property
def master_address(self):
return self.proxy.master_address

def owns_connection(self, connection):
check = not self.is_master or (
Expand All @@ -117,31 +175,11 @@ def owns_connection(self, connection):
return check and parent.owns_connection(connection)

def get_master_address(self):
master_address = self.sentinel_manager.discover_master(self.service_name)
if self.is_master:
if self.master_address != master_address:
self.master_address = master_address
# disconnect any idle connections so that they reconnect
# to the new master the next time that they are used.
self.disconnect(inuse_connections=False)
return master_address
return self.proxy.get_master_address()

def rotate_slaves(self):
"Round-robin slave balancer"
slaves = self.sentinel_manager.discover_slaves(self.service_name)
if slaves:
if self.slave_rr_counter is None:
self.slave_rr_counter = random.randint(0, len(slaves) - 1)
for _ in range(len(slaves)):
self.slave_rr_counter = (self.slave_rr_counter + 1) % len(slaves)
slave = slaves[self.slave_rr_counter]
yield slave
# Fallback to the master connection
try:
yield self.get_master_address()
except MasterNotFoundError:
pass
raise SlaveNotFoundError(f"No slave found for {self.service_name!r}")
return self.proxy.rotate_slaves()


class Sentinel(SentinelCommands):
Expand Down
9 changes: 9 additions & 0 deletions tests/test_sentinel.py
Expand Up @@ -98,6 +98,15 @@ def test_discover_master_error(sentinel):
sentinel.discover_master("xxx")


@pytest.mark.onlynoncluster
def test_dead_pool(sentinel):
master = sentinel.master_for("mymaster", db=9)
conn = master.connection_pool.get_connection("_")
conn.disconnect()
del master
conn.connect()


@pytest.mark.onlynoncluster
def test_discover_master_sentinel_down(cluster, sentinel, master_ip):
# Put first sentinel 'foo' down
Expand Down