Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions packages/core/minos-microservice-saga/minos/saga/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,28 @@
)
from .definitions import (
ConditionalSagaStep,
ConditionalSagaStepDecoratorMeta,
ConditionalSagaStepDecoratorWrapper,
ElseThenAlternative,
ElseThenAlternativeDecoratorMeta,
ElseThenAlternativeDecoratorWrapper,
IfThenAlternative,
IfThenAlternativeDecoratorMeta,
IfThenAlternativeDecoratorWrapper,
LocalSagaStep,
LocalSagaStepDecoratorMeta,
LocalSagaStepDecoratorWrapper,
RemoteSagaStep,
RemoteSagaStepDecoratorMeta,
RemoteSagaStepDecoratorWrapper,
Saga,
SagaDecoratorMeta,
SagaDecoratorWrapper,
SagaOperation,
SagaOperationDecorator,
SagaStep,
SagaStepDecoratorMeta,
SagaStepDecoratorWrapper,
)
from .exceptions import (
AlreadyCommittedException,
Expand All @@ -27,6 +42,7 @@
MultipleOnExecuteException,
MultipleOnFailureException,
MultipleOnSuccessException,
OrderPrecedenceException,
SagaException,
SagaExecutionAlreadyExecutedException,
SagaExecutionException,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,34 @@
from .operations import (
SagaOperation,
SagaOperationDecorator,
)
from .saga import (
Saga,
SagaDecoratorMeta,
SagaDecoratorWrapper,
)
from .steps import (
ConditionalSagaStep,
ConditionalSagaStepDecoratorMeta,
ConditionalSagaStepDecoratorWrapper,
ElseThenAlternative,
ElseThenAlternativeDecoratorMeta,
ElseThenAlternativeDecoratorWrapper,
IfThenAlternative,
IfThenAlternativeDecoratorMeta,
IfThenAlternativeDecoratorWrapper,
LocalSagaStep,
LocalSagaStepDecoratorMeta,
LocalSagaStepDecoratorWrapper,
RemoteSagaStep,
RemoteSagaStepDecoratorMeta,
RemoteSagaStepDecoratorWrapper,
SagaStep,
SagaStepDecoratorMeta,
SagaStepDecoratorWrapper,
)
from .types import (
ConditionCallback,
LocalCallback,
RequestCallBack,
ResponseCallBack,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,25 @@
"""Operation module."""

from __future__ import (
annotations,
)

from collections.abc import (
Callable,
Iterable,
)
from typing import (
TYPE_CHECKING,
Any,
Callable,
Generic,
Iterable,
Optional,
TypeVar,
Union,
)

if TYPE_CHECKING:
from .steps import SagaStepDecoratorMeta

from minos.common import (
classname,
import_module,
Expand All @@ -24,6 +32,27 @@
T = TypeVar("T", bound=Callable)


class SagaOperationDecorator(Generic[T]):
"""Saga Operation Decorator class."""

def __init__(self, attr_name: str = None, step_meta: SagaStepDecoratorMeta = None, *args, **kwargs):
if attr_name is None:
raise ValueError(f"The 'attr_name' must not be {None!r}.")
if step_meta is None:
raise ValueError(f"The 'step_meta' must not be {None!r}.")

self._step_meta = step_meta
self._attr_name = attr_name

self._args = args
self._kwargs = kwargs

def __call__(self, func: T) -> T:
operation = SagaOperation(func, *self._args, **self._kwargs)
setattr(self._step_meta, self._attr_name, operation)
return func


class SagaOperation(Generic[T]):
"""Saga Step Operation class."""

Expand Down Expand Up @@ -80,9 +109,15 @@ def from_raw(cls, raw: Optional[Union[dict[str, Any], SagaOperation[T]]], **kwar
current["parameters"] = SagaContext.from_avro_str(current["parameters"])
return cls(**current)

def __hash__(self):
return hash(tuple(self))

def __eq__(self, other: SagaOperation) -> bool:
return type(self) == type(other) and tuple(self) == tuple(other)

def __repr__(self) -> str:
return f"{type(self).__name__}{tuple(self)}"

def __iter__(self) -> Iterable:
yield from (
self.callback,
Expand Down
112 changes: 102 additions & 10 deletions packages/core/minos-microservice-saga/minos/saga/definitions/saga.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,37 @@
"""Saga definitions module."""

from __future__ import (
annotations,
)

import warnings
from collections.abc import (
Iterable,
)
from inspect import (
getmembers,
)
from operator import (
attrgetter,
)
from typing import (
Any,
Iterable,
Optional,
Type,
Protocol,
TypeVar,
Union,
runtime_checkable,
)

from cached_property import (
cached_property,
)

from ..exceptions import (
AlreadyCommittedException,
AlreadyOnSagaException,
EmptySagaException,
OrderPrecedenceException,
SagaNotCommittedException,
)
from .operations import (
Expand All @@ -26,32 +42,90 @@
LocalSagaStep,
RemoteSagaStep,
SagaStep,
SagaStepDecoratorWrapper,
)
from .types import (
LocalCallback,
RequestCallBack,
)


@runtime_checkable
class SagaDecoratorWrapper(Protocol):
"""Saga Decorator Wrapper class."""

meta: SagaDecoratorMeta


class SagaDecoratorMeta:
"""Saga Decorator Meta class."""

_inner: type
_definition: Saga

def __init__(self, func: type, saga: Saga):
self._inner = func
self._definition = saga

@cached_property
def definition(self) -> Saga:
"""Get the saga definition.

:return: A ``Saga`` instance.
"""
steps = getmembers(self._inner, predicate=lambda x: isinstance(x, SagaStepDecoratorWrapper))
steps = list(map(lambda member: member[1].meta.definition, steps))
for step in steps:
if step.order is None:
raise OrderPrecedenceException(f"The {step!r} step does not have 'order' value.")
steps.sort(key=attrgetter("order"))

for step in steps:
self._definition.add_step(step)
self._definition.commit()

return self._definition


TP = TypeVar("TP", bound=type)


class Saga:
"""Saga class.

The purpose of this class is to define a sequence of operations among microservices.
"""

# noinspection PyUnusedLocal
def __init__(self, *args, steps: list[SagaStep] = None, committed: bool = False, commit: None = None, **kwargs):
def __init__(
self, *args, steps: list[SagaStep] = None, committed: Optional[bool] = None, commit: None = None, **kwargs
):
self.steps = list()
self.committed = False

if steps is None:
steps = list()
for step in steps:
self.add_step(step)

self.steps = steps
if committed is None:
committed = len(steps)
self.committed = committed

if commit is not None:
warnings.warn(f"Commit callback is being deprecated. Use {self.local_step!r} instead", DeprecationWarning)
self.local_step(commit)
self.committed = True

def __call__(self, type_: TP) -> Union[TP, SagaDecoratorWrapper]:
"""Decorate the given type.

:param type_: The type to be decorated.
:return: The decorated type.
"""
type_.meta = SagaDecoratorMeta(type_, self)
return type_

@classmethod
def from_raw(cls, raw: Union[dict[str, Any], Saga], **kwargs) -> Saga:
"""Build a new ``Saga`` instance from raw.
Expand All @@ -77,7 +151,7 @@ def conditional_step(self, step: Optional[ConditionalSagaStep] = None) -> Condit
:param step: The step to be added. If `None` is provided then a new one will be created.
:return: A ``SagaStep`` instance.
"""
return self._add_step(ConditionalSagaStep, step)
return self.add_step(step, ConditionalSagaStep)

def local_step(
self, step: Optional[Union[LocalCallback, SagaOperation[LocalCallback], LocalSagaStep]] = None, **kwargs
Expand All @@ -90,12 +164,12 @@ def local_step(
"""
if step is not None and not isinstance(step, SagaStep) and not isinstance(step, SagaOperation):
step = SagaOperation(step, **kwargs)
return self._add_step(LocalSagaStep, step)
return self.add_step(step, LocalSagaStep)

def step(
self, step: Optional[Union[RequestCallBack, SagaOperation[RequestCallBack], RemoteSagaStep]] = None, **kwargs
) -> RemoteSagaStep:
"""Add a new remote step step.
"""Add a new remote step.

:param step: The step to be added. If `None` is provided then a new one will be created.
:param kwargs: Additional named parameters.
Expand All @@ -107,17 +181,23 @@ def step(
def remote_step(
self, step: Optional[Union[RequestCallBack, SagaOperation[RequestCallBack], RemoteSagaStep]] = None, **kwargs
) -> RemoteSagaStep:
"""Add a new remote step step.
"""Add a new remote step.

:param step: The step to be added. If `None` is provided then a new one will be created.
:param kwargs: Additional named parameters.
:return: A ``SagaStep`` instance.
"""
if step is not None and not isinstance(step, SagaStep) and not isinstance(step, SagaOperation):
step = SagaOperation(step, **kwargs)
return self._add_step(RemoteSagaStep, step)
return self.add_step(step, RemoteSagaStep)

def add_step(self, step: Optional[Union[SagaOperation, T]], step_cls: type[T] = SagaStep) -> T:
"""Add a new step.

def _add_step(self, step_cls: Type[T], step: Optional[Union[SagaOperation, T]]) -> T:
:param step: The step to be added.
:param step_cls: The step class (for validation purposes).
:return: The added step.
"""
if self.committed:
raise AlreadyCommittedException("It is not possible to add more steps to an already committed saga.")

Expand All @@ -129,9 +209,21 @@ def _add_step(self, step_cls: Type[T], step: Optional[Union[SagaOperation, T]])
if step.saga is not None:
raise AlreadyOnSagaException()
step.saga = self

else:
step = step_cls(step, saga=self)

if step.order is None:
if self.steps:
step.order = self.steps[-1].order + 1
else:
step.order = 1

if self.steps and step.order <= self.steps[-1].order:
raise OrderPrecedenceException(
f"Unsatisfied precedence constraints. Previous: {self.steps[-1].order} Current: {step.order} "
)

self.steps.append(step)
return step

Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,26 @@
from .abc import (
SagaStep,
SagaStepDecoratorMeta,
SagaStepDecoratorWrapper,
)
from .conditional import (
ConditionalSagaStep,
ConditionalSagaStepDecoratorMeta,
ConditionalSagaStepDecoratorWrapper,
ElseThenAlternative,
ElseThenAlternativeDecoratorMeta,
ElseThenAlternativeDecoratorWrapper,
IfThenAlternative,
IfThenAlternativeDecoratorMeta,
IfThenAlternativeDecoratorWrapper,
)
from .local import (
LocalSagaStep,
LocalSagaStepDecoratorMeta,
LocalSagaStepDecoratorWrapper,
)
from .remote import (
RemoteSagaStep,
RemoteSagaStepDecoratorMeta,
RemoteSagaStepDecoratorWrapper,
)
Loading