Skip to content

Commit

Permalink
Merge pull request #2363 from etrauschke/asn_utils
Browse files Browse the repository at this point in the history
should have _asn1_* utility functions in a common place
  • Loading branch information
reaperhulk committed Sep 24, 2015
2 parents b99a359 + 20a05d9 commit 6272e95
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 73 deletions.
47 changes: 47 additions & 0 deletions src/cryptography/hazmat/backends/openssl/backend.py
Expand Up @@ -6,6 +6,7 @@

import calendar
import collections
import datetime
import itertools
from contextlib import contextmanager

Expand Down Expand Up @@ -1941,6 +1942,52 @@ def _public_key_bytes(self, encoding, format, evp_pkey, cdata):
assert res == 1
return self._read_mem_bio(bio)

def _asn1_integer_to_int(self, asn1_int):
bn = self._lib.ASN1_INTEGER_to_BN(asn1_int, self._ffi.NULL)
assert bn != self._ffi.NULL
bn = self._ffi.gc(bn, self._lib.BN_free)
return self._bn_to_int(bn)

def _asn1_string_to_bytes(self, asn1_string):
return self._ffi.buffer(asn1_string.data, asn1_string.length)[:]

def _asn1_string_to_ascii(self, asn1_string):
return self._asn1_string_to_bytes(asn1_string).decode("ascii")

def _asn1_string_to_utf8(self, asn1_string):
buf = self._ffi.new("unsigned char **")
res = self._lib.ASN1_STRING_to_UTF8(buf, asn1_string)
assert res >= 0
assert buf[0] != self._ffi.NULL
buf = self._ffi.gc(
buf, lambda buffer: self._lib.OPENSSL_free(buffer[0])
)
return self._ffi.buffer(buf[0], res)[:].decode('utf8')

def _asn1_to_der(self, asn1_type):
buf = self._ffi.new("unsigned char **")
res = self._lib.i2d_ASN1_TYPE(asn1_type, buf)
assert res >= 0
assert buf[0] != self._ffi.NULL
buf = self._ffi.gc(
buf, lambda buffer: self._lib.OPENSSL_free(buffer[0])
)
return self._ffi.buffer(buf[0], res)[:]

def _parse_asn1_time(self, asn1_time):
assert asn1_time != self._ffi.NULL
generalized_time = self._lib.ASN1_TIME_to_generalizedtime(
asn1_time, self._ffi.NULL
)
assert generalized_time != self._ffi.NULL
generalized_time = self._ffi.gc(
generalized_time, self._lib.ASN1_GENERALIZEDTIME_free
)
time = self._asn1_string_to_ascii(
self._ffi.cast("ASN1_STRING *", generalized_time)
)
return datetime.datetime.strptime(time, "%Y%m%d%H%M%SZ")


class GetCipherByName(object):
def __init__(self, fmt):
Expand Down
89 changes: 16 additions & 73 deletions src/cryptography/hazmat/backends/openssl/x509.py
Expand Up @@ -4,7 +4,6 @@

from __future__ import absolute_import, division, print_function

import datetime
import ipaddress
from email.utils import parseaddr

Expand All @@ -30,49 +29,12 @@ def _obj2txt(backend, obj):
return backend._ffi.buffer(buf, res)[:].decode()


def _asn1_integer_to_int(backend, asn1_int):
bn = backend._lib.ASN1_INTEGER_to_BN(asn1_int, backend._ffi.NULL)
assert bn != backend._ffi.NULL
bn = backend._ffi.gc(bn, backend._lib.BN_free)
return backend._bn_to_int(bn)


def _asn1_string_to_bytes(backend, asn1_string):
return backend._ffi.buffer(asn1_string.data, asn1_string.length)[:]


def _asn1_string_to_ascii(backend, asn1_string):
return _asn1_string_to_bytes(backend, asn1_string).decode("ascii")


def _asn1_string_to_utf8(backend, asn1_string):
buf = backend._ffi.new("unsigned char **")
res = backend._lib.ASN1_STRING_to_UTF8(buf, asn1_string)
assert res >= 0
assert buf[0] != backend._ffi.NULL
buf = backend._ffi.gc(
buf, lambda buffer: backend._lib.OPENSSL_free(buffer[0])
)
return backend._ffi.buffer(buf[0], res)[:].decode('utf8')


def _asn1_to_der(backend, asn1_type):
buf = backend._ffi.new("unsigned char **")
res = backend._lib.i2d_ASN1_TYPE(asn1_type, buf)
assert res >= 0
assert buf[0] != backend._ffi.NULL
buf = backend._ffi.gc(
buf, lambda buffer: backend._lib.OPENSSL_free(buffer[0])
)
return backend._ffi.buffer(buf[0], res)[:]


def _decode_x509_name_entry(backend, x509_name_entry):
obj = backend._lib.X509_NAME_ENTRY_get_object(x509_name_entry)
assert obj != backend._ffi.NULL
data = backend._lib.X509_NAME_ENTRY_get_data(x509_name_entry)
assert data != backend._ffi.NULL
value = _asn1_string_to_utf8(backend, data)
value = backend._asn1_string_to_utf8(data)
oid = _obj2txt(backend, obj)

return x509.NameAttribute(x509.ObjectIdentifier(oid), value)
Expand Down Expand Up @@ -101,7 +63,7 @@ def _decode_general_names(backend, gns):

def _decode_general_name(backend, gn):
if gn.type == backend._lib.GEN_DNS:
data = _asn1_string_to_bytes(backend, gn.d.dNSName)
data = backend._asn1_string_to_bytes(gn.d.dNSName)
if data.startswith(b"*."):
# This is a wildcard name. We need to remove the leading wildcard,
# IDNA decode, then re-add the wildcard. Wildcard characters should
Expand All @@ -118,7 +80,7 @@ def _decode_general_name(backend, gn):

return x509.DNSName(decoded)
elif gn.type == backend._lib.GEN_URI:
data = _asn1_string_to_ascii(backend, gn.d.uniformResourceIdentifier)
data = backend._asn1_string_to_ascii(gn.d.uniformResourceIdentifier)
parsed = urllib_parse.urlparse(data)
hostname = idna.decode(parsed.hostname)
if parsed.port:
Expand All @@ -142,7 +104,7 @@ def _decode_general_name(backend, gn):
oid = _obj2txt(backend, gn.d.registeredID)
return x509.RegisteredID(x509.ObjectIdentifier(oid))
elif gn.type == backend._lib.GEN_IPADD:
data = _asn1_string_to_bytes(backend, gn.d.iPAddress)
data = backend._asn1_string_to_bytes(gn.d.iPAddress)
data_len = len(data)
if data_len == 8 or data_len == 32:
# This is an IPv4 or IPv6 Network and not a single IP. This
Expand Down Expand Up @@ -173,7 +135,7 @@ def _decode_general_name(backend, gn):
_decode_x509_name(backend, gn.d.directoryName)
)
elif gn.type == backend._lib.GEN_EMAIL:
data = _asn1_string_to_ascii(backend, gn.d.rfc822Name)
data = backend._asn1_string_to_ascii(gn.d.rfc822Name)
name, address = parseaddr(data)
parts = address.split(u"@")
if name or not address:
Expand All @@ -192,7 +154,7 @@ def _decode_general_name(backend, gn):
)
elif gn.type == backend._lib.GEN_OTHERNAME:
type_id = _obj2txt(backend, gn.d.otherName.type_id)
value = _asn1_to_der(backend, gn.d.otherName.value)
value = backend._asn1_to_der(gn.d.otherName.value)
return x509.OtherName(x509.ObjectIdentifier(type_id), value)
else:
# x400Address or ediPartyName
Expand Down Expand Up @@ -294,7 +256,7 @@ def version(self):
def serial(self):
asn1_int = self._backend._lib.X509_get_serialNumber(self._x509)
assert asn1_int != self._backend._ffi.NULL
return _asn1_integer_to_int(self._backend, asn1_int)
return self._backend._asn1_integer_to_int(asn1_int)

def public_key(self):
pkey = self._backend._lib.X509_get_pubkey(self._x509)
Expand All @@ -306,27 +268,12 @@ def public_key(self):
@property
def not_valid_before(self):
asn1_time = self._backend._lib.X509_get_notBefore(self._x509)
return self._parse_asn1_time(asn1_time)
return self._backend._parse_asn1_time(asn1_time)

@property
def not_valid_after(self):
asn1_time = self._backend._lib.X509_get_notAfter(self._x509)
return self._parse_asn1_time(asn1_time)

def _parse_asn1_time(self, asn1_time):
assert asn1_time != self._backend._ffi.NULL
generalized_time = self._backend._lib.ASN1_TIME_to_generalizedtime(
asn1_time, self._backend._ffi.NULL
)
assert generalized_time != self._backend._ffi.NULL
generalized_time = self._backend._ffi.gc(
generalized_time, self._backend._lib.ASN1_GENERALIZEDTIME_free
)
time = _asn1_string_to_ascii(
self._backend,
self._backend._ffi.cast("ASN1_STRING *", generalized_time)
)
return datetime.datetime.strptime(time, "%Y%m%d%H%M%SZ")
return self._backend._parse_asn1_time(asn1_time)

@property
def issuer(self):
Expand Down Expand Up @@ -410,12 +357,10 @@ def _decode_user_notice(backend, un):
notice_reference = None

if un.exptext != backend._ffi.NULL:
explicit_text = _asn1_string_to_utf8(backend, un.exptext)
explicit_text = backend._asn1_string_to_utf8(un.exptext)

if un.noticeref != backend._ffi.NULL:
organization = _asn1_string_to_utf8(
backend, un.noticeref.organization
)
organization = backend._asn1_string_to_utf8(un.noticeref.organization)

num = backend._lib.sk_ASN1_INTEGER_num(
un.noticeref.noticenos
Expand All @@ -425,9 +370,7 @@ def _decode_user_notice(backend, un):
asn1_int = backend._lib.sk_ASN1_INTEGER_value(
un.noticeref.noticenos, i
)
notice_num = _asn1_integer_to_int(
backend, asn1_int
)
notice_num = backend._asn1_integer_to_int(asn1_int)
notice_numbers.append(notice_num)

notice_reference = x509.NoticeReference(
Expand All @@ -449,7 +392,7 @@ def _decode_basic_constraints(backend, bc_st):
if basic_constraints.pathlen == backend._ffi.NULL:
path_length = None
else:
path_length = _asn1_integer_to_int(backend, basic_constraints.pathlen)
path_length = backend._asn1_integer_to_int(basic_constraints.pathlen)

return x509.BasicConstraints(ca, path_length)

Expand Down Expand Up @@ -482,8 +425,8 @@ def _decode_authority_key_identifier(backend, akid):
)

if akid.serial != backend._ffi.NULL:
authority_cert_serial_number = _asn1_integer_to_int(
backend, akid.serial
authority_cert_serial_number = backend._asn1_integer_to_int(
akid.serial
)

return x509.AuthorityKeyIdentifier(
Expand Down Expand Up @@ -690,7 +633,7 @@ def _decode_crl_distribution_points(backend, cdps):
def _decode_inhibit_any_policy(backend, asn1_int):
asn1_int = backend._ffi.cast("ASN1_INTEGER *", asn1_int)
asn1_int = backend._ffi.gc(asn1_int, backend._lib.ASN1_INTEGER_free)
skip_certs = _asn1_integer_to_int(backend, asn1_int)
skip_certs = backend._asn1_integer_to_int(asn1_int)
return x509.InhibitAnyPolicy(skip_certs)


Expand Down

0 comments on commit 6272e95

Please sign in to comment.