Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 4 check valid chain #6

Merged
merged 2 commits into from
Oct 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ install_requires =
django-choices
django-privates
pyopenssl
cryptograpy
certifi
tests_require =
pytest
pytest-django
Expand Down
15 changes: 14 additions & 1 deletion simple_certmanager/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,13 @@ class CertificateAdmin(PrivateMediaMixin, admin.ModelAdmin):
form = CertificateAdminForm

fields = ("label", "type", "public_certificate", "private_key")
list_display = ("get_label", "type", "expiry_date", "is_valid_key_pair")
list_display = (
"get_label",
"type",
"expiry_date",
"is_valid_key_pair",
"has_valid_chain",
)
list_filter = ("label", "type")
search_fields = ("label", "type")

Expand All @@ -37,3 +43,10 @@ def is_valid_key_pair(self, obj=None):
return obj.is_valid_key_pair()
except FileNotFoundError:
return None

@admin.display(description=_("valid chain"), boolean=True)
def has_valid_chain(self, obj: Certificate):
try:
return obj.has_valid_chain()
except FileNotFoundError:
return None
8 changes: 7 additions & 1 deletion simple_certmanager/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

from .constants import CertificateTypes
from .mixins import DeleteFileFieldFilesMixin
from .utils import pretty_print_certificate_components
from .utils import check_pem, pretty_print_certificate_components


class Certificate(DeleteFileFieldFilesMixin, models.Model):
Expand Down Expand Up @@ -97,3 +97,9 @@ def is_valid_key_pair(self) -> Optional[bool]:
return True

is_valid_key_pair.boolean = True

def has_valid_chain(self) -> Optional[bool]:
with self.public_certificate.open(mode="rb") as f:
return check_pem(f.read())

has_valid_chain.boolean = True
54 changes: 54 additions & 0 deletions simple_certmanager/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,60 @@
from os import PathLike
from typing import Generator, Optional, Union

import certifi
from cryptography import x509
from OpenSSL import crypto


def pretty_print_certificate_components(x509name) -> str:
components = [
(label.decode("utf-8"), value.decode("utf-8"))
for (label, value) in x509name.get_components()
]
return ", ".join([f"{label}: {value}" for (label, value) in components])


def split_pem(pem: bytes) -> Generator[bytes, None, None]:
"Split a concatenated pem into its constituent parts"
mark = b"-----END CERTIFICATE-----"
if mark not in pem:
return
end = pem.find(mark) + len(mark)
yield pem[:end]
yield from split_pem(pem[end:])


def load_pem_chain(pem: bytes) -> Generator[x509.Certificate, None, None]:
for data in split_pem(pem):
yield x509.load_pem_x509_certificate(data)


def check_pem(
pem: bytes,
ca: Union[bytes, str, PathLike] = certifi.where(),
ca_path: Optional[Union[str, PathLike]] = None,
) -> bool:
"""Simple (possibly incomplete) sanity check on pem chain.

If the pam passes this check it MAY be valid for use. This is only intended
to catch blatant misconfigurations early. This gives NO guarantees on
security nor authenticity.
"""
# We need still need to use pyOpenSSL primitives for this:
# https://github.com/pyca/cryptography/issues/6229
# https://github.com/pyca/cryptography/issues/2381

# Establish roots
store = crypto.X509Store()
store.load_locations(ca, ca_path)

leaf, *chain = map(crypto.X509.from_cryptography, load_pem_chain(pem))

# Create a context
ctx = crypto.X509StoreContext(store, leaf, chain)
try:
ctx.verify_certificate()
except crypto.X509StoreContextError:
return False
else:
return True
181 changes: 181 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
import datetime
from pathlib import Path

import pytest
from cryptography import x509
from cryptography.hazmat.primitives import asymmetric, hashes, serialization


@pytest.fixture(scope="session")
def root_key() -> asymmetric.rsa.RSAPrivateKey:
"RSA key for the RootCA"
key = gen_key()
# with (Path(__file__).parent / "data" / "test.key").open("rb") as f:
# return serialization.load_pem_private_key(f.read(), password=None)
return key


@pytest.fixture(scope="session")
def root_cert(root_key) -> x509.Certificate:
"Certificate for the RootCA"
return mkcert(
x509.Name(
[
x509.NameAttribute(x509.oid.NameOID.COUNTRY_NAME, "NL"),
x509.NameAttribute(x509.oid.NameOID.STATE_OR_PROVINCE_NAME, "NH"),
x509.NameAttribute(x509.oid.NameOID.LOCALITY_NAME, "Amsterdam"),
x509.NameAttribute(x509.oid.NameOID.ORGANIZATION_NAME, "Root CA"),
x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, "rootca.example.org"),
]
),
root_key,
)


@pytest.fixture
def leaf_pem(root_cert: x509.Certificate, root_key) -> bytes:
"A valid pem encoded certificate directly issued by the Root CA"
leaf_cert = mkcert(
subject=x509.Name(
[
x509.NameAttribute(x509.oid.NameOID.COUNTRY_NAME, "NL"),
x509.NameAttribute(
x509.oid.NameOID.STATE_OR_PROVINCE_NAME, "Some-State"
),
x509.NameAttribute(
x509.oid.NameOID.ORGANIZATION_NAME, "Internet Widgits Pty Ltd"
),
x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, "widgits.example.org"),
]
),
subject_key=gen_key(),
issuer=root_cert.subject,
issuer_key=root_key,
can_issue=False,
)
return to_pem(leaf_cert)


@pytest.fixture
def chain_pem(root_cert: x509.Certificate, root_key) -> bytes:
"A valid pem encoded full certificate chain"
inter_key = gen_key()
intermediate_cert = mkcert(
subject=x509.Name(
[
x509.NameAttribute(x509.oid.NameOID.COUNTRY_NAME, "NL"),
x509.NameAttribute(
x509.oid.NameOID.ORGANIZATION_NAME, "Men in the Middle Ltd"
),
x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, "mitm.example.org"),
]
),
subject_key=inter_key,
issuer=root_cert.subject,
issuer_key=root_key,
can_issue=True,
)
leaf_cert = mkcert(
subject=x509.Name(
[
x509.NameAttribute(x509.oid.NameOID.COUNTRY_NAME, "NL"),
x509.NameAttribute(
x509.oid.NameOID.STATE_OR_PROVINCE_NAME, "Some-State"
),
x509.NameAttribute(
x509.oid.NameOID.ORGANIZATION_NAME, "Internet Widgits Pty Ltd"
),
x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, "widgits.example.org"),
]
),
subject_key=gen_key(),
issuer=intermediate_cert.subject,
issuer_key=inter_key,
can_issue=False,
)
return b"".join(map(to_pem, [leaf_cert, intermediate_cert]))


@pytest.fixture
def broken_chain_pem(root_cert: x509.Certificate, root_key):
"""An invalid pem encoded full certificate chain.

The intermediate is no a valid issuer.
"""
inter_key = gen_key()
intermediate_cert = mkcert(
subject=x509.Name(
[
x509.NameAttribute(x509.oid.NameOID.COUNTRY_NAME, "NL"),
x509.NameAttribute(
x509.oid.NameOID.ORGANIZATION_NAME, "Men in the Middle Ltd"
),
x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, "mitm.example.org"),
]
),
subject_key=inter_key,
issuer=root_cert.subject,
issuer_key=root_key,
can_issue=False, # Middle isn't allowed to issue certs.
)
leaf_cert = mkcert(
subject=x509.Name(
[
x509.NameAttribute(x509.oid.NameOID.COUNTRY_NAME, "NL"),
x509.NameAttribute(
x509.oid.NameOID.STATE_OR_PROVINCE_NAME, "Some-State"
),
x509.NameAttribute(
x509.oid.NameOID.ORGANIZATION_NAME, "Internet Widgits Pty Ltd"
),
x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, "widgits.example.org"),
]
),
subject_key=gen_key(),
issuer=intermediate_cert.subject,
issuer_key=inter_key,
can_issue=False,
)
return b"".join(map(to_pem, [leaf_cert, intermediate_cert]))


@pytest.fixture(scope="session")
def root_ca_path(root_cert, tmp_path_factory) -> Path:
"A path to a temporary .pem for the Root CA"
cert_path = tmp_path_factory.mktemp("fake_pki") / "fake_ca_cert.pem"
with cert_path.open("wb") as f:
f.write(to_pem(root_cert))
return cert_path


def mkcert(subject, subject_key, issuer=None, issuer_key=None, can_issue=True):
cert = (
x509.CertificateBuilder()
.subject_name(subject)
.issuer_name(issuer if issuer else subject)
.public_key(subject_key.public_key())
.serial_number(x509.random_serial_number())
.not_valid_before(datetime.datetime.utcnow())
.not_valid_after(datetime.datetime.utcnow() + datetime.timedelta(days=1))
.add_extension(
x509.SubjectAlternativeName([x509.DNSName("localhost")]),
critical=False,
)
)

if can_issue:
cert = cert.add_extension(
x509.BasicConstraints(ca=True, path_length=None),
critical=True,
)

cert = cert.sign(issuer_key if issuer_key else subject_key, hashes.SHA256())
return cert


def to_pem(cert: x509.Certificate) -> bytes:
return cert.public_bytes(serialization.Encoding.PEM)


def gen_key():
return asymmetric.rsa.generate_private_key(public_exponent=0x10001, key_size=2048)
17 changes: 17 additions & 0 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from simple_certmanager.utils import check_pem


def test_check_pem_checks_directly_issued(leaf_pem, root_ca_path):
assert check_pem(leaf_pem, ca=root_ca_path)


def test_check_pem_fails_unrooted_pem(leaf_pem):
assert not check_pem(leaf_pem)


def test_check_pem_checks_chain(chain_pem, root_ca_path):
assert check_pem(chain_pem, ca=root_ca_path)


def test_check_pem_fails_bad_chain(broken_chain_pem, root_ca_path):
assert not check_pem(broken_chain_pem, ca=root_ca_path)