Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PyOpenSSL for SNI-support on Python2 #156

Merged
merged 10 commits into from Mar 20, 2013
3 changes: 3 additions & 0 deletions CONTRIBUTORS.txt
Expand Up @@ -48,5 +48,8 @@ In chronological order:
* hartator <hartator@gmail.com>
* Corrected multipart behavior for params

* Sune Kirkeby <mig@ibofobi.dk>
* Optional SNI-support for Python 2 via PyOpenSSL.

* [Your name or handle] <[email or website]>
* [Brief summary of your changes]
10 changes: 10 additions & 0 deletions docs/contrib.rst
@@ -0,0 +1,10 @@
Contrib Modules
===============

These modules implement various extra features, that may not be ready for
prime time.

SNI-support for Python 2
------------------------

.. automodule:: urllib3.contrib.pyopenssl
10 changes: 10 additions & 0 deletions docs/index.rst
Expand Up @@ -9,6 +9,7 @@ urllib3 Documentation
managers
helpers
collections
contrib


Highlights
Expand Down Expand Up @@ -144,6 +145,15 @@ but can also be used independently.

helpers

Contrib Modules
---------------

These modules implement various extra features, that may not be ready for
prime time.

.. toctree::

contrib

Contributing
============
Expand Down
Empty file added test/contrib/__init__.py
Empty file.
17 changes: 17 additions & 0 deletions test/contrib/test_pyopenssl.py
@@ -0,0 +1,17 @@
try:
from urllib3.contrib.pyopenssl import (inject_into_urllib3,
extract_from_urllib3)
except ImportError as e:
from nose.plugins.skip import SkipTest
raise SkipTest('Could not import pyopenssl: %r' % e)

from ..with_dummyserver.test_https import TestHTTPS, TestHTTPS_TLSv1
from ..with_dummyserver.test_socketlevel import TestSNI, TestSocketClosing


def setup_module():
inject_into_urllib3()


def teardown_module():
extract_from_urllib3()
47 changes: 23 additions & 24 deletions test/with_dummyserver/test_socketlevel.py
@@ -1,16 +1,13 @@
from urllib3 import HTTPConnectionPool, HTTPSConnectionPool
from urllib3.poolmanager import proxy_from_url
from urllib3.exceptions import MaxRetryError, TimeoutError, SSLError
from urllib3 import util

from dummyserver.testcase import SocketDummyServerTestCase

from nose.plugins.skip import SkipTest
from threading import Event

try:
from ssl import HAS_SNI
except ImportError: # openssl without SNI
HAS_SNI = False


class TestCookies(SocketDummyServerTestCase):

Expand All @@ -34,28 +31,30 @@ def multicookie_response_handler(listener):
self.assertEquals(r.headers, {'set-cookie': 'foo=1, bar=1'})


if HAS_SNI:
class TestSNI(SocketDummyServerTestCase):
class TestSNI(SocketDummyServerTestCase):

def test_hostname_in_first_request_packet(self):
done_receiving = Event()
self.buf = b''
def test_hostname_in_first_request_packet(self):
if not util.HAS_SNI:
raise SkipTest('SNI-support not available')

def socket_handler(listener):
sock = listener.accept()[0]
done_receiving = Event()
self.buf = b''

self.buf = sock.recv(65536) # We only accept one packet
done_receiving.set() # let the test know it can proceed

self._start_server(socket_handler)
pool = HTTPSConnectionPool(self.host, self.port)
try:
pool.request('GET', '/', retries=0)
except SSLError: # We are violating the protocol
pass
done_receiving.wait()
self.assertTrue(self.host.encode() in self.buf,
"missing hostname in SSL handshake")
def socket_handler(listener):
sock = listener.accept()[0]

self.buf = sock.recv(65536) # We only accept one packet
done_receiving.set() # let the test know it can proceed

self._start_server(socket_handler)
pool = HTTPSConnectionPool(self.host, self.port)
try:
pool.request('GET', '/', retries=0)
except SSLError: # We are violating the protocol
pass
done_receiving.wait()
self.assertTrue(self.host.encode() in self.buf,
"missing hostname in SSL handshake")


class TestSocketClosing(SocketDummyServerTestCase):
Expand Down
161 changes: 161 additions & 0 deletions urllib3/contrib/pyopenssl.py
@@ -0,0 +1,161 @@
'''SSL with SNI-support for Python 2.

This needs the following packages installed:

* pyOpenSSL (tested with 0.13)
* ndg-httpsclient (tested with 0.3.2)
* pyasn1 (tested with 0.1.6)

To activate it call :func:`~urllib3.contrib.pyopenssl.inject_into_urllib3`.
This can be done in a ``sitecustomize`` module, or at any other time before
your application begins using ``urllib3``, like this::

try:
import urllib3.contrib.pyopenssl
urllib3.contrib.pyopenssl.inject_into_urllib3()
except ImportError:
pass

Now you can use :mod:`urllib3` as you normally would, and it will support SNI
when the required modules are installed.
'''

from ndg.httpsclient.ssl_peer_verification import (ServerSSLCertVerification,
SUBJ_ALT_NAME_SUPPORT)
from ndg.httpsclient.subj_alt_name import SubjectAltName
import OpenSSL.SSL
from pyasn1.codec.der import decoder as der_decoder
from socket import _fileobject
import ssl

from .. import connectionpool
from .. import util

__all__ = ['inject_into_urllib3', 'extract_from_urllib3']

# SNI only *really* works if we can read the subjectAltName of certificates.
HAS_SNI = SUBJ_ALT_NAME_SUPPORT

# Map from urllib3 to PyOpenSSL compatible parameter-values.
_openssl_versions = {
ssl.PROTOCOL_SSLv23: OpenSSL.SSL.SSLv23_METHOD,
ssl.PROTOCOL_SSLv3: OpenSSL.SSL.SSLv3_METHOD,
ssl.PROTOCOL_TLSv1: OpenSSL.SSL.TLSv1_METHOD,
}
_openssl_verify = {
ssl.CERT_NONE: OpenSSL.SSL.VERIFY_NONE,
ssl.CERT_OPTIONAL: OpenSSL.SSL.VERIFY_PEER,
ssl.CERT_REQUIRED: OpenSSL.SSL.VERIFY_PEER
+ OpenSSL.SSL.VERIFY_FAIL_IF_NO_PEER_CERT,
}


orig_util_HAS_SNI = util.HAS_SNI
orig_connectionpool_ssl_wrap_socket = connectionpool.ssl_wrap_socket


def inject_into_urllib3():
'Monkey-patch urllib3 with PyOpenSSL-backed SSL-support.'

connectionpool.ssl_wrap_socket = ssl_wrap_socket
util.HAS_SNI = HAS_SNI


def extract_from_urllib3():
'Undo monkey-patching by :func:`inject_into_urllib3`.'

connectionpool.ssl_wrap_socket = orig_connectionpool_ssl_wrap_socket
util.HAS_SNI = orig_util_HAS_SNI


### Note: This is a slightly bug-fixed version of same from ndg-httpsclient.
def get_subj_alt_name(peer_cert):
# Search through extensions
dns_name = []
if not SUBJ_ALT_NAME_SUPPORT:
return dns_name

general_names = SubjectAltName()
for i in range(peer_cert.get_extension_count()):
ext = peer_cert.get_extension(i)
ext_name = ext.get_short_name()
if ext_name != 'subjectAltName':
continue

# PyOpenSSL returns extension data in ASN.1 encoded form
ext_dat = ext.get_data()
decoded_dat = der_decoder.decode(ext_dat,
asn1Spec=general_names)

for name in decoded_dat:
if not isinstance(name, SubjectAltName):
continue
for entry in range(len(name)):
component = name.getComponentByPosition(entry)
if component.getName() != 'dNSName':
continue
dns_name.append(str(component.getComponent()))

return dns_name


class WrappedSocket(object):
'''API-compatibility wrapper for Python OpenSSL's Connection-class.'''

def __init__(self, connection, socket):
self.connection = connection
self.socket = socket

def makefile(self, mode, bufsize=-1):
return _fileobject(self.connection, mode, bufsize)

def settimeout(self, timeout):
return self.socket.settimeout(timeout)

def sendall(self, data):
return self.connection.sendall(data)

def getpeercert(self):
x509 = self.connection.get_peer_certificate()
if not x509:
raise ssl.SSLError('')
return {
'subject': (
(('commonName', x509.get_subject().CN),),
),
'subjectAltName': [
('DNS', value)
for value in get_subj_alt_name(x509)
]
}


def _verify_callback(cnx, x509, err_no, err_depth, return_code):
return err_no == 0


def ssl_wrap_socket(sock, keyfile=None, certfile=None, cert_reqs=None,
ca_certs=None, server_hostname=None,
ssl_version=None):
ctx = OpenSSL.SSL.Context(_openssl_versions[ssl_version])
if certfile:
ctx.use_certificate_file(certfile)
if keyfile:
ctx.use_privatekey_file(keyfile)
if cert_reqs != ssl.CERT_NONE:
ctx.set_verify(_openssl_verify[cert_reqs], _verify_callback)
if ca_certs:
try:
ctx.load_verify_locations(ca_certs, None)
except OpenSSL.SSL.Error as e:
raise ssl.SSLError('bad ca_certs: %r' % ca_certs, e)

cnx = OpenSSL.SSL.Connection(ctx, sock)
cnx.set_tlsext_host_name(server_hostname)
cnx.set_connect_state()
try:
cnx.do_handshake()
except OpenSSL.SSL.Error as e:
raise ssl.SSLError('bad handshake', e)

return WrappedSocket(cnx, sock)