Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

certificate verification against known fingerprint

  • Loading branch information...
commit 60fedaaadf69cf205c7d18925f32a0af2f8e3edf 1 parent be10e16
@t-8ch t-8ch authored
View
29 test/with_dummyserver/test_https.py
@@ -145,6 +145,35 @@ def test_verify_specific_hostname(self):
https_pool.verify_hostname = 'localhost'
https_pool.request('GET', '/')
+ def test_verify_fingerprint_md5(self):
+ https_pool = HTTPSConnectionPool('127.0.0.1', self.port,
+ cert_reqs='CERT_REQUIRED')
+
+ https_pool.ca_certs = DEFAULT_CA
+ https_pool.verify_fingerprint = 'CA:84:E1:AD0E5a:ef:2f:C3:09' \
+ ':E7:30:F8:CD:C8:5B'
+ https_pool.request('GET', '/')
+
+ def test_verify_fingerprint_sha1(self):
+ https_pool = HTTPSConnectionPool('127.0.0.1', self.port,
+ cert_reqs='CERT_REQUIRED')
+
+ https_pool.ca_certs = DEFAULT_CA
+ https_pool.verify_fingerprint = 'CC:45:6A:90:82:F7FF:C0:8218:8e:' \
+ '7A:F2:8A:D7:1E:07:33:67:DE'
+ https_pool.request('GET', '/')
+
+ def test_verify_invalid_fingerprint(self):
+ https_pool = HTTPSConnectionPool('127.0.0.1', self.port,
+ cert_reqs='CERT_REQUIRED')
+
+ https_pool.ca_certs = DEFAULT_CA
+ https_pool.verify_fingerprint = 'AA:AA:AA:AA:AA:AAAA:AA:AAAA:AA:' \
+ 'AA:AA:AA:AA:AA:AA:AA:AA:AA'
+
+ self.assertRaises(SSLError,
+ https_pool.request, 'GET', '/')
+
if __name__ == '__main__':
unittest.main()
View
24 urllib3/connectionpool.py
@@ -9,7 +9,7 @@
import errno
from socket import error as SocketError, timeout as SocketTimeout
-from .util import resolve_cert_reqs, resolve_ssl_version
+from .util import resolve_cert_reqs, resolve_ssl_version, match_fingerprint
try: # Python 3
from http.client import HTTPConnection, HTTPException
@@ -80,14 +80,15 @@ class VerifiedHTTPSConnection(HTTPSConnection):
ca_certs = None
ssl_version = None
- def set_cert(self, key_file=None, cert_file=None,
- cert_reqs=None, ca_certs=None, verify_hostname=None):
+ def set_cert(self, key_file=None, cert_file=None, cert_reqs=None,
+ ca_certs=None, verify_hostname=None, verify_fingerprint=None):
self.key_file = key_file
self.cert_file = cert_file
self.cert_reqs = cert_reqs
self.ca_certs = ca_certs
self.verify_hostname = verify_hostname
+ self.verify_fingerprint = verify_fingerprint
def connect(self):
# Add certificate verification
@@ -105,8 +106,12 @@ def connect(self):
ssl_version=resolved_ssl_version)
if resolved_cert_reqs != ssl.CERT_NONE:
- match_hostname(self.sock.getpeercert(),
- self.verify_hostname or self.host)
+ if self.verify_fingerprint:
+ match_fingerprint(self.sock.getpeercert(binary_form=True),
+ self.verify_fingerprint)
+ else:
+ match_hostname(self.sock.getpeercert(),
+ self.verify_hostname or self.host)
## Pool objects
@@ -504,7 +509,7 @@ class HTTPSConnectionPool(HTTPConnectionPool):
instead of :class:`httplib.HTTPSConnection`.
The ``key_file``, ``cert_file``, ``cert_reqs``, ``ca_certs``,
- ``ssl_version`` and ``verify_hostname``
+ ``ssl_version`` ``verify_fingerprint`` and ``verify_hostname``
are only used if :mod:`ssl` is available and are fed into
:meth:`urllib3.util.ssl_wrap_socket` to upgrade the connection socket
into an SSL socket.
@@ -516,7 +521,8 @@ def __init__(self, host, port=None,
strict=False, timeout=None, maxsize=1,
block=False, headers=None,
key_file=None, cert_file=None, cert_reqs=None,
- ca_certs=None, ssl_version=None, verify_hostname=None):
+ ca_certs=None, ssl_version=None, verify_hostname=None,
+ verify_fingerprint=None):
HTTPConnectionPool.__init__(self, host, port,
strict, timeout, maxsize,
@@ -527,6 +533,7 @@ def __init__(self, host, port=None,
self.ca_certs = ca_certs
self.ssl_version = ssl_version
self.verify_hostname = verify_hostname
+ self.verify_fingerprint = verify_fingerprint
def _new_conn(self):
"""
@@ -550,7 +557,8 @@ def _new_conn(self):
strict=self.strict)
connection.set_cert(key_file=self.key_file, cert_file=self.cert_file,
cert_reqs=self.cert_reqs, ca_certs=self.ca_certs,
- verify_hostname=self.verify_hostname)
+ verify_hostname=self.verify_hostname,
+ verify_fingerprint=self.verify_fingerprint)
connection.ssl_version = self.ssl_version
View
40 urllib3/util.py
@@ -8,6 +8,8 @@
from base64 import b64encode
from collections import namedtuple
from socket import error as SocketError
+from hashlib import md5, sha1
+from binascii import hexlify, unhexlify
try:
from select import poll, POLLIN
@@ -24,6 +26,7 @@
import ssl
from ssl import wrap_socket, CERT_NONE, SSLError, PROTOCOL_SSLv23
+ from .exceptions import SSLError
from ssl import SSLContext # Modern SSL?
from ssl import HAS_SNI # Has SNI?
except ImportError:
@@ -302,6 +305,43 @@ def resolve_ssl_version(candidate):
return candidate
+
+def match_fingerprint(remote, local):
+ """
+ Compares if both supplied fingerprints match.
+
+ remote -- binary
+ local -- hexstring, can be separated by colons
+ """
+
+ # maps the raw byte length of a digest to its hash function
+ hashfunc_map = {
+ 16: md5,
+ 20: sha1
+ }
+
+ norm_local = local.replace(':', '').lower()
+
+ div, mod = divmod(len(norm_local), 2)
+
+ if mod != 0 or div not in hashfunc_map:
+ raise SSLError('Fingerprint is of invalid length')
+
+ # need encode() here for py32, works on py2 and p33
+ norm_local = unhexlify(norm_local.encode())
+
+ hashfunc = hashfunc_map[len(norm_local)]
+
+ # binary
+ norm_remote = hashfunc(remote).digest()
+
+ if not norm_remote == norm_local:
+ raise SSLError('Fingerprints did not match!\n'
+ 'Supplied: {0}\n'
+ 'Actual : {1}'.format(hexlify(norm_local),
+ hexlify(norm_remote)))
+
+
if SSLContext is not None: # Python 3.2+
def ssl_wrap_socket(sock, keyfile=None, certfile=None, cert_reqs=None,
ca_certs=None, server_hostname=None,
Please sign in to comment.
Something went wrong with that request. Please try again.