Skip to content

Commit

Permalink
Migration:
Browse files Browse the repository at this point in the history
Migrate
TrackedModelChecks to new structure.
remove TransactionCheck.

Start moving business rules into the database, and provide sync_business_rules to do that, along with a mechanism to do this in tests.
  • Loading branch information
stuaxo committed Aug 26, 2022
1 parent c12f576 commit 6eed927
Show file tree
Hide file tree
Showing 25 changed files with 1,189 additions and 551 deletions.
7 changes: 7 additions & 0 deletions .github/workflows/django.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@ jobs:
run: |
python manage.py makemigrations --dry-run --check
- name: Check for business rule data migrations
env:
DATABASE_URL: postgres://postgres:postgres@localhost:5432/postgres
DJANGO_SETTINGS_MODULE: settings.test
run: |
python manage.py sync_business_rules --check
- name: Run tests
env:
DATABASE_URL: postgres://postgres:postgres@localhost:5432/postgres
Expand Down
222 changes: 57 additions & 165 deletions checks/checks.py
Original file line number Diff line number Diff line change
@@ -1,219 +1,111 @@
import abc
import logging
import typing
from collections import defaultdict
from typing import Optional
from typing import Set
from typing import Tuple

from django.conf import settings

from checks.models import BusinessRuleModel
from checks.models import BusinessRuleResult
from checks.models import TrackedModelCheck
from common.business_rules import BusinessRule
from common.business_rules import BusinessRuleViolation
from common.models import TrackedModel
from common.models import Transaction
from common.models.utils import get_current_transaction
from common.models.utils import override_current_transaction

logger = logging.getLogger(__name__)

CheckResult = Tuple[bool, Optional[str]]


class Checker:
class Checker(abc.ABC):
@classmethod
def run_rule(
cls,
rule: BusinessRule,
transaction: Transaction,
@abc.abstractmethod
def get_model_rule_mapping(
cls: abc.abstractclassmethod,
model: TrackedModel,
) -> CheckResult:
"""
Run a single business rule on a single model.
:return CheckResult, a Tuple(rule_passed: str, violation_reason: Optional[str]).
"""
logger.debug(f"run_rule %s %s %s", model, rule, transaction.pk)
try:
rule(transaction).validate(model)
logger.debug(f"%s [tx:%s] %s [passed]", model, rule, transaction.pk)
return True, None
except BusinessRuleViolation as violation:
reason = violation.args[0]
logger.debug(f"%s [tx:%s] %s [failed]", model, rule, transaction.pk, reason)
return False, reason
rules: Optional[Set[str]] = None,
) -> typing.Dict[TrackedModel, Set[str]]:
"""Implementing classes should return a dict mapping classes to sets of
business rules that apply to them."""
return {}

@classmethod
def apply_rule(
def apply_rules(
cls,
rule: BusinessRule,
rules: typing.Sequence[BusinessRule],
transaction: Transaction,
model: TrackedModel,
):
"""
Applies the rule to the model and records success in a
TrackedModelCheck.
TODO - Get rules_to_run - set of rules that have not been run.
"""
# model.content_hash().digest()

If a TrackedModelCheck already exists with a matching content checksum it
will be updated, otherwise a new one will be created.
# rule_models = {
# rule_model.name: rule_model for rule_model in type(model).get_business_rule_models()
# }
# TrackedModel check represents and ongoing check

:return: TrackedModelCheck instance containing the result of the check.
# To minimise the amount of queries, data is fetched up front and results are batched where possible.
rule_models = [*BusinessRuleModel.from_rules(rules)]

During debugging the developer can set settings.RAISE_BUSINESS_RULE_FAILURES
to True to raise business rule violations as exceptions.
"""
success, message = False, None
try:
with override_current_transaction(transaction):
success, message = cls.run_rule(rule, transaction, model)
except Exception as e:
success, message = False, str(e)
if settings.RAISE_BUSINESS_RULE_FAILURES:
# RAISE_BUSINESS_RULE_FAILURES can be set by the developer to raise
# Exceptions.
raise
finally:
check, created = TrackedModelCheck.objects.get_or_create(
{
"successful": success,
"message": message,
"content_hash": model.content_hash().digest(),
},
model=model,
check_name=rule.__name__,
)
if not created:
check.successful = success
check.message = message
check.content_hash = model.content_hash().digest()
check.save()
return check
head_transaction = Transaction.objects.approved().last()
check, created = TrackedModelCheck.objects.get_or_create(
model=model,
head_transaction=head_transaction,
# content_hash=model.content_hash().digest(),
)

@classmethod
def apply_rule_cached(
cls,
rule: BusinessRule,
transaction: Transaction,
model: TrackedModel,
):
"""
If a matching TrackedModelCheck instance exists, returns it, otherwise
check rule, and return the result as a TrackedModelCheck instance.
# TODO: Get exclude existing rules
results = [
rule_model.get_result(model.transaction, model)
for rule_model in rule_models
]

:return: TrackedModelCheck instance containing the result of the check.
"""
try:
check = TrackedModelCheck.objects.get(
model=model,
check_name=rule.__name__,
)
except TrackedModelCheck.DoesNotExist:
logger.debug(
"apply_rule_cached (no existing check) %s, %s apply rule",
rule.__name__,
transaction,
)
return cls.apply_rule(rule, transaction, model)

# Re-run the rule if the content checksum no longer matches that of the previous test.
check_hash = bytes(check.content_hash)
model_hash = model.content_hash().digest()
if check_hash == model_hash:
logger.debug(
"apply_rule_cached (matching content hash) %s, tx: %s, using cached result %s",
rule.__name__,
transaction.pk,
check,
)
return check

logger.debug(
"apply_rule_cached (check.content_hash != model.content_hash()) %s != %s %s, %s apply rule",
check_hash,
model_hash,
rule.__name__,
transaction,
)
check.delete()
return cls.apply_rule(rule, transaction, model)
results = BusinessRuleResult.objects.bulk_create(results)

check.results.add(*results)
print(results)
return check


class BusinessRuleChecker(Checker):
"""Apply BusinessRules specified in a TrackedModels business_rules
attribute."""
"""A``Checker`` that runs a ``BusinessRule`` against a model."""

@classmethod
def apply_rule(
def get_model_rule_mapping(
cls,
rule: BusinessRule,
transaction: Transaction,
model: TrackedModel,
rules: Optional[Set[str]] = None,
):
"""
Run the current business rule on the model.
:return: TrackedModelCheck instance containing the result of the check.
:raises: ValueError if the rule is not in the model's business_rules attribute
To get a list of applicable rules, get_model_rules can be used.
"""
if rule not in model.business_rules:
raise ValueError(
f"{model} does not have {rule} in its business_rules attribute.",
)

return super().apply_rule(rule, transaction, model)
Return a dict mapping business rules to the passed in model.
@classmethod
def get_model_rules(cls, model: TrackedModel, rules: Optional[Set[str]] = None):
"""
This returns a dict, with the passed in model used as a key (this allows LinkedModelsBusinessRuleChecker to map models other than the passed in model to rules.)
:param model: TrackedModel instance
:param rules: Optional list of rule names to filter by.
:return: Dict mapping models to a set of the BusinessRules that apply to them.
:return: Dict with one entry for the passed in model the values are the rule instances to apply.
"""
model_rules = defaultdict(set)
if rules is None:
return {model: set(model.business_rules)}

for rule in model.business_rules:
if rules is not None and rule.__name__ not in rules:
continue

model_rules[model].add(rule)

# Downcast to a dict - this API (and unit testing) a little more sane.
return {**model_rules}
# User passed in a certain set of rule names to run, filter the business rules by these names
filtered_rules = {
rule for rule in model.business_rules if rule.__name__ in rules
}
return {model: filtered_rules}


class LinkedModelsBusinessRuleChecker(Checker):
"""Apply BusinessRules specified in a TrackedModels indirect_business_rules
attribute to models returned by get_linked_models on those rules."""

@classmethod
def apply_rule(
cls,
rule: BusinessRule,
transaction: Transaction,
model: TrackedModel,
):
"""
LinkedModelsBusinessRuleChecker assumes that the linked models are
still.
the current versions (TODO - ensure a business rule checks this),
:return: TrackedModelCheck instance containing the result of the check.
:raises: ValueError if the rule is not in the model's indirect_business_rules attribute
get_model_rules should be called to get a list of applicable rules and them models they apply to.
"""
if rule not in model.indirect_business_rules:
raise ValueError(
f"{model} does not have {rule} in its indirect_business_rules attribute.",
)

return super().apply_rule(rule, model.transaction, model)
"""A ``Checker`` that runs a ``BusinessRule`` against a model that is linked
to the model being checked, and for which a change in the checked model
could result in a business rule failure against the linked model."""

@classmethod
def get_model_rules(cls, model: TrackedModel, rules: Optional[Set] = None):
def get_model_rule_mapping(cls, model: TrackedModel, rules: Optional[Set] = None):
"""
:param model: Initial TrackedModel instance
:param rules: Optional list of rule names to filter by.
Expand Down
Empty file added checks/management/__init__.py
Empty file.
Empty file.
40 changes: 40 additions & 0 deletions checks/management/commands/list_business_rules.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from collections import defaultdict

from django.core.management import BaseCommand

from checks.models import BusinessRuleModel
from common.business_rules import ALL_RULES


class Command(BaseCommand):
"""Display the business rules in the system and database."""

def handle(self, *app_labels, **options):
self.stdout.write("Rule Name, In System, In Database, Status")

# Create a dictionary of rule_names, then a couple of flags
# to determine status.
rule_info = defaultdict(dict)
for rule_name in BusinessRuleModel.objects.values("name"):
rule_info[rule_name["name"]]["in_database"] = True

for rule_name in ALL_RULES.keys():
rule_info[rule_name]["in_system"] = True

for rule_name, info in rule_info.items():
in_database = info.get("in_database", False)
in_system = info.get("in_system", False)

if in_database and in_system:
status = "In Sync"
elif in_database:
status = "Pending Removal"
elif in_system:
status = "Pending Addition"

self.stdout.write(
f"{rule_name},"
f" {'Y' if in_system else 'N'},"
f" {'Y' if in_database else 'N'},"
f" {status}",
)
Loading

0 comments on commit 6eed927

Please sign in to comment.