Skip to content

Commit

Permalink
This is a somewhat dumber scheme for Business Rule checking that trie…
Browse files Browse the repository at this point in the history
…s to achieve:

- Use celery workflows to do more up-front, avoiding .replace():
When checking a workbasket (or any collection of models), this attempts to
give everything to celery at once as one workflow, by doing a bit more processing up front.

The reasoning behind this, is that under the previous system with three layers of .replace(),
orchestration tasks would be competing with data checking tasks for resources from Celery.

Because .replace() was only called once some work was already done, celery didn't have
enough information to know that some tasks needed others to complete, the system could
fill up with orchestration tasks and have no resources left to run their subtasks.

The new system builds everything using celery primitves at the beginning so celery can
properly schedule everything.

- Reduce the amount of layers in the business rule checking tasks.
Instead of every business rule automatically getting a specialised business rule checker
class, this dumbs things down by keeping just the base classes for BusinessRuleChecker,
and LinkedBusinessRuleChecke (which used to be IndirectBusinessRuleChecker).

Business rules are now looked up by name and passed to the check_model_rule task,
while this has more parameters, there are less conceptual layers, and the task list in

Celery flower is now very easier to read,   XXXXXXXXXXXXXXXX

- Less layers in the business
  • Loading branch information
stuaxo committed Aug 3, 2022
1 parent 14b0920 commit 04f5a75
Show file tree
Hide file tree
Showing 33 changed files with 1,527 additions and 895 deletions.
317 changes: 147 additions & 170 deletions checks/checks.py
Original file line number Diff line number Diff line change
@@ -1,226 +1,203 @@
from functools import cached_property
from typing import Collection
from typing import Dict
from typing import Iterator
import logging
from collections import defaultdict
from typing import Optional
from typing import Set
from typing import Tuple
from typing import Type
from typing import TypeVar

from django.conf import settings

from checks.models import TrackedModelCheck
from checks.models import TransactionCheck
from common.business_rules import ALL_RULES
from common.business_rules import BusinessRule
from common.business_rules import BusinessRuleViolation
from common.models.trackedmodel import TrackedModel
from common.models import TrackedModel
from common.models import Transaction
from common.models.utils import get_current_transaction
from common.models.utils import override_current_transaction

CheckResult = Tuple[bool, Optional[str]]

logger = logging.getLogger(__name__)

Self = TypeVar("Self")
CheckResult = Tuple[bool, Optional[str]]


class Checker:
"""
A ``Checker`` is an object that knows how to perform a certain kind of check
against a model.
Checkers can be applied against a model. The logic of the checker will be
run and the result recorded as a ``TrackedModelCheck``.
"""

@cached_property
def name(self) -> str:
"""
The name string that on a per-model basis uniquely identifies the
checker.
The name should be deterministic (i.e. not rely on the current
environment, memory locations or random data) so that the system can
record the name in the database and later use it to work out whether
this check has been run. The name doesn't need to include any details
about the model.
By default this is the name of the class, but it can include any other
non-model data that is unique to the checker. For a more complex
example, see ``IndirectBusinessRuleChecker.name``.
"""
return type(self).__name__

@classmethod
def checkers_for(cls: Type[Self], model: TrackedModel) -> Collection[Self]:
def run_rule(
cls,
rule: BusinessRule,
transaction: Transaction,
model: TrackedModel,
) -> CheckResult:
"""
Returns instances of this ``Checker`` that should apply to the model.
Run a single business rule on a single model.
What checks apply to a model is sometimes data-dependent, so it is the
responsibility of the ``Checker`` class to tell the system what
instances of itself it would expect to run against the model. For an
example, see ``IndirectBusinessRuleChecker.checkers_for``.
:return CheckResult, a Tuple(rule_passed: str, violation_reason: Optional[str]).
"""
raise NotImplementedError()
logger.debug(f"run_rule %s %s %s", model, rule, transaction.pk)
try:
rule(transaction).validate(model)
logger.debug(f"%s [tx:%s] %s [passed]", model, rule, transaction.pk)
return True, None
except BusinessRuleViolation as violation:
reason = violation.args[0]
logger.debug(f"%s [tx:%s] %s [failed]", model, rule, transaction.pk, reason)
return False, reason

def run(self, model: TrackedModel) -> CheckResult:
"""Runs Checker-dependent logic and returns an indication of success."""
raise NotImplementedError()
@classmethod
def apply_rule(
cls,
rule: BusinessRule,
transaction: Transaction,
model: TrackedModel,
):
"""
Applies the check to the model and records success.
def apply(self, model: TrackedModel, context: TransactionCheck):
"""Applies the check to the model and records success."""
:return: TrackedModelCheck instance containing the result of the check.
During debugging the developer can set settings.RAISE_BUSINESS_RULE_FAILURES
to True to raise business rule violations as exceptions.
"""
success, message = False, None
try:
with override_current_transaction(context.transaction):
success, message = self.run(model)
with override_current_transaction(transaction):
success, message = cls.run_rule(rule, transaction, model)
except Exception as e:
success, message = False, str(e)
if settings.RAISE_BUSINESS_RULE_FAILURES:
raise
finally:
return TrackedModelCheck.objects.create(
check, created = TrackedModelCheck.objects.get_or_create(
{
"successful": success,
"message": message,
"content_hash": model.content_hash().digest(),
},
model=model,
transaction_check=context,
check_name=self.name,
successful=success,
message=message,
check_name=rule.__name__,
)


class BusinessRuleChecker(Checker):
"""
A ``Checker`` that runs a ``BusinessRule`` against a model.
This class is expected to be sub-typed for a specific rule by a call to
``of()``.
Attributes:
checker_cache (dict): (class attribute) Cache of Business checkers created by ``of()``.
"""

rule: Type[BusinessRule]

_checker_cache: Dict[str, BusinessRule] = {}
if not created:
check.successful = success
check.message = message
check.content_hash = model.content_hash().digest()
check.save()
return check

@classmethod
def of(cls: Type, rule_type: Type[BusinessRule]) -> Type:
def apply_rule_cached(
cls,
rule: BusinessRule,
transaction: Transaction,
model: TrackedModel,
):
"""
Return a subclass of a Checker, e.g. BusinessRuleChecker,
IndirectBusinessRuleChecker that runs the passed in business rule.
If a matching TrackedModelCheck instance exists, returns it, otherwise
check rule, and return the result as a TrackedModelCheck instance.
Example, creating a BusinessRuleChecker for ME32:
:return: TrackedModelCheck instance containing the result of the check.
"""
try:
check = TrackedModelCheck.objects.get(
model=model,
check_name=rule.__name__,
)
except TrackedModelCheck.DoesNotExist:
logger.debug(
"apply_rule_cached (no existing check) %s, %s apply rule",
rule.__name__,
transaction,
)
return cls.apply_rule(rule, transaction, model)

# Re-run the rule if the check is not successful.
check_hash = bytes(check.content_hash)
model_hash = model.content_hash().digest()
if check_hash == model_hash:
logger.debug(
"apply_rule_cached (matching content hash) %s, tx: %s, using cached result %s",
rule.__name__,
transaction.pk,
check,
)
return check

>>> BusinessRuleChecker.of(measures.business_rules.ME32)
<class 'checks.checks.BusinessRuleCheckerOf[measures.business_rules.ME32]'>
logger.debug(
"apply_rule_cached (check.content_hash != model.content_hash()) %s != %s %s, %s apply rule",
check_hash,
model_hash,
rule.__name__,
transaction,
)
check.delete()
return cls.apply_rule(rule, transaction, model)

This API is usually called by .applicable_to, however this docstring should
illustrate what it does.

Checkers are created once and then cached in _checker_cache.
class BusinessRuleChecker(Checker):
"""Apply BusinessRules specified in a TrackedModels business_rules
attribute."""

As well as a small performance improvement, caching aids debugging by ensuring
the same checker instance is returned if the same cls is passed to ``of``.
@classmethod
def get_model_rules(cls, model: TrackedModel, rules: Optional[Set[str]] = None):
"""
checker_name = f"{cls.__name__}Of[{rule_type.__module__}.{rule_type.__name__}]"

# If the checker class was already created, return it.
checker_class = cls._checker_cache.get(checker_name)
if checker_class is not None:
return checker_class
# No existing checker was found, so create it:
:param model: TrackedModel instance
:param rules: Optional list of rule names to filter by.
:return: Dict mapping models to a set of the BusinessRules that apply to them.
"""
model_rules = defaultdict(set)

class BusinessRuleCheckerOf(cls):
# Creating this class explicitly in code is more readable than using type(...)
# Once created the name will be mangled to include the rule to be checked.
for rule in model.business_rules:
if rules is not None and rule.__name__ not in rules:
continue

f"""Apply the following checks as specified in {rule_type.__name__}"""
rule = rule_type
model_rules[model].add(rule)

def __repr__(self):
return f"<{checker_name}>"
return model_rules

BusinessRuleCheckerOf.__name__ = checker_name

cls._checker_cache[checker_name] = BusinessRuleCheckerOf
return BusinessRuleCheckerOf
class LinkedModelsBusinessRuleChecker(Checker):
"""Apply BusinessRules specified in a TrackedModels indirect_business_rules
attribute to models returned by get_linked_models on those rules."""

@classmethod
def checkers_for(cls: Type[Self], model: TrackedModel) -> Collection[Self]:
"""If the rule attribute on this BusinessRuleChecker matches any in the
supplied TrackedModel instance's business_rules, return it in a list,
otherwise there are no matches so return an empty list."""
if cls.rule in model.business_rules:
return [cls()]
return []

def run(self, model: TrackedModel) -> CheckResult:
"""
:return CheckResult, a Tuple(rule_passed: str, violation_reason: Optional[str]).
def apply_rule(
cls,
rule: BusinessRule,
transaction: Transaction,
model: TrackedModel,
):
"""
transaction = get_current_transaction()
try:
self.rule(transaction).validate(model)
return True, None
except BusinessRuleViolation as violation:
return False, violation.args[0]


class IndirectBusinessRuleChecker(BusinessRuleChecker):
"""
A ``Checker`` that runs a ``BusinessRule`` against a model that is linked to
the model being checked, and for which a change in the checked model could
result in a business rule failure against the linked model.
LinkedModelsBusinessRuleChecker assumes that the linked models are still
the current.
This is a base class: subclasses for checking specific rules are created by
calling ``of()``.
"""
versions (TODO - ensure a business rule checks this),
rule: Type[BusinessRule]
linked_model: TrackedModel

def __init__(self, linked_model: TrackedModel) -> None:
self.linked_model = linked_model
super().__init__()

@cached_property
def name(self) -> str:
# Include the identity of the linked model in the checker name, so that
# each linked model needs to be checked for all checks to be complete.
return f"{super().name}[{self.linked_model.pk}]"
The transaction to check is set to that of the model, which enables
"""
return super().apply_rule(rule, model.transaction, model)

@classmethod
def checkers_for(cls: Type[Self], model: TrackedModel) -> Collection[Self]:
"""Return a set of IndirectBusinessRuleCheckers for every model found on
rule.get_linked_models."""
rules = set()
transaction = get_current_transaction()
if cls.rule in model.indirect_business_rules:
for linked_model in cls.rule.get_linked_models(model, transaction):
rules.add(cls(linked_model))
return rules

def run(self, model: TrackedModel) -> CheckResult:
def get_model_rules(cls, model: TrackedModel, rules: Optional[Set] = None):
"""
Return the result of running super.run, passing self.linked_model, and.
return it as a CheckResult - a Tuple(rule_passed: str, violation_reason: Optional[str])
:param model: TrackedModel instance
:param rules: Optional list of rule names to filter by.
:return: Dict mapping models to a set of the BusinessRules that apply to them.
"""
result, message = super().run(self.linked_model)
message = f"{self.linked_model}: " + message if message else None
return result, message
tx = get_current_transaction()

model_rules = defaultdict(set)

for rule in [*model.indirect_business_rules]:
for linked_model in rule.get_linked_models(model, tx):
if rules is not None and rule.__name__ not in rules:
continue

def checker_types() -> Iterator[Type[Checker]]:
"""
Return all registered Checker types.
model_rules[linked_model].add(rule)

See ``checks.checks.BusinessRuleChecker.of``.
"""
for rule in ALL_RULES:
yield BusinessRuleChecker.of(rule)
yield IndirectBusinessRuleChecker.of(rule)
return model_rules


def applicable_to(model: TrackedModel) -> Iterator[Checker]:
"""Return instances of any Checker classes applicable to the supplied
TrackedModel instance."""
for checker_type in checker_types():
yield from checker_type.checkers_for(model)
# Checkers in priority list order, checkers for linked models come first.
ALL_CHECKERS = {
"LinkedModelsBusinessRuleChecker": LinkedModelsBusinessRuleChecker,
"BusinessRuleChecker": BusinessRuleChecker,
}
Loading

0 comments on commit 04f5a75

Please sign in to comment.