Skip to content

Commit f81206b

Browse files
authored
Refactored Healthcheck and Failover strategy logic (#3771)
* Extract additional interfaces and abstract classes * Added base async components * Added command executor * Added recurring background tasks with event loop only * Added MultiDBClient * Added scenario and config tests * Added pipeline and transaction support for MultiDBClient * Added pub/sub support for MultiDBClient * Added check for coroutines methods for pub/sub * Added OSS Cluster API support for MultiDBClient * Added support for Lag-Aware Healthcheck and OSS Cluster API * Increased timeouts between tests * [Sync] Refactored healthcheck * [Async] Refactored healthcheck * [Sync] Refactored Failover Strategy * [Async] Refactored Failover Strategy * Changed default values according to a design doc * [Async] Added Strategy Executor * [Sync] Added Strategy Executor * Apply comments
1 parent 481d89e commit f81206b

30 files changed

+1577
-703
lines changed

redis/asyncio/multidb/client.py

Lines changed: 40 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,12 @@
66
from redis.asyncio.multidb.command_executor import DefaultCommandExecutor
77
from redis.asyncio.multidb.database import AsyncDatabase, Databases
88
from redis.asyncio.multidb.failure_detector import AsyncFailureDetector
9-
from redis.asyncio.multidb.healthcheck import HealthCheck
9+
from redis.asyncio.multidb.healthcheck import HealthCheck, HealthCheckPolicy
1010
from redis.multidb.circuit import State as CBState, CircuitBreaker
1111
from redis.asyncio.multidb.config import MultiDbConfig, DEFAULT_GRACE_PERIOD
1212
from redis.background import BackgroundScheduler
1313
from redis.commands import AsyncRedisModuleCommands, AsyncCoreCommands
14-
from redis.multidb.exception import NoValidDatabaseException
14+
from redis.multidb.exception import NoValidDatabaseException, UnhealthyDatabaseException
1515
from redis.typing import KeyT, EncodableT, ChannelT
1616

1717
logger = logging.getLogger(__name__)
@@ -29,6 +29,10 @@ def __init__(self, config: MultiDbConfig):
2929
self._health_checks.extend(config.health_checks)
3030

3131
self._health_check_interval = config.health_check_interval
32+
self._health_check_policy: HealthCheckPolicy = config.health_check_policy.value(
33+
config.health_check_probes,
34+
config.health_check_delay
35+
)
3236
self._failure_detectors = config.default_failure_detectors()
3337

3438
if config.failure_detectors is not None:
@@ -46,6 +50,8 @@ def __init__(self, config: MultiDbConfig):
4650
databases=self._databases,
4751
command_retry=self._command_retry,
4852
failover_strategy=self._failover_strategy,
53+
failover_attempts=config.failover_attempts,
54+
failover_delay=config.failover_delay,
4955
event_dispatcher=self._event_dispatcher,
5056
auto_fallback_interval=self._auto_fallback_interval,
5157
)
@@ -244,42 +250,45 @@ async def _check_databases_health(
244250
Runs health checks as a recurring task.
245251
Runs health checks against all databases.
246252
"""
247-
for database, _ in self._databases:
248-
async with self._hc_lock:
249-
await self._check_db_health(database, on_error)
253+
results = await asyncio.wait_for(
254+
asyncio.gather(
255+
*(
256+
asyncio.create_task(self._check_db_health(database))
257+
for database, _ in self._databases
258+
),
259+
return_exceptions=True,
260+
),
261+
timeout=self._health_check_interval,
262+
)
250263

251-
async def _check_db_health(
252-
self,
253-
database: AsyncDatabase,
254-
on_error: Optional[Callable[[Exception], Coroutine[Any, Any, None]]] = None,
255-
) -> None:
264+
for result in results:
265+
if isinstance(result, UnhealthyDatabaseException):
266+
unhealthy_db = result.database
267+
unhealthy_db.circuit.state = CBState.OPEN
268+
269+
logger.exception(
270+
'Health check failed, due to exception',
271+
exc_info=result.original_exception
272+
)
273+
274+
if on_error:
275+
on_error(result.original_exception)
276+
277+
async def _check_db_health(self, database: AsyncDatabase,) -> bool:
256278
"""
257279
Runs health checks on the given database until first failure.
258280
"""
259-
is_healthy = True
260-
261281
# Health check will setup circuit state
262-
for health_check in self._health_checks:
263-
if not is_healthy:
264-
# If one of the health checks failed, it's considered unhealthy
265-
break
282+
is_healthy = await self._health_check_policy.execute(self._health_checks, database)
266283

267-
try:
268-
is_healthy = await health_check.check_health(database)
284+
if not is_healthy:
285+
if database.circuit.state != CBState.OPEN:
286+
database.circuit.state = CBState.OPEN
287+
return is_healthy
288+
elif is_healthy and database.circuit.state != CBState.CLOSED:
289+
database.circuit.state = CBState.CLOSED
269290

270-
if not is_healthy and database.circuit.state != CBState.OPEN:
271-
database.circuit.state = CBState.OPEN
272-
elif is_healthy and database.circuit.state != CBState.CLOSED:
273-
database.circuit.state = CBState.CLOSED
274-
except Exception as e:
275-
if database.circuit.state != CBState.OPEN:
276-
database.circuit.state = CBState.OPEN
277-
is_healthy = False
278-
279-
logger.exception('Health check failed, due to exception', exc_info=e)
280-
281-
if on_error:
282-
await on_error(e)
291+
return is_healthy
283292

284293
def _on_circuit_state_change_callback(self, circuit: CircuitBreaker, old_state: CBState, new_state: CBState):
285294
loop = asyncio.get_running_loop()

redis/asyncio/multidb/command_executor.py

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@
88
from redis.asyncio.multidb.database import Databases, AsyncDatabase, Database
99
from redis.asyncio.multidb.event import AsyncActiveDatabaseChanged, RegisterCommandFailure, \
1010
ResubscribeOnActiveDatabaseChanged
11-
from redis.asyncio.multidb.failover import AsyncFailoverStrategy
11+
from redis.asyncio.multidb.failover import AsyncFailoverStrategy, FailoverStrategyExecutor, DefaultFailoverStrategyExecutor, \
12+
DEFAULT_FAILOVER_ATTEMPTS, DEFAULT_FAILOVER_DELAY
1213
from redis.asyncio.multidb.failure_detector import AsyncFailureDetector
1314
from redis.multidb.circuit import State as CBState
1415
from redis.asyncio.retry import Retry
@@ -62,8 +63,8 @@ def active_pubsub(self, pubsub: PubSub) -> None:
6263

6364
@property
6465
@abstractmethod
65-
def failover_strategy(self) -> AsyncFailoverStrategy:
66-
"""Returns failover strategy."""
66+
def failover_strategy_executor(self) -> FailoverStrategyExecutor:
67+
"""Returns failover strategy executor."""
6768
pass
6869

6970
@property
@@ -111,6 +112,8 @@ def __init__(
111112
command_retry: Retry,
112113
failover_strategy: AsyncFailoverStrategy,
113114
event_dispatcher: EventDispatcherInterface,
115+
failover_attempts: int = DEFAULT_FAILOVER_ATTEMPTS,
116+
failover_delay: float = DEFAULT_FAILOVER_DELAY,
114117
auto_fallback_interval: float = DEFAULT_AUTO_FALLBACK_INTERVAL,
115118
):
116119
"""
@@ -122,6 +125,8 @@ def __init__(
122125
command_retry: Retry policy for failed command execution
123126
failover_strategy: Strategy for handling database failover
124127
event_dispatcher: Interface for dispatching events
128+
failover_attempts: Number of failover attempts
129+
failover_delay: Delay between failover attempts
125130
auto_fallback_interval: Time interval in seconds between attempts to fall back to a primary database
126131
"""
127132
super().__init__(auto_fallback_interval)
@@ -132,7 +137,11 @@ def __init__(
132137
self._databases = databases
133138
self._failure_detectors = failure_detectors
134139
self._command_retry = command_retry
135-
self._failover_strategy = failover_strategy
140+
self._failover_strategy_executor = DefaultFailoverStrategyExecutor(
141+
failover_strategy,
142+
failover_attempts,
143+
failover_delay
144+
)
136145
self._event_dispatcher = event_dispatcher
137146
self._active_database: Optional[Database] = None
138147
self._active_pubsub: Optional[PubSub] = None
@@ -173,8 +182,8 @@ def active_pubsub(self, pubsub: PubSub) -> None:
173182
self._active_pubsub = pubsub
174183

175184
@property
176-
def failover_strategy(self) -> AsyncFailoverStrategy:
177-
return self._failover_strategy
185+
def failover_strategy_executor(self) -> FailoverStrategyExecutor:
186+
return self._failover_strategy_executor
178187

179188
@property
180189
def command_retry(self) -> Retry:
@@ -265,7 +274,7 @@ async def _check_active_database(self):
265274
and self._next_fallback_attempt <= datetime.now()
266275
)
267276
):
268-
await self.set_active_database(await self._failover_strategy.database())
277+
await self.set_active_database(await self._failover_strategy_executor.execute())
269278
self._schedule_next_fallback()
270279

271280
async def _on_command_fail(self, error, *args):

redis/asyncio/multidb/config.py

Lines changed: 20 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -5,24 +5,20 @@
55

66
from redis.asyncio import ConnectionPool, Redis, RedisCluster
77
from redis.asyncio.multidb.database import Databases, Database
8-
from redis.asyncio.multidb.failover import AsyncFailoverStrategy, WeightBasedFailoverStrategy
9-
from redis.asyncio.multidb.failure_detector import AsyncFailureDetector, FailureDetectorAsyncWrapper
10-
from redis.asyncio.multidb.healthcheck import HealthCheck, DEFAULT_HEALTH_CHECK_RETRIES, DEFAULT_HEALTH_CHECK_BACKOFF, \
11-
EchoHealthCheck
8+
from redis.asyncio.multidb.failover import AsyncFailoverStrategy, WeightBasedFailoverStrategy, DEFAULT_FAILOVER_DELAY, \
9+
DEFAULT_FAILOVER_ATTEMPTS
10+
from redis.asyncio.multidb.failure_detector import AsyncFailureDetector, FailureDetectorAsyncWrapper, \
11+
DEFAULT_FAILURES_THRESHOLD, DEFAULT_FAILURES_DURATION
12+
from redis.asyncio.multidb.healthcheck import HealthCheck, EchoHealthCheck, DEFAULT_HEALTH_CHECK_INTERVAL, \
13+
DEFAULT_HEALTH_CHECK_PROBES, DEFAULT_HEALTH_CHECK_DELAY, HealthCheckPolicies, DEFAULT_HEALTH_CHECK_POLICY
1214
from redis.asyncio.retry import Retry
13-
from redis.backoff import ExponentialWithJitterBackoff, AbstractBackoff, NoBackoff
15+
from redis.backoff import ExponentialWithJitterBackoff, NoBackoff
1416
from redis.data_structure import WeightedList
1517
from redis.event import EventDispatcherInterface, EventDispatcher
16-
from redis.multidb.circuit import CircuitBreaker, PBCircuitBreakerAdapter
18+
from redis.multidb.circuit import CircuitBreaker, PBCircuitBreakerAdapter, DEFAULT_GRACE_PERIOD
1719
from redis.multidb.failure_detector import CommandFailureDetector
1820

19-
DEFAULT_GRACE_PERIOD = 5.0
20-
DEFAULT_HEALTH_CHECK_INTERVAL = 5
21-
DEFAULT_FAILURES_THRESHOLD = 3
22-
DEFAULT_FAILURES_DURATION = 2
23-
DEFAULT_FAILOVER_RETRIES = 3
24-
DEFAULT_FAILOVER_BACKOFF = ExponentialWithJitterBackoff(cap=3)
25-
DEFAULT_AUTO_FALLBACK_INTERVAL = -1
21+
DEFAULT_AUTO_FALLBACK_INTERVAL = 120
2622

2723
def default_event_dispatcher() -> EventDispatcherInterface:
2824
return EventDispatcher()
@@ -78,11 +74,11 @@ class MultiDbConfig:
7874
failures_interval: Time interval for tracking database failures.
7975
health_checks: Optional list of additional health checks performed on databases.
8076
health_check_interval: Time interval for executing health checks.
81-
health_check_retries: Number of retry attempts for performing health checks.
82-
health_check_backoff: Backoff strategy for health check retries.
77+
health_check_probes: Number of attempts to evaluate the health of a database.
78+
health_check_delay: Delay between health check attempts.
8379
failover_strategy: Optional strategy for handling database failover scenarios.
84-
failover_retries: Number of retries allowed for failover operations.
85-
failover_backoff: Backoff strategy for failover retries.
80+
failover_attempts: Number of retries allowed for failover operations.
81+
failover_delay: Delay between failover attempts.
8682
auto_fallback_interval: Time interval to trigger automatic fallback.
8783
event_dispatcher: Interface for dispatching events related to database operations.
8884
@@ -113,11 +109,12 @@ class MultiDbConfig:
113109
failures_interval: float = DEFAULT_FAILURES_DURATION
114110
health_checks: Optional[List[HealthCheck]] = None
115111
health_check_interval: float = DEFAULT_HEALTH_CHECK_INTERVAL
116-
health_check_retries: int = DEFAULT_HEALTH_CHECK_RETRIES
117-
health_check_backoff: AbstractBackoff = DEFAULT_HEALTH_CHECK_BACKOFF
112+
health_check_probes: int = DEFAULT_HEALTH_CHECK_PROBES
113+
health_check_delay: float = DEFAULT_HEALTH_CHECK_DELAY
114+
health_check_policy: HealthCheckPolicies = DEFAULT_HEALTH_CHECK_POLICY
118115
failover_strategy: Optional[AsyncFailoverStrategy] = None
119-
failover_retries: int = DEFAULT_FAILOVER_RETRIES
120-
failover_backoff: AbstractBackoff = DEFAULT_FAILOVER_BACKOFF
116+
failover_attempts: int = DEFAULT_FAILOVER_ATTEMPTS
117+
failover_delay: float = DEFAULT_FAILOVER_DELAY
121118
auto_fallback_interval: float = DEFAULT_AUTO_FALLBACK_INTERVAL
122119
event_dispatcher: EventDispatcherInterface = field(default_factory=default_event_dispatcher)
123120

@@ -160,10 +157,8 @@ def default_failure_detectors(self) -> List[AsyncFailureDetector]:
160157

161158
def default_health_checks(self) -> List[HealthCheck]:
162159
return [
163-
EchoHealthCheck(retry=Retry(retries=self.health_check_retries, backoff=self.health_check_backoff)),
160+
EchoHealthCheck(),
164161
]
165162

166163
def default_failover_strategy(self) -> AsyncFailoverStrategy:
167-
return WeightBasedFailoverStrategy(
168-
retry=Retry(retries=self.failover_retries, backoff=self.failover_backoff),
169-
)
164+
return WeightBasedFailoverStrategy()

redis/asyncio/multidb/failover.py

Lines changed: 87 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
1+
import time
12
from abc import abstractmethod, ABC
23

34
from redis.asyncio.multidb.database import AsyncDatabase, Databases
45
from redis.multidb.circuit import State as CBState
5-
from redis.asyncio.retry import Retry
66
from redis.data_structure import WeightedList
7-
from redis.multidb.exception import NoValidDatabaseException
8-
from redis.utils import dummy_fail_async
7+
from redis.multidb.exception import NoValidDatabaseException, TemporaryUnavailableException
98

9+
DEFAULT_FAILOVER_ATTEMPTS = 10
10+
DEFAULT_FAILOVER_DELAY = 12
1011

1112
class AsyncFailoverStrategy(ABC):
1213

@@ -20,30 +21,98 @@ def set_databases(self, databases: Databases) -> None:
2021
"""Set the database strategy operates on."""
2122
pass
2223

24+
class FailoverStrategyExecutor(ABC):
25+
26+
@property
27+
@abstractmethod
28+
def failover_attempts(self) -> int:
29+
"""The number of failover attempts."""
30+
pass
31+
32+
@property
33+
@abstractmethod
34+
def failover_delay(self) -> float:
35+
"""The delay between failover attempts."""
36+
pass
37+
38+
@property
39+
@abstractmethod
40+
def strategy(self) -> AsyncFailoverStrategy:
41+
"""The strategy to execute."""
42+
pass
43+
44+
@abstractmethod
45+
async def execute(self) -> AsyncDatabase:
46+
"""Execute the failover strategy."""
47+
pass
48+
2349
class WeightBasedFailoverStrategy(AsyncFailoverStrategy):
2450
"""
2551
Failover strategy based on database weights.
2652
"""
27-
def __init__(
28-
self,
29-
retry: Retry
30-
):
31-
self._retry = retry
32-
self._retry.update_supported_errors([NoValidDatabaseException])
53+
def __init__(self):
3354
self._databases = WeightedList()
3455

3556
async def database(self) -> AsyncDatabase:
36-
return await self._retry.call_with_retry(
37-
lambda: self._get_active_database(),
38-
lambda _: dummy_fail_async()
39-
)
57+
for database, _ in self._databases:
58+
if database.circuit.state == CBState.CLOSED:
59+
return database
60+
61+
raise NoValidDatabaseException('No valid database available for communication')
4062

4163
def set_databases(self, databases: Databases) -> None:
4264
self._databases = databases
4365

44-
async def _get_active_database(self) -> AsyncDatabase:
45-
for database, _ in self._databases:
46-
if database.circuit.state == CBState.CLOSED:
47-
return database
66+
class DefaultFailoverStrategyExecutor(FailoverStrategyExecutor):
67+
"""
68+
Executes given failover strategy.
69+
"""
70+
def __init__(
71+
self,
72+
strategy: AsyncFailoverStrategy,
73+
failover_attempts: int = DEFAULT_FAILOVER_ATTEMPTS,
74+
failover_delay: float = DEFAULT_FAILOVER_DELAY,
75+
):
76+
self._strategy = strategy
77+
self._failover_attempts = failover_attempts
78+
self._failover_delay = failover_delay
79+
self._next_attempt_ts: int = 0
80+
self._failover_counter: int = 0
81+
82+
@property
83+
def failover_attempts(self) -> int:
84+
return self._failover_attempts
85+
86+
@property
87+
def failover_delay(self) -> float:
88+
return self._failover_delay
89+
90+
@property
91+
def strategy(self) -> AsyncFailoverStrategy:
92+
return self._strategy
93+
94+
async def execute(self) -> AsyncDatabase:
95+
try:
96+
database = await self._strategy.database()
97+
self._reset()
98+
return database
99+
except NoValidDatabaseException as e:
100+
if self._next_attempt_ts == 0:
101+
self._next_attempt_ts = time.time() + self._failover_delay
102+
self._failover_counter += 1
103+
elif time.time() >= self._next_attempt_ts:
104+
self._next_attempt_ts += self._failover_delay
105+
self._failover_counter += 1
106+
107+
if self._failover_counter > self._failover_attempts:
108+
self._reset()
109+
raise e
110+
else:
111+
raise TemporaryUnavailableException(
112+
"No database connections currently available. "
113+
"This is a temporary condition - please retry the operation."
114+
)
48115

49-
raise NoValidDatabaseException('No valid database available for communication')
116+
def _reset(self) -> None:
117+
self._next_attempt_ts = 0
118+
self._failover_counter = 0

0 commit comments

Comments
 (0)