Skip to content

Commit

Permalink
regroup tests for tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
mathiasertl committed Apr 20, 2024
1 parent 44ad73a commit 9b8d53b
Show file tree
Hide file tree
Showing 13 changed files with 333 additions and 147 deletions.
6 changes: 5 additions & 1 deletion ca/django_ca/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,9 @@ class ExtendedKeyUsageOID(_ExtendedKeyUsageOID):
)

#: Map of hash algorithm types in cryptography to standard hash algorithm names.
#:
#: Keys are the types from :py:attr:`~django_ca.typehints.AllowedHashTypes`, values are the matching names
#: from :py:attr:`~django_ca.typehints.HashAlgorithms`.
HASH_ALGORITHM_NAMES: MappingProxyType[type[AllowedHashTypes], HashAlgorithms] = MappingProxyType(
{
hashes.SHA224: "SHA-224",
Expand All @@ -356,7 +359,8 @@ class ExtendedKeyUsageOID(_ExtendedKeyUsageOID):
}
)

#: Mapping of hash algorithm names to hash algorithm types (the inverse of HASH_ALGORITHM_NAMES).
#: Map of hash algorithm names to hash algorithm types (the inverse of
#: :py:attr:`~django_ca.constants.HASH_ALGORITHM_NAMES`).
HASH_ALGORITHM_TYPES: MappingProxyType[HashAlgorithms, type[AllowedHashTypes]] = MappingProxyType(
{v: k for k, v in HASH_ALGORITHM_NAMES.items()}
)
Expand Down
3 changes: 1 addition & 2 deletions ca/django_ca/pydantic/type_aliases.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
"""Reusable type aliases for Pydantic models."""

import base64
import re
from datetime import timedelta
from typing import Annotated, Any, TypeVar

Expand Down Expand Up @@ -110,7 +109,7 @@ def _get_cryptography_schema(
str,
BeforeValidator(int_to_hex_parser),
AfterValidator(str.upper),
Field(min_length=1, max_length=40, pattern=re.compile("^[A-F0-9]+$")),
Field(min_length=1, max_length=40, pattern="^[A-F0-9]+$"),
]

_timedelta_json_schema = core_schema.chain_schema(
Expand Down
27 changes: 19 additions & 8 deletions ca/django_ca/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ def cache_crls(
serials: Optional[Iterable[str]] = None, key_backend_options: Optional[dict[str, dict[str, JSON]]] = None
) -> None:
"""Task to cache the CRLs for all CAs."""
if serials is None: # pragma: no cover; just to make mypy happy
if serials is None:
serials = []
if key_backend_options is None:
key_backend_options = {}
Expand All @@ -115,7 +115,7 @@ def cache_crls(
for serial in serials:
try:
run_task(cache_crl, serial, key_backend_options=key_backend_options.get(serial, {}))
except Exception:
except Exception: # pylint: disable=broad-exception-caught
# NOTE: When using Celery, an exception will only be raised here if task.delay() itself raises an
# exception, e.g. if the connection to the broker fails. Without celery, exceptions in cache_crl()
# are raised here directly.
Expand Down Expand Up @@ -181,17 +181,28 @@ def generate_ocsp_key(


@shared_task
def generate_ocsp_keys(**kwargs: Any) -> None:
def generate_ocsp_keys(
serials: Optional[Iterable[str]] = None, key_backend_options: Optional[dict[str, dict[str, JSON]]] = None
) -> None:
"""Task to generate an OCSP keys for all usable CAs."""
kwargs.setdefault("key_backend_options", {})
for serial in CertificateAuthority.objects.usable().values_list("serial", flat=True):
if serials is None:
serials = []
if key_backend_options is None:
key_backend_options = {}

if not serials:
serials = typing.cast(
Iterable[str], CertificateAuthority.objects.usable().values_list("serial", flat=True)
)

for serial in serials:
try:
run_task(generate_ocsp_key, serial, **kwargs)
except Exception:
run_task(generate_ocsp_key, serial, key_backend_options=key_backend_options.get(serial, {}))
except Exception: # pylint: disable=broad-exception-caught
# NOTE: When using Celery, an exception will only be raised here if task.delay() itself raises an
# exception, e.g. if the connection to the broker fails. Without celery, exceptions in
# generate_ocsp_key() are raised here directly.
log.exception("Error caching CRL for %s", serial)
log.exception("Error creating OCSP responder key for %s", serial)


@shared_task
Expand Down
Empty file.
44 changes: 44 additions & 0 deletions ca/django_ca/tests/tasks/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# This file is part of django-ca (https://github.com/mathiasertl/django-ca).
#
# django-ca is free software: you can redistribute it and/or modify it under the terms of the GNU General
# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.
#
# django-ca is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
# for more details.
#
# You should have received a copy of the GNU General Public License along with django-ca. If not, see
# <http://www.gnu.org/licenses/>.

"""Minor assertions for tasks."""

from cryptography import x509
from cryptography.hazmat.primitives.serialization import Encoding

from django.core.cache import cache

from django_ca.models import CertificateAuthority
from django_ca.utils import get_crl_cache_key


def assert_crl(ca: CertificateAuthority, crl: x509.CertificateRevocationList) -> None:
"""Test some basic characteristics of the CRL.
.. NOTE:: Shorter version of main fixture, testing only some basic stuff.
"""
if ca.algorithm is None:
assert crl.signature_hash_algorithm is None
else:
assert isinstance(crl.signature_hash_algorithm, type(ca.algorithm))


def assert_crls(ca: CertificateAuthority) -> None:
"""Assert that the correct CRLs have been generated."""
key = get_crl_cache_key(ca.serial, Encoding.DER, "ca")
crl = x509.load_der_x509_crl(cache.get(key))
assert_crl(ca, crl)

key = get_crl_cache_key(ca.serial, Encoding.DER, "user")
crl = x509.load_der_x509_crl(cache.get(key))
assert_crl(ca, crl)
37 changes: 37 additions & 0 deletions ca/django_ca/tests/tasks/test_cache_crl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# This file is part of django-ca (https://github.com/mathiasertl/django-ca).
#
# django-ca is free software: you can redistribute it and/or modify it under the terms of the GNU General
# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.
#
# django-ca is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
# for more details.
#
# You should have received a copy of the GNU General Public License along with django-ca. If not, see
# <http://www.gnu.org/licenses/>.

"""Test the cache_crl task."""

from collections.abc import Iterator

from django.core.cache import cache

import pytest

from django_ca.models import CertificateAuthority
from django_ca.tasks import cache_crl
from django_ca.tests.tasks.conftest import assert_crls


@pytest.fixture(autouse=True)
def _cache() -> Iterator[None]:
# Clear cache for every test
yield
cache.clear()


def test_basic(usable_root: CertificateAuthority) -> None:
"""Test the most basic invocation."""
cache_crl(usable_root.serial)
assert_crls(usable_root)
84 changes: 84 additions & 0 deletions ca/django_ca/tests/tasks/test_cache_crls.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
# This file is part of django-ca (https://github.com/mathiasertl/django-ca).
#
# django-ca is free software: you can redistribute it and/or modify it under the terms of the GNU General
# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.
#
# django-ca is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
# for more details.
#
# You should have received a copy of the GNU General Public License along with django-ca. If not, see
# <http://www.gnu.org/licenses/>.

"""Test the cache_crls task."""

import base64
import logging
from collections.abc import Iterator
from unittest import mock

from cryptography.hazmat.primitives.serialization import Encoding

from django.core.cache import cache

import pytest
from _pytest.logging import LogCaptureFixture

from django_ca.models import CertificateAuthority
from django_ca.tasks import cache_crls
from django_ca.tests.base.constants import CERT_DATA, TIMESTAMPS
from django_ca.tests.tasks.conftest import assert_crls
from django_ca.utils import get_crl_cache_key

pytestmark = [pytest.mark.freeze_time(TIMESTAMPS["everything_valid"])]


@pytest.fixture(autouse=True)
def _cache() -> Iterator[None]:
# Clear cache for every test
yield
cache.clear()


def test_all_crls(usable_cas: list[CertificateAuthority]) -> None:
"""Test caching when all CAs are valid."""
cache_crls()

for ca in usable_cas:
assert_crls(ca)


@pytest.mark.freeze_time(TIMESTAMPS["everything_expired"])
def test_with_expired_certificate_authorities(usable_cas: list[CertificateAuthority]) -> None:
"""Test that nothing is cashed if all CAs are expired."""
cache_crls()

for ca in usable_cas:
key = get_crl_cache_key(ca.serial, Encoding.DER, "ca")
assert cache.get(key) is None


def test_with_key_options(usable_pwd: CertificateAuthority) -> None:
"""Test passing the password explicitly."""
cache_crls([usable_pwd.serial], {usable_pwd.serial: {"password": CERT_DATA["pwd"]["password"]}})
assert_crls(usable_pwd)


def test_with_invalid_password(usable_pwd: CertificateAuthority) -> None:
"""Test passing an invalid password."""
password = base64.b64encode(b"wrong").decode()
cache_crls([usable_pwd.serial], {usable_pwd.serial: {"password": password}})
key = get_crl_cache_key(usable_pwd.serial, Encoding.DER, "ca")
assert cache.get(key) is None


@pytest.mark.usefixtures("root")
def test_with_exception_child_task(caplog: LogCaptureFixture) -> None:
"""Test exceptions for the task are logged."""
with (
mock.patch("django_ca.tasks.run_task", side_effect=Exception("error")),
caplog.at_level(logging.INFO),
):
cache_crls()
assert "Error caching CRL" in caplog.text
72 changes: 72 additions & 0 deletions ca/django_ca/tests/tasks/test_generate_ocsp_key.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
# This file is part of django-ca (https://github.com/mathiasertl/django-ca).
#
# django-ca is free software: you can redistribute it and/or modify it under the terms of the GNU General
# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.
#
# django-ca is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
# for more details.
#
# You should have received a copy of the GNU General Public License along with django-ca. If not, see
# <http://www.gnu.org/licenses/>.

"""Test the generate_ocsp_key task."""

import base64
from datetime import timedelta

from django.core.files.storage import storages

import pytest

from django_ca import ca_settings
from django_ca.models import Certificate, CertificateAuthority
from django_ca.tasks import generate_ocsp_key
from django_ca.tests.base.constants import CERT_DATA, TIMESTAMPS

pytestmark = [pytest.mark.freeze_time(TIMESTAMPS["everything_valid"])]


def test_with_no_parameters(usable_root: CertificateAuthority) -> None:
"""Test creating a single key."""
storage = storages[ca_settings.CA_DEFAULT_STORAGE_ALIAS]
generate_ocsp_key(usable_root.serial)
assert storage.exists(f"ocsp/{usable_root.serial}.key") is True
assert storage.exists(f"ocsp/{usable_root.serial}.pem") is True


def test_responder_key_validity(usable_root: CertificateAuthority) -> None:
"""Test that the ocsp_responder_key_validity field works."""
qs = Certificate.objects.filter(profile="ocsp", ca=usable_root)
usable_root.ocsp_responder_key_validity = 10
usable_root.save()
assert qs.exists() is False

generate_ocsp_key(usable_root.serial)
cert = qs.get()
assert cert.expires == TIMESTAMPS["everything_valid"] + timedelta(days=10)


def test_with_explicit_password(usable_pwd: CertificateAuthority) -> None:
"""Test explicitly passing a password."""
storage = storages[ca_settings.CA_DEFAULT_STORAGE_ALIAS]
generate_ocsp_key(
usable_pwd.serial, key_backend_options={usable_pwd.serial: {"password": CERT_DATA["pwd"]["password"]}}
)
assert storage.exists(f"ocsp/{usable_pwd.serial}.key") is True
assert storage.exists(f"ocsp/{usable_pwd.serial}.pem") is True


def test_no_renewal_required(usable_root: CertificateAuthority) -> None:
"""Test that keys are not renewed and None is returned in this case."""
assert generate_ocsp_key(usable_root.serial) is not None
assert generate_ocsp_key(usable_root.serial) is None


def test_with_wrong_password(usable_pwd: CertificateAuthority) -> None:
"""Test passing the wrong password."""
password = base64.b64encode(b"wrong").decode()
with pytest.raises(ValueError, match=r"^Could not decrypt private key - bad password\?$"):
generate_ocsp_key(usable_pwd.serial, key_backend_options={"password": password})
assert Certificate.objects.filter(profile="ocsp", ca=usable_pwd).exists() is False
60 changes: 60 additions & 0 deletions ca/django_ca/tests/tasks/test_generate_ocsp_keys.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# This file is part of django-ca (https://github.com/mathiasertl/django-ca).
#
# django-ca is free software: you can redistribute it and/or modify it under the terms of the GNU General
# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.
#
# django-ca is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
# for more details.
#
# You should have received a copy of the GNU General Public License along with django-ca. If not, see
# <http://www.gnu.org/licenses/>.

"""Test the generate_ocsp_keys task."""

import base64
import logging
from unittest import mock

from django.core.files.storage import storages

import pytest
from _pytest.logging import LogCaptureFixture

from django_ca import ca_settings
from django_ca.models import CertificateAuthority
from django_ca.tasks import generate_ocsp_keys
from django_ca.tests.base.constants import TIMESTAMPS

pytestmark = [pytest.mark.freeze_time(TIMESTAMPS["everything_valid"])]


def test_generate_ocsp_keys_all(usable_cas: list[CertificateAuthority]) -> None:
"""Test creating all keys at once."""
generate_ocsp_keys()
storage = storages[ca_settings.CA_DEFAULT_STORAGE_ALIAS]

for ca in usable_cas:
assert storage.exists(f"ocsp/{ca.serial}.key") is True
assert storage.exists(f"ocsp/{ca.serial}.pem") is True


@pytest.mark.usefixtures("root")
def test_generate_ocsp_keys_with_error(caplog: LogCaptureFixture) -> None:
"""Test case where child-task throws an error."""
with (
mock.patch("django_ca.tasks.run_task", side_effect=Exception("error")),
caplog.at_level(logging.INFO),
):
generate_ocsp_keys()
assert "Error creating OCSP responder key for" in caplog.text


def test_with_invalid_password(usable_pwd: CertificateAuthority) -> None:
"""Test passing an invalid password."""
password = base64.b64encode(b"wrong").decode()
storage = storages[ca_settings.CA_DEFAULT_STORAGE_ALIAS]
generate_ocsp_keys([usable_pwd.serial], {usable_pwd.serial: {"password": password}})
assert storage.exists(f"ocsp/{usable_pwd.serial}.key") is False
assert storage.exists(f"ocsp/{usable_pwd.serial}.pem") is False

0 comments on commit 9b8d53b

Please sign in to comment.