Skip to content

Commit

Permalink
Allow accessing a connection's verfied certificate chain (#894)
Browse files Browse the repository at this point in the history
* Allow accessing a connection's verfied certificate chain

Add X509StoreContext.get_verified_chain using X509_STORE_CTX_get1_chain.
Add Connection.get_verified_chain using SSL_get0_verified_chain if
available (ie OpenSSL 1.1+) and X509StoreContext.get_verified_chain
otherwise.
Fixes #740.

* TLSv1_METHOD -> SSLv23_METHOD

* Use X509_up_ref instead of X509_dup

* Add _openssl_assert where appropriate

* SSL_get_peer_cert_chain should not be null

* Reformat with black

* Fix <OpenSSL.crypto.X509 object at 0x7fdbb59e8050> != <OpenSSL.crypto.X509 object at 0x7fdbb59daad0>

* Add Changelog entry

* Remove _add_chain
  • Loading branch information
ShaneHarvey committed Aug 5, 2020
1 parent bb971ae commit 33c5499
Show file tree
Hide file tree
Showing 5 changed files with 190 additions and 11 deletions.
6 changes: 3 additions & 3 deletions CHANGELOG.rst
Expand Up @@ -21,14 +21,14 @@ Deprecations:

- Deprecated ``OpenSSL.crypto.loads_pkcs7`` and ``OpenSSL.crypto.loads_pkcs12``.

*none*


Changes:
^^^^^^^^

- Added ``Context.set_keylog_callback`` to log key material.
`#910 <https://github.com/pyca/pyopenssl/pull/910>`_
- Added ``OpenSSL.SSL.Connection.get_verified_chain`` to retrieve the
verified certificate chain of the peer.
`#894 <https://github.com/pyca/pyopenssl/pull/894>`_.


19.1.0 (2019-11-18)
Expand Down
61 changes: 54 additions & 7 deletions src/OpenSSL/SSL.py
Expand Up @@ -28,6 +28,7 @@
X509Name,
X509,
X509Store,
X509StoreContext,
)

__all__ = [
Expand Down Expand Up @@ -2126,6 +2127,22 @@ def get_peer_certificate(self):
return X509._from_raw_x509_ptr(cert)
return None

@staticmethod
def _cert_stack_to_list(cert_stack):
"""
Internal helper to convert a STACK_OF(X509) to a list of X509
instances.
"""
result = []
for i in range(_lib.sk_X509_num(cert_stack)):
cert = _lib.sk_X509_value(cert_stack, i)
_openssl_assert(cert != _ffi.NULL)
res = _lib.X509_up_ref(cert)
_openssl_assert(res >= 1)
pycert = X509._from_raw_x509_ptr(cert)
result.append(pycert)
return result

def get_peer_cert_chain(self):
"""
Retrieve the other side's certificate (if any)
Expand All @@ -2137,13 +2154,43 @@ def get_peer_cert_chain(self):
if cert_stack == _ffi.NULL:
return None

result = []
for i in range(_lib.sk_X509_num(cert_stack)):
# TODO could incref instead of dup here
cert = _lib.X509_dup(_lib.sk_X509_value(cert_stack, i))
pycert = X509._from_raw_x509_ptr(cert)
result.append(pycert)
return result
return self._cert_stack_to_list(cert_stack)

def get_verified_chain(self):
"""
Retrieve the verified certificate chain of the peer including the
peer's end entity certificate. It must be called after a session has
been successfully established. If peer verification was not successful
the chain may be incomplete, invalid, or None.
:return: A list of X509 instances giving the peer's verified
certificate chain, or None if it does not have one.
.. versionadded:: 20.0
"""
if hasattr(_lib, "SSL_get0_verified_chain"):
# OpenSSL 1.1+
cert_stack = _lib.SSL_get0_verified_chain(self._ssl)
if cert_stack == _ffi.NULL:
return None

return self._cert_stack_to_list(cert_stack)

pycert = self.get_peer_certificate()
if pycert is None:
return None

# Should never be NULL because the peer presented a certificate.
cert_stack = _lib.SSL_get_peer_cert_chain(self._ssl)
_openssl_assert(cert_stack != _ffi.NULL)

pystore = self._context.get_cert_store()
if pystore is None:
return None

pystorectx = X509StoreContext(pystore, pycert)
pystorectx._chain = cert_stack
return pystorectx.get_verified_chain()

def want_read(self):
"""
Expand Down
42 changes: 41 additions & 1 deletion src/OpenSSL/crypto.py
Expand Up @@ -1712,6 +1712,7 @@ def __init__(self, store, certificate):
self._store_ctx = _ffi.gc(store_ctx, _lib.X509_STORE_CTX_free)
self._store = store
self._cert = certificate
self._chain = _ffi.NULL
# Make the store context available for use after instantiating this
# class by initializing it now. Per testing, subsequent calls to
# :meth:`_init` have no adverse affect.
Expand All @@ -1725,7 +1726,7 @@ def _init(self):
:meth:`_cleanup` will leak memory.
"""
ret = _lib.X509_STORE_CTX_init(
self._store_ctx, self._store._store, self._cert._x509, _ffi.NULL
self._store_ctx, self._store._store, self._cert._x509, self._chain
)
if ret <= 0:
_raise_current_error()
Expand Down Expand Up @@ -1797,6 +1798,45 @@ def verify_certificate(self):
if ret <= 0:
raise self._exception_from_context()

def get_verified_chain(self):
"""
Verify a certificate in a context and return the complete validated
chain.
:raises X509StoreContextError: If an error occurred when validating a
certificate in the context. Sets ``certificate`` attribute to
indicate which certificate caused the error.
.. versionadded:: 20.0
"""
# Always re-initialize the store context in case
# :meth:`verify_certificate` is called multiple times.
#
# :meth:`_init` is called in :meth:`__init__` so _cleanup is called
# before _init to ensure memory is not leaked.
self._cleanup()
self._init()
ret = _lib.X509_verify_cert(self._store_ctx)
if ret <= 0:
self._cleanup()
raise self._exception_from_context()

# Note: X509_STORE_CTX_get1_chain returns a deep copy of the chain.
cert_stack = _lib.X509_STORE_CTX_get1_chain(self._store_ctx)
_openssl_assert(cert_stack != _ffi.NULL)

result = []
for i in range(_lib.sk_X509_num(cert_stack)):
cert = _lib.sk_X509_value(cert_stack, i)
_openssl_assert(cert != _ffi.NULL)
pycert = X509._from_raw_x509_ptr(cert)
result.append(pycert)

# Free the stack but not the members which are freed by the X509 class.
_lib.sk_X509_free(cert_stack)
self._cleanup()
return result


def load_certificate(type, buffer):
"""
Expand Down
35 changes: 35 additions & 0 deletions tests/test_crypto.py
Expand Up @@ -3849,6 +3849,41 @@ def test_verify_with_time(self):

assert exc.value.args[0][2] == "certificate has expired"

def test_get_verified_chain(self):
"""
`get_verified_chain` returns the verified chain.
"""
store = X509Store()
store.add_cert(self.root_cert)
store.add_cert(self.intermediate_cert)
store_ctx = X509StoreContext(store, self.intermediate_server_cert)
chain = store_ctx.get_verified_chain()
assert len(chain) == 3
intermediate_subject = self.intermediate_server_cert.get_subject()
assert chain[0].get_subject() == intermediate_subject
assert chain[1].get_subject() == self.intermediate_cert.get_subject()
assert chain[2].get_subject() == self.root_cert.get_subject()
# Test reuse
chain = store_ctx.get_verified_chain()
assert len(chain) == 3
assert chain[0].get_subject() == intermediate_subject
assert chain[1].get_subject() == self.intermediate_cert.get_subject()
assert chain[2].get_subject() == self.root_cert.get_subject()

def test_get_verified_chain_invalid_chain_no_root(self):
"""
`get_verified_chain` raises error when cert verification fails.
"""
store = X509Store()
store.add_cert(self.intermediate_cert)
store_ctx = X509StoreContext(store, self.intermediate_server_cert)

with pytest.raises(X509StoreContextError) as exc:
store_ctx.get_verified_chain()

assert exc.value.args[0][2] == "unable to get issuer certificate"
assert exc.value.certificate.get_subject().CN == "intermediate"


class TestSignVerify(object):
"""
Expand Down
57 changes: 57 additions & 0 deletions tests/test_ssl.py
Expand Up @@ -2445,6 +2445,63 @@ def test_get_peer_cert_chain_none(self):
interact_in_memory(client, server)
assert None is server.get_peer_cert_chain()

def test_get_verified_chain(self):
"""
`Connection.get_verified_chain` returns a list of certificates
which the connected server returned for the certification verification.
"""
chain = _create_certificate_chain()
[(cakey, cacert), (ikey, icert), (skey, scert)] = chain

serverContext = Context(SSLv23_METHOD)
serverContext.use_privatekey(skey)
serverContext.use_certificate(scert)
serverContext.add_extra_chain_cert(icert)
serverContext.add_extra_chain_cert(cacert)
server = Connection(serverContext, None)
server.set_accept_state()

# Create the client
clientContext = Context(SSLv23_METHOD)
# cacert is self-signed so the client must trust it for verification
# to succeed.
clientContext.get_cert_store().add_cert(cacert)
clientContext.set_verify(VERIFY_PEER, verify_cb)
client = Connection(clientContext, None)
client.set_connect_state()

interact_in_memory(client, server)

chain = client.get_verified_chain()
assert len(chain) == 3
assert "Server Certificate" == chain[0].get_subject().CN
assert "Intermediate Certificate" == chain[1].get_subject().CN
assert "Authority Certificate" == chain[2].get_subject().CN

def test_get_verified_chain_none(self):
"""
`Connection.get_verified_chain` returns `None` if the peer sends
no certificate chain.
"""
ctx = Context(SSLv23_METHOD)
ctx.use_privatekey(load_privatekey(FILETYPE_PEM, server_key_pem))
ctx.use_certificate(load_certificate(FILETYPE_PEM, server_cert_pem))
server = Connection(ctx, None)
server.set_accept_state()
client = Connection(Context(SSLv23_METHOD), None)
client.set_connect_state()
interact_in_memory(client, server)
assert None is server.get_verified_chain()

def test_get_verified_chain_unconnected(self):
"""
`Connection.get_verified_chain` returns `None` when used with an object
which has not been connected.
"""
ctx = Context(SSLv23_METHOD)
server = Connection(ctx, None)
assert None is server.get_verified_chain()

def test_get_session_unconnected(self):
"""
`Connection.get_session` returns `None` when used with an object
Expand Down

0 comments on commit 33c5499

Please sign in to comment.