Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#67 Introduced events/listeners in breakers #74

Merged
merged 16 commits into from
Nov 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ jobs:
uses: actions/cache@v2
with:
path: ~/.local
key: poetry-1.2.1-0
key: poetry-1.4.1-0

- uses: snok/install-poetry@v1
with:
Expand Down
10 changes: 10 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
SOURCE?=hyx docs/snippets
TESTS?=tests

.PHONY: help

help:
@grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | sort | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-30s\033[0m %s\n", $$1, $$2}'

install: ## Install project dependencies
@poetry install

clean: ## Clean temporary files
@echo "🧹 Cleaning temporary files.."
@rm -rf dist
Expand All @@ -22,6 +30,8 @@ lint: ## Lint source code
@poetry run ruff --fix $(SOURCE) $(TESTS)
@echo "🧹 Black"
@poetry run black $(SOURCE) $(TESTS)
@echo "🧹 Ruff"
@ruff --fix $(SOURCE) $(TESTS)
@echo "🧽 MyPy"
@poetry run mypy --pretty $(SOURCE) $(TESTS)

Expand Down
3 changes: 2 additions & 1 deletion hyx/circuitbreaker/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from hyx.circuitbreaker.api import consecutive_breaker
from hyx.circuitbreaker.events import BreakerListener, register_breaker_listener

__all__ = ("consecutive_breaker",)
__all__ = ("consecutive_breaker", "BreakerListener", "register_breaker_listener")
17 changes: 16 additions & 1 deletion hyx/circuitbreaker/api.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import functools
from types import TracebackType
from typing import Any, Optional, Type, cast
from typing import Any, Optional, Sequence, Type, cast

from hyx.circuitbreaker.events import _BREAKER_LISTENERS, BreakerListener
from hyx.circuitbreaker.managers import ConsecutiveCircuitBreaker
from hyx.circuitbreaker.states import BreakerState
from hyx.circuitbreaker.typing import DelayT
from hyx.events import EventDispatcher, EventManager, get_default_name
from hyx.typing import ExceptionsT, FuncT


Expand Down Expand Up @@ -46,14 +48,27 @@ def __init__(
failure_threshold: int = 5,
recovery_time_secs: DelayT = 30,
recovery_threshold: int = 3,
listeners: Optional[Sequence[BreakerListener]] = None,
name: Optional[str] = None,
event_manager: Optional["EventManager"] = None,
) -> None:
event_dispatcher = EventDispatcher[ConsecutiveCircuitBreaker, BreakerListener](
listeners,
_BREAKER_LISTENERS,
event_manager=event_manager,
)

self._manager = ConsecutiveCircuitBreaker(
name=name or get_default_name(),
exceptions=exceptions,
failure_threshold=failure_threshold,
recovery_time_secs=recovery_time_secs,
recovery_threshold=recovery_threshold,
event_dispatcher=event_dispatcher.as_listener,
)

event_dispatcher.set_component(self._manager)

@property
def state(self) -> "BreakerState":
return self._manager.state
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
import dataclasses
from typing import TYPE_CHECKING, Optional

from hyx.circuitbreaker.typing import DelayT
from hyx.typing import ExceptionsT

if TYPE_CHECKING:
from hyx.circuitbreaker import BreakerListener


@dataclasses.dataclass
class BreakerConfig:
class BreakerContext:
breaker_name: Optional[str]
exceptions: ExceptionsT
failure_threshold: int
recovery_time_secs: DelayT
recovery_threshold: int
event_dispatcher: "BreakerListener"
50 changes: 50 additions & 0 deletions hyx/circuitbreaker/events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
from typing import TYPE_CHECKING, Union

from hyx.circuitbreaker.context import BreakerContext
from hyx.circuitbreaker.managers import ConsecutiveCircuitBreaker
from hyx.events import ListenerFactoryT, ListenerRegistry

if TYPE_CHECKING:
from hyx.circuitbreaker.states import BreakerState, FailingState, RecoveringState, WorkingState

_BREAKER_LISTENERS: ListenerRegistry["ConsecutiveCircuitBreaker", "BreakerListener"] = ListenerRegistry()


class BreakerListener:
# TODO: add on success and on exception methods

async def on_working(
self,
context: BreakerContext,
current_state: "BreakerState",
next_state: "WorkingState",
) -> None:
...

async def on_recovering(
self,
context: BreakerContext,
current_state: "BreakerState",
next_state: "RecoveringState",
) -> None:
...

async def on_failing(
self,
context: BreakerContext,
current_state: "BreakerState",
next_state: "FailingState",
) -> None:
...

async def on_success(self, context: BreakerContext, state: "BreakerState") -> None:
...


def register_breaker_listener(listener: Union[BreakerListener, ListenerFactoryT]) -> None:
"""
Register a listener that will listen to all circuit breaker components in the system
"""
global _BREAKER_LISTENERS

_BREAKER_LISTENERS.register(listener)
23 changes: 16 additions & 7 deletions hyx/circuitbreaker/managers.py
Original file line number Diff line number Diff line change
@@ -1,33 +1,42 @@
from typing import Any, Optional
from typing import TYPE_CHECKING, Any, Optional

from hyx.circuitbreaker.config import BreakerConfig
from hyx.circuitbreaker.context import BreakerContext
from hyx.circuitbreaker.states import BreakerState, WorkingState
from hyx.circuitbreaker.typing import DelayT
from hyx.typing import ExceptionsT, FuncT

if TYPE_CHECKING:
from hyx.circuitbreaker import BreakerListener


class ConsecutiveCircuitBreaker:
"""
Watch for consecutive exceptions that exceed a given threshold
"""

__slots__ = ("_config", "_state")
__slots__ = ("_context", "_name", "_state", "_event_dispatcher")

def __init__(
self,
name: str,
exceptions: ExceptionsT,
failure_threshold: int,
recovery_time_secs: DelayT,
recovery_threshold: int,
event_dispatcher: "BreakerListener",
) -> None:
self._config = BreakerConfig(
self._name = name

self._context = BreakerContext(
breaker_name=name,
exceptions=exceptions,
failure_threshold=failure_threshold,
recovery_time_secs=recovery_time_secs,
recovery_threshold=recovery_threshold,
event_dispatcher=event_dispatcher,
)

self._state: BreakerState = WorkingState(self._config)
self._state: BreakerState = WorkingState(self._context)

@property
def state(self) -> BreakerState:
Expand All @@ -40,7 +49,7 @@ async def acquire(self) -> None:
await self._transit_state(await self._state.before_execution())

async def release(self, exception: Optional[BaseException]) -> None:
if exception and isinstance(exception, self._config.exceptions):
if exception and isinstance(exception, self._context.exceptions):
await self._transit_state(await self._state.on_exception())
raise exception

Expand All @@ -55,7 +64,7 @@ async def __call__(self, func: FuncT) -> Any:
await self._transit_state(await self._state.on_success())

return result
except self._config.exceptions as e:
except self._context.exceptions as e:
await self._transit_state(await self._state.on_exception())
# breaker is not hiding the error like retry or fallback
raise e
48 changes: 31 additions & 17 deletions hyx/circuitbreaker/states.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
from datetime import datetime, timedelta
from typing import Optional

from hyx.circuitbreaker.config import BreakerConfig
from hyx.circuitbreaker.context import BreakerContext
from hyx.circuitbreaker.exceptions import BreakerFailing


class BreakerState:
NAME: str = "base"

__slots__ = ("_config",)
__slots__ = ("_context", "_event_dispatcher")

def __init__(self, config: BreakerConfig) -> None:
self._config = config
def __init__(self, context: BreakerContext) -> None:
self._context = context

@property
def name(self) -> str:
Expand All @@ -38,8 +38,8 @@ class WorkingState(BreakerState):

__slots__ = ("_consecutive_exceptions",)

def __init__(self, config: BreakerConfig) -> None:
super().__init__(config)
def __init__(self, context: BreakerContext) -> None:
super().__init__(context)

self._consecutive_exceptions: int = 0

Expand All @@ -55,6 +55,7 @@ async def on_success(self) -> "BreakerState":
Reset the failure counter
"""
self._reset_exceptions_count()
await self._context.event_dispatcher.on_success(self._context, self)

return self

Expand All @@ -64,8 +65,11 @@ async def on_exception(self) -> "BreakerState":
"""
self._consecutive_exceptions += 1

if self._consecutive_exceptions >= self._config.failure_threshold:
return FailingState(self._config)
if self._consecutive_exceptions >= self._context.failure_threshold:
failing_state = FailingState(self._context)
await self._context.event_dispatcher.on_failing(self._context, self, failing_state)

return failing_state

return self

Expand All @@ -85,8 +89,8 @@ class FailingState(BreakerState):
"_failing_until",
)

def __init__(self, config: BreakerConfig) -> None:
super().__init__(config)
def __init__(self, context: BreakerContext) -> None:
super().__init__(context)

self._failing_since = self._get_failing_since()
self._failing_until = self._get_failing_until(self._failing_since)
Expand All @@ -96,7 +100,7 @@ def _get_failing_since() -> datetime:
return datetime.utcnow()

def _get_failing_until(self, since: datetime) -> datetime:
return since + timedelta(seconds=self._config.recovery_time_secs)
return since + timedelta(seconds=self._context.recovery_time_secs)

@property
def until(self) -> datetime:
Expand Down Expand Up @@ -128,7 +132,10 @@ async def before_execution(self) -> "BreakerState":
if self.remain:
raise BreakerFailing("Circuit Breaker is in the failing state")

return RecoveringState(self._config)
recovering_state = RecoveringState(self._context)
await self._context.event_dispatcher.on_recovering(self._context, self, recovering_state)

return recovering_state


class RecoveringState(BreakerState):
Expand All @@ -144,8 +151,8 @@ class RecoveringState(BreakerState):

__slots__ = ("_consecutive_successes",)

def __init__(self, config: BreakerConfig) -> None:
super().__init__(config)
def __init__(self, context: BreakerContext) -> None:
super().__init__(context)

self._consecutive_successes: int = 0

Expand All @@ -155,11 +162,18 @@ def consecutive_successes(self) -> int:

async def on_success(self) -> "BreakerState":
self._consecutive_successes += 1
await self._context.event_dispatcher.on_success(self._context, self)

if self.consecutive_successes >= self._config.recovery_threshold:
return WorkingState(self._config)
if self.consecutive_successes >= self._context.recovery_threshold:
working_state = WorkingState(self._context)
await self._context.event_dispatcher.on_working(self._context, self, working_state)

return working_state

return self

async def on_exception(self) -> "BreakerState":
return FailingState(self._config)
failing_state = FailingState(self._context)
await self._context.event_dispatcher.on_failing(self._context, self, failing_state)

return failing_state
1 change: 0 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ line-length = 120
target-version = ['py39']

[tool.ruff]

line-length = 120

select = [
Expand Down
Loading
Loading