Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
b26a03e
Extract additional interfaces and abstract classes
vladvildanov Aug 27, 2025
6bc4f71
Merge branch 'feat/active-active' of github.com:redis/redis-py into v…
vladvildanov Aug 28, 2025
bad9bcc
Added base async components
vladvildanov Aug 29, 2025
ae42bea
Added command executor
vladvildanov Sep 2, 2025
8fc74b9
Added recurring background tasks with event loop only
vladvildanov Sep 2, 2025
97c3cde
Added MultiDBClient
vladvildanov Sep 3, 2025
e376544
Added scenario and config tests
vladvildanov Sep 4, 2025
57f6d8b
Added pipeline and transaction support for MultiDBClient
vladvildanov Sep 4, 2025
25eebb9
Added pub/sub support for MultiDBClient
vladvildanov Sep 4, 2025
a82d8e7
Added check for couroutines methods for pub/sub
vladvildanov Sep 5, 2025
d38fb0a
Added OSS Cluster API support for MultiDBCLient
vladvildanov Sep 10, 2025
54db16b
Added support for Lag-Aware Healthcheck and OSS Cluster API
vladvildanov Sep 11, 2025
80e253a
Merge branch 'feat/active-active' of github.com:redis/redis-py into v…
vladvildanov Sep 11, 2025
4431861
Increased timeouts between tests
vladvildanov Sep 11, 2025
9ed1901
[Sync] Refactored healthcheck
vladvildanov Sep 12, 2025
9f08d38
[Async] Refactored healthcheck
vladvildanov Sep 12, 2025
3c45f6e
[Sync] Refactored Failover Strategy
vladvildanov Sep 15, 2025
bc598d0
[Async] Refactored Failover Strategy
vladvildanov Sep 15, 2025
d41440a
Changed default values according to a design doc
vladvildanov Sep 16, 2025
9550299
[Async] Added Strategy Executor
vladvildanov Sep 16, 2025
0b4b4f0
[Sync] Added Strategy Executor
vladvildanov Sep 16, 2025
761d17c
Merge branch 'feat/active-active' of github.com:redis/redis-py into v…
vladvildanov Sep 17, 2025
4dd4f0a
Apply comments
vladvildanov Sep 18, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 40 additions & 31 deletions redis/asyncio/multidb/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@
from redis.asyncio.multidb.command_executor import DefaultCommandExecutor
from redis.asyncio.multidb.database import AsyncDatabase, Databases
from redis.asyncio.multidb.failure_detector import AsyncFailureDetector
from redis.asyncio.multidb.healthcheck import HealthCheck
from redis.asyncio.multidb.healthcheck import HealthCheck, HealthCheckPolicy
from redis.multidb.circuit import State as CBState, CircuitBreaker
from redis.asyncio.multidb.config import MultiDbConfig, DEFAULT_GRACE_PERIOD
from redis.background import BackgroundScheduler
from redis.commands import AsyncRedisModuleCommands, AsyncCoreCommands
from redis.multidb.exception import NoValidDatabaseException
from redis.multidb.exception import NoValidDatabaseException, UnhealthyDatabaseException
from redis.typing import KeyT, EncodableT, ChannelT

logger = logging.getLogger(__name__)
Expand All @@ -29,6 +29,10 @@ def __init__(self, config: MultiDbConfig):
self._health_checks.extend(config.health_checks)

self._health_check_interval = config.health_check_interval
self._health_check_policy: HealthCheckPolicy = config.health_check_policy.value(
config.health_check_probes,
config.health_check_delay
)
self._failure_detectors = config.default_failure_detectors()

if config.failure_detectors is not None:
Expand All @@ -46,6 +50,8 @@ def __init__(self, config: MultiDbConfig):
databases=self._databases,
command_retry=self._command_retry,
failover_strategy=self._failover_strategy,
failover_attempts=config.failover_attempts,
failover_delay=config.failover_delay,
event_dispatcher=self._event_dispatcher,
auto_fallback_interval=self._auto_fallback_interval,
)
Expand Down Expand Up @@ -244,42 +250,45 @@ async def _check_databases_health(
Runs health checks as a recurring task.
Runs health checks against all databases.
"""
for database, _ in self._databases:
async with self._hc_lock:
await self._check_db_health(database, on_error)
results = await asyncio.wait_for(
asyncio.gather(
*(
asyncio.create_task(self._check_db_health(database))
for database, _ in self._databases
),
return_exceptions=True,
),
timeout=self._health_check_interval,
)

async def _check_db_health(
self,
database: AsyncDatabase,
on_error: Optional[Callable[[Exception], Coroutine[Any, Any, None]]] = None,
) -> None:
for result in results:
if isinstance(result, UnhealthyDatabaseException):
unhealthy_db = result.database
unhealthy_db.circuit.state = CBState.OPEN

logger.exception(
'Health check failed, due to exception',
exc_info=result.original_exception
)

if on_error:
on_error(result.original_exception)

async def _check_db_health(self, database: AsyncDatabase,) -> bool:
"""
Runs health checks on the given database until first failure.
"""
is_healthy = True

# Health check will setup circuit state
for health_check in self._health_checks:
if not is_healthy:
# If one of the health checks failed, it's considered unhealthy
break
is_healthy = await self._health_check_policy.execute(self._health_checks, database)

try:
is_healthy = await health_check.check_health(database)
if not is_healthy:
if database.circuit.state != CBState.OPEN:
database.circuit.state = CBState.OPEN
return is_healthy
elif is_healthy and database.circuit.state != CBState.CLOSED:
database.circuit.state = CBState.CLOSED

if not is_healthy and database.circuit.state != CBState.OPEN:
database.circuit.state = CBState.OPEN
elif is_healthy and database.circuit.state != CBState.CLOSED:
database.circuit.state = CBState.CLOSED
except Exception as e:
if database.circuit.state != CBState.OPEN:
database.circuit.state = CBState.OPEN
is_healthy = False

logger.exception('Health check failed, due to exception', exc_info=e)

if on_error:
await on_error(e)
return is_healthy

def _on_circuit_state_change_callback(self, circuit: CircuitBreaker, old_state: CBState, new_state: CBState):
loop = asyncio.get_running_loop()
Expand Down
23 changes: 16 additions & 7 deletions redis/asyncio/multidb/command_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
from redis.asyncio.multidb.database import Databases, AsyncDatabase, Database
from redis.asyncio.multidb.event import AsyncActiveDatabaseChanged, RegisterCommandFailure, \
ResubscribeOnActiveDatabaseChanged
from redis.asyncio.multidb.failover import AsyncFailoverStrategy
from redis.asyncio.multidb.failover import AsyncFailoverStrategy, FailoverStrategyExecutor, DefaultFailoverStrategyExecutor, \
DEFAULT_FAILOVER_ATTEMPTS, DEFAULT_FAILOVER_DELAY
from redis.asyncio.multidb.failure_detector import AsyncFailureDetector
from redis.multidb.circuit import State as CBState
from redis.asyncio.retry import Retry
Expand Down Expand Up @@ -62,8 +63,8 @@ def active_pubsub(self, pubsub: PubSub) -> None:

@property
@abstractmethod
def failover_strategy(self) -> AsyncFailoverStrategy:
"""Returns failover strategy."""
def failover_strategy_executor(self) -> FailoverStrategyExecutor:
"""Returns failover strategy executor."""
pass

@property
Expand Down Expand Up @@ -111,6 +112,8 @@ def __init__(
command_retry: Retry,
failover_strategy: AsyncFailoverStrategy,
event_dispatcher: EventDispatcherInterface,
failover_attempts: int = DEFAULT_FAILOVER_ATTEMPTS,
failover_delay: float = DEFAULT_FAILOVER_DELAY,
auto_fallback_interval: float = DEFAULT_AUTO_FALLBACK_INTERVAL,
):
"""
Expand All @@ -122,6 +125,8 @@ def __init__(
command_retry: Retry policy for failed command execution
failover_strategy: Strategy for handling database failover
event_dispatcher: Interface for dispatching events
failover_attempts: Number of failover attempts
failover_delay: Delay between failover attempts
auto_fallback_interval: Time interval in seconds between attempts to fall back to a primary database
"""
super().__init__(auto_fallback_interval)
Expand All @@ -132,7 +137,11 @@ def __init__(
self._databases = databases
self._failure_detectors = failure_detectors
self._command_retry = command_retry
self._failover_strategy = failover_strategy
self._failover_strategy_executor = DefaultFailoverStrategyExecutor(
failover_strategy,
failover_attempts,
failover_delay
)
self._event_dispatcher = event_dispatcher
self._active_database: Optional[Database] = None
self._active_pubsub: Optional[PubSub] = None
Expand Down Expand Up @@ -173,8 +182,8 @@ def active_pubsub(self, pubsub: PubSub) -> None:
self._active_pubsub = pubsub

@property
def failover_strategy(self) -> AsyncFailoverStrategy:
return self._failover_strategy
def failover_strategy_executor(self) -> FailoverStrategyExecutor:
return self._failover_strategy_executor

@property
def command_retry(self) -> Retry:
Expand Down Expand Up @@ -265,7 +274,7 @@ async def _check_active_database(self):
and self._next_fallback_attempt <= datetime.now()
)
):
await self.set_active_database(await self._failover_strategy.database())
await self.set_active_database(await self._failover_strategy_executor.execute())
self._schedule_next_fallback()

async def _on_command_fail(self, error, *args):
Expand Down
45 changes: 20 additions & 25 deletions redis/asyncio/multidb/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,20 @@

from redis.asyncio import ConnectionPool, Redis, RedisCluster
from redis.asyncio.multidb.database import Databases, Database
from redis.asyncio.multidb.failover import AsyncFailoverStrategy, WeightBasedFailoverStrategy
from redis.asyncio.multidb.failure_detector import AsyncFailureDetector, FailureDetectorAsyncWrapper
from redis.asyncio.multidb.healthcheck import HealthCheck, DEFAULT_HEALTH_CHECK_RETRIES, DEFAULT_HEALTH_CHECK_BACKOFF, \
EchoHealthCheck
from redis.asyncio.multidb.failover import AsyncFailoverStrategy, WeightBasedFailoverStrategy, DEFAULT_FAILOVER_DELAY, \
DEFAULT_FAILOVER_ATTEMPTS
from redis.asyncio.multidb.failure_detector import AsyncFailureDetector, FailureDetectorAsyncWrapper, \
DEFAULT_FAILURES_THRESHOLD, DEFAULT_FAILURES_DURATION
from redis.asyncio.multidb.healthcheck import HealthCheck, EchoHealthCheck, DEFAULT_HEALTH_CHECK_INTERVAL, \
DEFAULT_HEALTH_CHECK_PROBES, DEFAULT_HEALTH_CHECK_DELAY, HealthCheckPolicies, DEFAULT_HEALTH_CHECK_POLICY
from redis.asyncio.retry import Retry
from redis.backoff import ExponentialWithJitterBackoff, AbstractBackoff, NoBackoff
from redis.backoff import ExponentialWithJitterBackoff, NoBackoff
from redis.data_structure import WeightedList
from redis.event import EventDispatcherInterface, EventDispatcher
from redis.multidb.circuit import CircuitBreaker, PBCircuitBreakerAdapter
from redis.multidb.circuit import CircuitBreaker, PBCircuitBreakerAdapter, DEFAULT_GRACE_PERIOD
from redis.multidb.failure_detector import CommandFailureDetector

DEFAULT_GRACE_PERIOD = 5.0
DEFAULT_HEALTH_CHECK_INTERVAL = 5
DEFAULT_FAILURES_THRESHOLD = 3
DEFAULT_FAILURES_DURATION = 2
DEFAULT_FAILOVER_RETRIES = 3
DEFAULT_FAILOVER_BACKOFF = ExponentialWithJitterBackoff(cap=3)
DEFAULT_AUTO_FALLBACK_INTERVAL = -1
DEFAULT_AUTO_FALLBACK_INTERVAL = 120

def default_event_dispatcher() -> EventDispatcherInterface:
return EventDispatcher()
Expand Down Expand Up @@ -78,11 +74,11 @@ class MultiDbConfig:
failures_interval: Time interval for tracking database failures.
health_checks: Optional list of additional health checks performed on databases.
health_check_interval: Time interval for executing health checks.
health_check_retries: Number of retry attempts for performing health checks.
health_check_backoff: Backoff strategy for health check retries.
health_check_probes: Number of attempts to evaluate the health of a database.
health_check_delay: Delay between health check attempts.
failover_strategy: Optional strategy for handling database failover scenarios.
failover_retries: Number of retries allowed for failover operations.
failover_backoff: Backoff strategy for failover retries.
failover_attempts: Number of retries allowed for failover operations.
failover_delay: Delay between failover attempts.
auto_fallback_interval: Time interval to trigger automatic fallback.
event_dispatcher: Interface for dispatching events related to database operations.

Expand Down Expand Up @@ -113,11 +109,12 @@ class MultiDbConfig:
failures_interval: float = DEFAULT_FAILURES_DURATION
health_checks: Optional[List[HealthCheck]] = None
health_check_interval: float = DEFAULT_HEALTH_CHECK_INTERVAL
health_check_retries: int = DEFAULT_HEALTH_CHECK_RETRIES
health_check_backoff: AbstractBackoff = DEFAULT_HEALTH_CHECK_BACKOFF
health_check_probes: int = DEFAULT_HEALTH_CHECK_PROBES
health_check_delay: float = DEFAULT_HEALTH_CHECK_DELAY
health_check_policy: HealthCheckPolicies = DEFAULT_HEALTH_CHECK_POLICY
failover_strategy: Optional[AsyncFailoverStrategy] = None
failover_retries: int = DEFAULT_FAILOVER_RETRIES
failover_backoff: AbstractBackoff = DEFAULT_FAILOVER_BACKOFF
failover_attempts: int = DEFAULT_FAILOVER_ATTEMPTS
failover_delay: float = DEFAULT_FAILOVER_DELAY
auto_fallback_interval: float = DEFAULT_AUTO_FALLBACK_INTERVAL
event_dispatcher: EventDispatcherInterface = field(default_factory=default_event_dispatcher)

Expand Down Expand Up @@ -160,10 +157,8 @@ def default_failure_detectors(self) -> List[AsyncFailureDetector]:

def default_health_checks(self) -> List[HealthCheck]:
return [
EchoHealthCheck(retry=Retry(retries=self.health_check_retries, backoff=self.health_check_backoff)),
EchoHealthCheck(),
]

def default_failover_strategy(self) -> AsyncFailoverStrategy:
return WeightBasedFailoverStrategy(
retry=Retry(retries=self.failover_retries, backoff=self.failover_backoff),
)
return WeightBasedFailoverStrategy()
105 changes: 87 additions & 18 deletions redis/asyncio/multidb/failover.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import time
from abc import abstractmethod, ABC

from redis.asyncio.multidb.database import AsyncDatabase, Databases
from redis.multidb.circuit import State as CBState
from redis.asyncio.retry import Retry
from redis.data_structure import WeightedList
from redis.multidb.exception import NoValidDatabaseException
from redis.utils import dummy_fail_async
from redis.multidb.exception import NoValidDatabaseException, TemporaryUnavailableException

DEFAULT_FAILOVER_ATTEMPTS = 10
DEFAULT_FAILOVER_DELAY = 12

class AsyncFailoverStrategy(ABC):

Expand All @@ -20,30 +21,98 @@ def set_databases(self, databases: Databases) -> None:
"""Set the database strategy operates on."""
pass

class FailoverStrategyExecutor(ABC):

@property
@abstractmethod
def failover_attempts(self) -> int:
"""The number of failover attempts."""
pass

@property
@abstractmethod
def failover_delay(self) -> float:
"""The delay between failover attempts."""
pass

@property
@abstractmethod
def strategy(self) -> AsyncFailoverStrategy:
"""The strategy to execute."""
pass

@abstractmethod
async def execute(self) -> AsyncDatabase:
"""Execute the failover strategy."""
pass

class WeightBasedFailoverStrategy(AsyncFailoverStrategy):
"""
Failover strategy based on database weights.
"""
def __init__(
self,
retry: Retry
):
self._retry = retry
self._retry.update_supported_errors([NoValidDatabaseException])
def __init__(self):
self._databases = WeightedList()

async def database(self) -> AsyncDatabase:
return await self._retry.call_with_retry(
lambda: self._get_active_database(),
lambda _: dummy_fail_async()
)
for database, _ in self._databases:
if database.circuit.state == CBState.CLOSED:
return database

raise NoValidDatabaseException('No valid database available for communication')

def set_databases(self, databases: Databases) -> None:
self._databases = databases

async def _get_active_database(self) -> AsyncDatabase:
for database, _ in self._databases:
if database.circuit.state == CBState.CLOSED:
return database
class DefaultFailoverStrategyExecutor(FailoverStrategyExecutor):
"""
Executes given failover strategy.
"""
def __init__(
self,
strategy: AsyncFailoverStrategy,
failover_attempts: int = DEFAULT_FAILOVER_ATTEMPTS,
failover_delay: float = DEFAULT_FAILOVER_DELAY,
):
self._strategy = strategy
self._failover_attempts = failover_attempts
self._failover_delay = failover_delay
self._next_attempt_ts: int = 0
self._failover_counter: int = 0

@property
def failover_attempts(self) -> int:
return self._failover_attempts

@property
def failover_delay(self) -> float:
return self._failover_delay

@property
def strategy(self) -> AsyncFailoverStrategy:
return self._strategy

async def execute(self) -> AsyncDatabase:
try:
database = await self._strategy.database()
self._reset()
return database
except NoValidDatabaseException as e:
if self._next_attempt_ts == 0:
self._next_attempt_ts = time.time() + self._failover_delay
self._failover_counter += 1
elif time.time() >= self._next_attempt_ts:
self._next_attempt_ts += self._failover_delay
self._failover_counter += 1

if self._failover_counter > self._failover_attempts:
self._reset()
raise e
else:
raise TemporaryUnavailableException(
"No database connections currently available. "
"This is a temporary condition - please retry the operation."
)

raise NoValidDatabaseException('No valid database available for communication')
def _reset(self) -> None:
self._next_attempt_ts = 0
self._failover_counter = 0
Loading