Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions redis/asyncio/sentinel.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ class SlaveNotFoundError(ConnectionError):
pass


# Alias for REPLICA terminology (Redis 5.0+)
ReplicaNotFoundError = SlaveNotFoundError


class SentinelManagedConnection(Connection):
def __init__(self, **kwargs):
self.connection_pool = kwargs.pop("connection_pool")
Expand Down Expand Up @@ -166,6 +170,11 @@ async def rotate_slaves(self) -> AsyncIterator:
pass
raise SlaveNotFoundError(f"No slave found for {self.service_name!r}")

async def rotate_replicas(self) -> AsyncIterator:
"""Round-robin replica balancer"""
async for replica in self.rotate_slaves():
yield replica


class Sentinel(AsyncSentinelCommands):
"""
Expand Down Expand Up @@ -318,6 +327,12 @@ def filter_slaves(
slaves_alive.append((slave["ip"], slave["port"]))
return slaves_alive

def filter_replicas(
self, replicas: Iterable[Mapping]
) -> Sequence[Tuple[EncodableT, EncodableT]]:
"""Remove replicas that are in an ODOWN or SDOWN state"""
return self.filter_slaves(replicas)

async def discover_slaves(
self, service_name: str
) -> Sequence[Tuple[EncodableT, EncodableT]]:
Expand All @@ -332,6 +347,12 @@ async def discover_slaves(
return slaves
return []

async def discover_replicas(
self, service_name: str
) -> Sequence[Tuple[EncodableT, EncodableT]]:
"""Returns a list of alive replicas for service ``service_name``"""
return await self.discover_slaves(service_name)

def master_for(
self,
service_name: str,
Expand Down Expand Up @@ -402,3 +423,34 @@ def slave_for(
connection_pool = connection_pool_class(service_name, self, **connection_kwargs)
# The Redis object "owns" the pool
return redis_class.from_pool(connection_pool)

def replica_for(
self,
service_name: str,
redis_class: Type[Redis] = Redis,
connection_pool_class: Type[SentinelConnectionPool] = SentinelConnectionPool,
**kwargs,
):
"""
Returns redis client instance for the ``service_name`` replica(s).

A SentinelConnectionPool class is used to retrieve the replica's
address before establishing a new connection.

By default clients will be a :py:class:`~redis.Redis` instance.
Specify a different class to the ``redis_class`` argument if you
desire something different.

The ``connection_pool_class`` specifies the connection pool to use.
The SentinelConnectionPool will be used by default.

All other keyword arguments are merged with any connection_kwargs
passed to this class and passed to the connection pool as keyword
arguments to be used to initialize Redis connections.
"""
return self.slave_for(
service_name,
redis_class=redis_class,
connection_pool_class=connection_pool_class,
**kwargs,
)
47 changes: 47 additions & 0 deletions redis/sentinel.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ class SlaveNotFoundError(ConnectionError):
pass


# Alias for REPLICA terminology (Redis 5.0+)
ReplicaNotFoundError = SlaveNotFoundError


class SentinelManagedConnection(Connection):
def __init__(self, **kwargs):
self.connection_pool = kwargs.pop("connection_pool")
Expand Down Expand Up @@ -198,6 +202,10 @@ def rotate_slaves(self):
"Round-robin slave balancer"
return self.proxy.rotate_slaves()

def rotate_replicas(self):
"Round-robin replica balancer"
return self.rotate_slaves()


class Sentinel(SentinelCommands):
"""
Expand Down Expand Up @@ -343,6 +351,10 @@ def filter_slaves(self, slaves):
slaves_alive.append((slave["ip"], slave["port"]))
return slaves_alive

def filter_replicas(self, replicas):
"Remove replicas that are in an ODOWN or SDOWN state"
return self.filter_slaves(replicas)

def discover_slaves(self, service_name):
"Returns a list of alive slaves for service ``service_name``"
for sentinel in self.sentinels:
Expand All @@ -355,6 +367,10 @@ def discover_slaves(self, service_name):
return slaves
return []

def discover_replicas(self, service_name):
"Returns a list of alive replicas for service ``service_name``"
return self.discover_slaves(service_name)

def master_for(
self,
service_name,
Expand Down Expand Up @@ -423,3 +439,34 @@ def slave_for(
return redis_class.from_pool(
connection_pool_class(service_name, self, **connection_kwargs)
)

def replica_for(
self,
service_name,
redis_class=Redis,
connection_pool_class=SentinelConnectionPool,
**kwargs,
):
"""
Returns redis client instance for the ``service_name`` replica(s).

A SentinelConnectionPool class is used to retrieve the replica's
address before establishing a new connection.

By default clients will be a :py:class:`~redis.Redis` instance.
Specify a different class to the ``redis_class`` argument if you
desire something different.

The ``connection_pool_class`` specifies the connection pool to use.
The SentinelConnectionPool will be used by default.

All other keyword arguments are merged with any connection_kwargs
passed to this class and passed to the connection pool as keyword
arguments to be used to initialize Redis connections.
"""
return self.slave_for(
service_name,
redis_class=redis_class,
connection_pool_class=connection_pool_class,
**kwargs,
)
63 changes: 63 additions & 0 deletions tests/test_asyncio/test_sentinel.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from redis import exceptions
from redis.asyncio.sentinel import (
MasterNotFoundError,
ReplicaNotFoundError,
Sentinel,
SentinelConnectionPool,
SlaveNotFoundError,
Expand Down Expand Up @@ -396,3 +397,65 @@ async def test_sentinel_commands_with_strict_redis_client(request):
assert isinstance(await client.sentinel_ckquorum("redis-py-test"), bool)

await client.close()


# Tests for REPLICA aliases (Redis 5.0+ terminology)
@pytest.mark.onlynoncluster
async def test_replica_not_found_error_alias():
"""Test that ReplicaNotFoundError is an alias for SlaveNotFoundError"""
assert ReplicaNotFoundError is SlaveNotFoundError


@pytest.mark.onlynoncluster
async def test_replica_for_alias(cluster, sentinel):
"""Test that replica_for() is an alias for slave_for()"""
cluster.slaves = [
{"ip": "127.0.0.1", "port": 6379, "is_odown": False, "is_sdown": False}
]
async with sentinel.replica_for("mymaster", db=9) as replica:
assert await replica.ping()


@pytest.mark.onlynoncluster
async def test_discover_replicas_alias(cluster, sentinel):
"""Test that discover_replicas() is an alias for discover_slaves()"""
cluster.slaves = [
{"ip": "slave0", "port": 1234, "is_odown": False, "is_sdown": False},
{"ip": "slave1", "port": 1234, "is_odown": False, "is_sdown": False},
]
# discover_replicas should return the same result as discover_slaves
replicas = await sentinel.discover_replicas("mymaster")
slaves = await sentinel.discover_slaves("mymaster")
assert replicas == slaves
assert replicas == [("slave0", 1234), ("slave1", 1234)]


@pytest.mark.onlynoncluster
async def test_filter_replicas_alias(cluster, sentinel):
"""Test that filter_replicas() is an alias for filter_slaves()"""
replicas = [
{"ip": "replica0", "port": 1234, "is_odown": False, "is_sdown": False},
{"ip": "replica1", "port": 1234, "is_odown": True, "is_sdown": False},
]
# filter_replicas should return the same result as filter_slaves
filtered_replicas = sentinel.filter_replicas(replicas)
filtered_slaves = sentinel.filter_slaves(replicas)
assert filtered_replicas == filtered_slaves
assert filtered_replicas == [("replica0", 1234)]


@pytest.mark.onlynoncluster
async def test_rotate_replicas_alias(cluster, sentinel, master_ip):
"""Test that rotate_replicas() is an alias for rotate_slaves()"""
cluster.slaves = [
{"ip": "slave0", "port": 6379, "is_odown": False, "is_sdown": False},
{"ip": "slave1", "port": 6379, "is_odown": False, "is_sdown": False},
]
pool = SentinelConnectionPool("mymaster", sentinel)
rotator = pool.rotate_replicas()
assert await rotator.__anext__() in (("slave0", 6379), ("slave1", 6379))
assert await rotator.__anext__() in (("slave0", 6379), ("slave1", 6379))
# Fallback to master
assert await rotator.__anext__() == (master_ip, 6379)
with pytest.raises(SlaveNotFoundError):
await rotator.__anext__()
63 changes: 63 additions & 0 deletions tests/test_sentinel.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from redis import exceptions
from redis.sentinel import (
MasterNotFoundError,
ReplicaNotFoundError,
Sentinel,
SentinelConnectionPool,
SlaveNotFoundError,
Expand Down Expand Up @@ -373,3 +374,65 @@ def test_sentinel_commands_with_strict_redis_client(request):
assert isinstance(client.sentinel_ckquorum("redis-py-test"), bool)

client.close()


# Tests for REPLICA aliases (Redis 5.0+ terminology)
@pytest.mark.onlynoncluster
def test_replica_not_found_error_alias():
"""Test that ReplicaNotFoundError is an alias for SlaveNotFoundError"""
assert ReplicaNotFoundError is SlaveNotFoundError


@pytest.mark.onlynoncluster
def test_replica_for_alias(cluster, sentinel):
"""Test that replica_for() is an alias for slave_for()"""
cluster.slaves = [
{"ip": "127.0.0.1", "port": 6379, "is_odown": False, "is_sdown": False}
]
replica = sentinel.replica_for("mymaster", db=9)
assert replica.ping()


@pytest.mark.onlynoncluster
def test_discover_replicas_alias(cluster, sentinel):
"""Test that discover_replicas() is an alias for discover_slaves()"""
cluster.slaves = [
{"ip": "slave0", "port": 1234, "is_odown": False, "is_sdown": False},
{"ip": "slave1", "port": 1234, "is_odown": False, "is_sdown": False},
]
# discover_replicas should return the same result as discover_slaves
replicas = sentinel.discover_replicas("mymaster")
slaves = sentinel.discover_slaves("mymaster")
assert replicas == slaves
assert replicas == [("slave0", 1234), ("slave1", 1234)]


@pytest.mark.onlynoncluster
def test_filter_replicas_alias(cluster, sentinel):
"""Test that filter_replicas() is an alias for filter_slaves()"""
replicas = [
{"ip": "replica0", "port": 1234, "is_odown": False, "is_sdown": False},
{"ip": "replica1", "port": 1234, "is_odown": True, "is_sdown": False},
]
# filter_replicas should return the same result as filter_slaves
filtered_replicas = sentinel.filter_replicas(replicas)
filtered_slaves = sentinel.filter_slaves(replicas)
assert filtered_replicas == filtered_slaves
assert filtered_replicas == [("replica0", 1234)]


@pytest.mark.onlynoncluster
def test_rotate_replicas_alias(cluster, sentinel, master_ip):
"""Test that rotate_replicas() is an alias for rotate_slaves()"""
cluster.slaves = [
{"ip": "slave0", "port": 6379, "is_odown": False, "is_sdown": False},
{"ip": "slave1", "port": 6379, "is_odown": False, "is_sdown": False},
]
pool = SentinelConnectionPool("mymaster", sentinel)
rotator = pool.rotate_replicas()
assert next(rotator) in (("slave0", 6379), ("slave1", 6379))
assert next(rotator) in (("slave0", 6379), ("slave1", 6379))
# Fallback to master
assert next(rotator) == (master_ip, 6379)
with pytest.raises(SlaveNotFoundError):
next(rotator)