Skip to content

Commit

Permalink
represent SubjectAltNames as cryptography.x509.GeneralName objects, n…
Browse files Browse the repository at this point in the history
…ot strings. (#6537)

This fixes #6536
  • Loading branch information
mhils committed Dec 14, 2023
1 parent c2f1aa1 commit b659ea0
Show file tree
Hide file tree
Showing 7 changed files with 146 additions and 71 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@
([#6492](https://github.com/mitmproxy/mitmproxy/pull/6492), @xBZZZZ)
* Enhance documentation and add alert log messages when stream_large_bodies and modify_body are set
([#6514](https://github.com/mitmproxy/mitmproxy/pull/6514), @rosydawn6)
* Subject Alternative Names are now represented as `cryptography.x509.GeneralNames` instead of `list[str]`
across the codebase. This fixes a regression introduced in mitmproxy 10.1.1 related to punycode domain encoding.
([#6537](https://github.com/mitmproxy/mitmproxy/pull/6537), @mhils)


## 14 November 2023: mitmproxy 10.1.5
Expand Down
24 changes: 18 additions & 6 deletions mitmproxy/addons/tlsconfig.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from aioquic.h3.connection import H3_ALPN
from aioquic.tls import CipherSuite
from cryptography import x509
from OpenSSL import crypto
from OpenSSL import SSL

Expand Down Expand Up @@ -488,31 +489,42 @@ def get_cert(self, conn_context: context.Context) -> certs.CertStoreEntry:
This function determines the Common Name (CN), Subject Alternative Names (SANs) and Organization Name
our certificate should have and then fetches a matching cert from the certstore.
"""
altnames: list[str] = []
altnames: list[x509.GeneralName] = []
organization: str | None = None

# Use upstream certificate if available.
if ctx.options.upstream_cert and conn_context.server.certificate_list:
upstream_cert = conn_context.server.certificate_list[0]
if upstream_cert.cn:
altnames.append(upstream_cert.cn)
altnames.append(_ip_or_dns_name(upstream_cert.cn))
altnames.extend(upstream_cert.altnames)
if upstream_cert.organization:
organization = upstream_cert.organization

# Add SNI or our local IP address.
if conn_context.client.sni:
altnames.append(conn_context.client.sni)
altnames.append(_ip_or_dns_name(conn_context.client.sni))
else:
altnames.append(conn_context.client.sockname[0])
altnames.append(_ip_or_dns_name(conn_context.client.sockname[0]))

# If we already know of a server address, include that in the SANs as well.
if conn_context.server.address:
altnames.append(conn_context.server.address[0])
altnames.append(_ip_or_dns_name(conn_context.server.address[0]))

# only keep first occurrence of each hostname
altnames = list(dict.fromkeys(altnames))

# RFC 2818: If a subjectAltName extension of type dNSName is present, that MUST be used as the identity.
# In other words, the Common Name is irrelevant then.
return self.certstore.get_cert(altnames[0], altnames, organization)
cn = next((str(x.value) for x in altnames), None)
return self.certstore.get_cert(cn, altnames, organization)


def _ip_or_dns_name(val: str) -> x509.GeneralName:
"""Convert a string into either an x509.IPAddress or x509.DNSName object."""
try:
ip = ipaddress.ip_address(val)
except ValueError:
return x509.DNSName(val)
else:
return x509.IPAddress(ip)
87 changes: 58 additions & 29 deletions mitmproxy/certs.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import ipaddress
import os
import sys
import warnings
from collections.abc import Iterable
from dataclasses import dataclass
from pathlib import Path
from typing import cast
Expand Down Expand Up @@ -58,7 +60,8 @@ def __eq__(self, other):
return self.fingerprint() == other.fingerprint()

def __repr__(self):
return f"<Cert(cn={self.cn!r}, altnames={self.altnames!r})>"
altnames = [str(x.value) for x in self.altnames]
return f"<Cert(cn={self.cn!r}, altnames={altnames!r})>"

def __hash__(self):
return self._cert.__hash__()
Expand Down Expand Up @@ -147,20 +150,18 @@ def organization(self) -> str | None:
return None

@property
def altnames(self) -> list[str]:
def altnames(self) -> x509.GeneralNames:
"""
Get all SubjectAlternativeName DNS altnames.
"""
try:
ext = self._cert.extensions.get_extension_for_class(
sans = self._cert.extensions.get_extension_for_class(
x509.SubjectAlternativeName
).value
except x509.ExtensionNotFound:
return []
return x509.GeneralNames([])
else:
return ext.get_values_for_type(x509.DNSName) + [
str(x) for x in ext.get_values_for_type(x509.IPAddress)
]
return x509.GeneralNames(sans)


def _name_to_keyval(name: x509.Name) -> list[tuple[str, str]]:
Expand Down Expand Up @@ -224,11 +225,41 @@ def create_ca(
return private_key, cert


def _fix_legacy_sans(sans: Iterable[x509.GeneralName] | list[str]) -> x509.GeneralNames:
"""
SANs used to be a list of strings in mitmproxy 10.1 and below, but now they're a list of GeneralNames.
This function converts the old format to the new one.
"""
if isinstance(sans, x509.GeneralNames):
return sans
elif (
isinstance(sans, list) and len(sans) > 0 and isinstance(sans[0], str)
): # pragma: no cover
warnings.warn(
"Passing SANs as a list of strings is deprecated.",
DeprecationWarning,
stacklevel=2,
)

ss: list[x509.GeneralName] = []
for x in cast(list[str], sans):
try:
ip = ipaddress.ip_address(x)
except ValueError:
x = x.encode("idna").decode()
ss.append(x509.DNSName(x))
else:
ss.append(x509.IPAddress(ip))
return x509.GeneralNames(ss)
else:
return x509.GeneralNames(sans)


def dummy_cert(
privkey: rsa.RSAPrivateKey,
cacert: x509.Certificate,
commonname: str | None,
sans: list[str],
sans: Iterable[x509.GeneralName],
organization: str | None = None,
) -> Cert:
"""
Expand Down Expand Up @@ -264,18 +295,10 @@ def dummy_cert(
builder = builder.subject_name(x509.Name(subject))
builder = builder.serial_number(x509.random_serial_number())

ss: list[x509.GeneralName] = []
for x in sans:
try:
ip = ipaddress.ip_address(x)
except ValueError:
x = x.encode("idna").decode()
ss.append(x509.DNSName(x))
else:
ss.append(x509.IPAddress(ip))
# RFC 5280 §4.2.1.6: subjectAltName is critical if subject is empty.
builder = builder.add_extension(
x509.SubjectAlternativeName(ss), critical=not is_valid_commonname
x509.SubjectAlternativeName(_fix_legacy_sans(sans)),
critical=not is_valid_commonname,
)

# https://datatracker.ietf.org/doc/html/rfc5280#section-4.2.1.1
Expand All @@ -301,7 +324,7 @@ class CertStoreEntry:


TCustomCertId = str # manually provided certs (e.g. mitmproxy's --certs)
TGeneratedCertId = tuple[Optional[str], tuple[str, ...]] # (common_name, sans)
TGeneratedCertId = tuple[Optional[str], x509.GeneralNames] # (common_name, sans)
TCertId = Union[TCustomCertId, TGeneratedCertId]

DHParams = NewType("DHParams", bytes)
Expand Down Expand Up @@ -485,26 +508,31 @@ def add_cert(self, entry: CertStoreEntry, *names: str) -> None:
if entry.cert.cn:
self.certs[entry.cert.cn] = entry
for i in entry.cert.altnames:
self.certs[i] = entry
self.certs[str(i.value)] = entry
for i in names:
self.certs[i] = entry

@staticmethod
def asterisk_forms(dn: str) -> list[str]:
def asterisk_forms(dn: str | x509.GeneralName) -> list[str]:
"""
Return all asterisk forms for a domain. For example, for www.example.com this will return
[b"www.example.com", b"*.example.com", b"*.com"]. The single wildcard "*" is omitted.
"""
parts = dn.split(".")
ret = [dn]
for i in range(1, len(parts)):
ret.append("*." + ".".join(parts[i:]))
return ret
if isinstance(dn, str):
parts = dn.split(".")
ret = [dn]
for i in range(1, len(parts)):
ret.append("*." + ".".join(parts[i:]))
return ret
elif isinstance(dn, x509.DNSName):
return CertStore.asterisk_forms(dn.value)
else:
return [str(dn.value)]

def get_cert(
self,
commonname: str | None,
sans: list[str],
sans: Iterable[x509.GeneralName],
organization: str | None = None,
) -> CertStoreEntry:
"""
Expand All @@ -515,14 +543,15 @@ def get_cert(
organization: Organization name for the generated certificate.
"""
sans = _fix_legacy_sans(sans)

potential_keys: list[TCertId] = []
if commonname:
potential_keys.extend(self.asterisk_forms(commonname))
for s in sans:
potential_keys.extend(self.asterisk_forms(s))
potential_keys.append("*")
potential_keys.append((commonname, tuple(sans)))
potential_keys.append((commonname, sans))

name = next(filter(lambda key: key in self.certs, potential_keys), None)
if name:
Expand All @@ -540,7 +569,7 @@ def get_cert(
chain_file=self.default_chain_file,
chain_certs=self.default_chain_certs,
)
self.certs[(commonname, tuple(sans))] = entry
self.certs[(commonname, sans)] = entry
self.expire(entry)

return entry
Expand Down
2 changes: 1 addition & 1 deletion mitmproxy/tools/console/flowdetailview.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def flowdetails(state, flow: mitmproxy.flow.Flow):
]

if c.altnames:
parts.append(("Alt names", ", ".join(c.altnames)))
parts.append(("Alt names", ", ".join(str(x.value) for x in c.altnames)))
text.extend(common.format_keyvals(parts, indent=4))

if cc is not None:
Expand Down
2 changes: 1 addition & 1 deletion mitmproxy/tools/web/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def cert_to_json(certs: Sequence[certs.Cert]) -> dict | None:
"serial": str(cert.serial),
"subject": cert.subject,
"issuer": cert.issuer,
"altnames": cert.altnames,
"altnames": [str(x.value) for x in cert.altnames],
}


Expand Down
25 changes: 15 additions & 10 deletions test/mitmproxy/addons/test_tlsconfig.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import ipaddress
import ssl
import time
from pathlib import Path
Expand Down Expand Up @@ -128,20 +129,24 @@ def test_get_cert(self, tdata):
ctx.server.certificate_list = [certs.Cert.from_pem(f.read())]
entry = ta.get_cert(ctx)
assert entry.cert.cn == "example.mitmproxy.org"
assert entry.cert.altnames == [
"example.mitmproxy.org",
"server-address.example",
"127.0.0.1",
]
assert entry.cert.altnames == x509.GeneralNames(
[
x509.DNSName("example.mitmproxy.org"),
x509.IPAddress(ipaddress.ip_address("127.0.0.1")),
x509.DNSName("server-address.example"),
]
)

# And now we also incorporate SNI.
ctx.client.sni = "sni.example"
entry = ta.get_cert(ctx)
assert entry.cert.altnames == [
"example.mitmproxy.org",
"sni.example",
"server-address.example",
]
assert entry.cert.altnames == x509.GeneralNames(
[
x509.DNSName("example.mitmproxy.org"),
x509.DNSName("sni.example"),
x509.DNSName("server-address.example"),
]
)

with open(tdata.path("mitmproxy/data/invalid-subject.pem"), "rb") as f:
ctx.server.certificate_list = [certs.Cert.from_pem(f.read())]
Expand Down

0 comments on commit b659ea0

Please sign in to comment.