Skip to content

Commit

Permalink
Centralize check registration / running logic (#85)
Browse files Browse the repository at this point in the history
* Add central check registry / runner

* Add pytest fixture to automatically clear check registry

* Use centralized check registry in Flask

* Use centralized check registry in Sanic

* Use correct path for django test urlconf

* Delete Django level_to_text helper

* Delete commented-out default Django check

* Refactor reset_checks django fixture

- remove version check
- autouse so that checks are cleared after every test run

* Use centralized check runner in Django

* Tests for running checks

* Add flask tests for registering migration check with app

* Adjust wording in docstring and logs

* Remove superfluous return

* Restore legacy methods with deprecation warning

* Rename init_check to register_partial

* Fix posargs passing to pytest

* Remove problematic test

* Revert "Remove problematic test"

This reverts commit 80ef729.

* Use a fixture to add logger handlers for request.summary

* Reset logging before loading ini

* Run black 24.1a1

---------

Co-authored-by: Mathieu Leplatre <mathieu@mozilla.com>
  • Loading branch information
grahamalama and leplatrem committed Dec 15, 2023
1 parent a7cad34 commit 886ac45
Show file tree
Hide file tree
Showing 14 changed files with 676 additions and 294 deletions.
8 changes: 8 additions & 0 deletions src/dockerflow/checks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,11 @@
Warning,
level_to_text,
)
from .registry import ( # noqa
clear_checks,
get_checks,
register,
register_partial,
run_checks,
run_checks_async,
)
213 changes: 213 additions & 0 deletions src/dockerflow/checks/registry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, you can obtain one at http://mozilla.org/MPL/2.0/.
import asyncio
import functools
import inspect
import logging
from dataclasses import dataclass
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple

from .messages import CheckMessage, level_to_text

logger = logging.getLogger(__name__)

CheckFn = Callable[..., List[CheckMessage]]

_REGISTERED_CHECKS = {}


def _iscoroutinefunction_or_partial(obj):
"""
Determine if the provided object is a coroutine function or a partial function
that wraps a coroutine function.
This function should be removed when we drop support for Python 3.7, as this is
handled directly by `inspect.iscoroutinefunction` in Python 3.8.
"""
while isinstance(obj, functools.partial):
obj = obj.func
return inspect.iscoroutinefunction(obj)


def register(func=None, name=None):
"""
Register a check callback to be executed from
the heartbeat endpoint.
"""
if func is None:
return functools.partial(register, name=name)

if name is None:
name = func.__name__

logger.debug("Register Dockerflow check %s", name)

if _iscoroutinefunction_or_partial(func):

@functools.wraps(func)
async def decorated_function_asyc(*args, **kwargs):
logger.debug("Called Dockerflow check %s", name)
return await func(*args, **kwargs)

_REGISTERED_CHECKS[name] = decorated_function_asyc
return decorated_function_asyc

@functools.wraps(func)
def decorated_function(*args, **kwargs):
logger.debug("Called Dockerflow check %s", name)
return func(*args, **kwargs)

_REGISTERED_CHECKS[name] = decorated_function
return decorated_function


def register_partial(func, *args, name=None):
"""
Registers a given check callback that will be called with the provided
arguments using `functools.partial()`. For example:
.. code-block:: python
dockerflow.register_partial(check_redis_connected, redis)
"""
if name is None:
name = func.__name__

logger.debug("Register Dockerflow check %s with partially applied arguments" % name)
partial = functools.wraps(func)(functools.partial(func, *args))
return register(func=partial, name=name)


def get_checks():
return _REGISTERED_CHECKS


def clear_checks():
global _REGISTERED_CHECKS
_REGISTERED_CHECKS = dict()


@dataclass
class ChecksResults:
"""
Represents the results of running checks.
This data class holds the results of running a collection of checks. It includes
details about each check's outcome, their statuses, and the overall result level.
:param details: A dictionary containing detailed information about each check's
outcome, with check names as keys and dictionaries of details as values.
:type details: Dict[str, Dict[str, Any]]
:param statuses: A dictionary containing the status of each check, with check names
as keys and statuses as values (e.g., 'pass', 'fail', 'warning').
:type statuses: Dict[str, str]
:param level: An integer representing the overall result level of the checks
:type level: int
"""

details: Dict[str, Dict[str, Any]]
statuses: Dict[str, str]
level: int


async def _run_check_async(check):
name, check_fn = check
if _iscoroutinefunction_or_partial(check_fn):
errors = await check_fn()
else:
loop = asyncio.get_event_loop()
errors = await loop.run_in_executor(None, check_fn)

return (name, errors)


async def run_checks_async(
checks: Iterable[Tuple[str, CheckFn]],
silenced_check_ids: Optional[Iterable[str]] = None,
) -> ChecksResults:
"""
Run checks concurrently and return the results.
Executes a collection of checks concurrently, supporting both synchronous and
asynchronous checks. The results include the outcome of each check and can be
further processed.
:param checks: An iterable of tuples where each tuple contains a check name and a
check function.
:type checks: Iterable[Tuple[str, CheckFn]]
:param silenced_check_ids: A list of check IDs that should be omitted from the
results.
:type silenced_check_ids: List[str]
:return: An instance of ChecksResults containing detailed information about each
check's outcome, their statuses, and the overall result level.
:rtype: ChecksResults
"""
if silenced_check_ids is None:
silenced_check_ids = []

tasks = (_run_check_async(check) for check in checks)
results = await asyncio.gather(*tasks)
return _build_results_payload(results, silenced_check_ids)


def run_checks(
checks: Iterable[Tuple[str, CheckFn]],
silenced_check_ids: Optional[Iterable[str]] = None,
) -> ChecksResults:
"""
Run checks synchronously and return the results.
Executes a collection of checks and returns the results. The results include the
outcome of each check and can be further processed.
:param checks: An iterable of tuples where each tuple contains a check name and a
check function.
:type checks: Iterable[Tuple[str, CheckFn]]
:param silenced_check_ids: A list of check IDs that should be omitted from the
results.
:type silenced_check_ids: List[str]
:return: An instance of ChecksResults containing detailed information about each
check's outcome, their statuses, and the overall result level.
:rtype: ChecksResults
"""
if silenced_check_ids is None:
silenced_check_ids = []
results = [(name, check()) for name, check in checks]
return _build_results_payload(results, silenced_check_ids)


def _build_results_payload(
checks_results: Iterable[Tuple[str, Iterable[CheckMessage]]],
silenced_check_ids,
):
details = {}
statuses = {}
max_level = 0

for name, errors in checks_results:
# Log check results with appropriate level.
for error in errors:
logger.log(error.level, "%s: %s", error.id, error.msg)

errors = [e for e in errors if e.id not in silenced_check_ids]
level = max([0] + [e.level for e in errors])

detail = {
"status": level_to_text(level),
"level": level,
"messages": {e.id: e.msg for e in errors},
}
statuses[name] = level_to_text(level)
max_level = max(max_level, level)
if level > 0:
details[name] = detail

return ChecksResults(statuses=statuses, details=details, level=max_level)
13 changes: 0 additions & 13 deletions src/dockerflow/django/checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,6 @@
from .. import health


def level_to_text(level):
statuses = {
0: "ok",
checks.messages.DEBUG: "debug",
checks.messages.INFO: "info",
checks.messages.WARNING: "warning",
checks.messages.ERROR: "error",
checks.messages.CRITICAL: "critical",
}
return statuses.get(level, "unknown")


def check_database_connected(app_configs, **kwargs):
"""
A Django check to see if connecting to the configured default
Expand Down Expand Up @@ -119,7 +107,6 @@ def register():
[
"dockerflow.django.checks.check_database_connected",
"dockerflow.django.checks.check_migrations_applied",
# 'dockerflow.django.checks.check_redis_connected',
],
)
for check_path in check_paths:
Expand Down
54 changes: 18 additions & 36 deletions src/dockerflow/django/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@
import logging

from django.conf import settings
from django.core import checks
from django.core.checks.registry import registry as django_check_registry
from django.http import HttpResponse, HttpResponseNotFound, JsonResponse
from django.utils.module_loading import import_string

from .checks import level_to_text
from dockerflow import checks

from .signals import heartbeat_failed, heartbeat_passed

version_callback = getattr(
Expand Down Expand Up @@ -47,44 +48,25 @@ def heartbeat(request):
Any check that returns a warning or worse (error, critical) will
return a 500 response.
"""
all_checks = checks.registry.registry.get_checks(
include_deployment_checks=not settings.DEBUG
checks_to_run = (
(check.__name__, lambda: check(app_configs=None))
for check in django_check_registry.get_checks(
include_deployment_checks=not settings.DEBUG
)
)

details = {}
statuses = {}
level = 0

for check in all_checks:
check_level, check_errors = heartbeat_check_detail(check)
level_text = level_to_text(check_level)
statuses[check.__name__] = level_text
level = max(level, check_level)
if level > 0:
for error in check_errors:
logger.log(error.level, "%s: %s", error.id, error.msg)
details[check.__name__] = {
"status": level_text,
"level": level,
"messages": {e.id: e.msg for e in check_errors},
}

if level < checks.messages.ERROR:
check_results = checks.run_checks(
checks_to_run,
silenced_check_ids=settings.SILENCED_SYSTEM_CHECKS,
)
if check_results.level < checks.ERROR:
status_code = 200
heartbeat_passed.send(sender=heartbeat, level=level)
heartbeat_passed.send(sender=heartbeat, level=check_results.level)
else:
status_code = 500
heartbeat_failed.send(sender=heartbeat, level=level)
heartbeat_failed.send(sender=heartbeat, level=check_results.level)

payload = {"status": level_to_text(level)}
payload = {"status": checks.level_to_text(check_results.level)}
if settings.DEBUG:
payload["checks"] = statuses
payload["details"] = details
payload["checks"] = check_results.statuses
payload["details"] = check_results.details
return JsonResponse(payload, status=status_code)


def heartbeat_check_detail(check):
errors = check(app_configs=None)
errors = list(filter(lambda e: e.id not in settings.SILENCED_SYSTEM_CHECKS, errors))
level = max([0] + [e.level for e in errors])
return level, errors
Loading

0 comments on commit 886ac45

Please sign in to comment.