Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Switch to using requests for external connections.

  • Loading branch information...
commit 8f5f3d9e44fb5044fc73189ecbb89afe83c52302 1 parent 1712462
@Osmose Osmose authored
View
86 browserid/certificates.py
@@ -5,10 +5,11 @@
import collections
import threading
import time
-
from urlparse import urljoin
-from browserid.utils import secure_urlopen
+import requests
+from requests.exceptions import RequestException
+
from browserid.errors import (ConnectionError,
InvalidIssuerError)
@@ -141,71 +142,44 @@ def __len__(self):
return len(self.items_map)
-def fetch_public_key(hostname, well_known_url=None):
+def _get(url):
+ """Fetch resource with requests."""
+ try:
+ return requests.get(url)
+ except RequestException, e:
+ raise ConnectionError(str(e))
+
+
+def fetch_public_key(hostname, well_known_url=WELL_KNOWN_URL):
"""Fetch the BrowserID public key for the given hostname.
This function uses the well-known BrowserID meta-data file to extract
the public key for the given hostname.
"""
- if well_known_url is None:
- well_known_url = WELL_KNOWN_URL
+ hostname = 'https://%s' % hostname
- hostname = "https://" + hostname
# Try to find the public key. If it can't be found then we
# raise an InvalidIssuerError. Any other connection-related
# errors are passed back up to the caller.
- try:
- # Try to read the well-known browserid file to load the key.
+ response = _get(urljoin(hostname, well_known_url))
+ if response.status_code == 200:
try:
- browserid_url = urljoin(hostname, well_known_url)
- browserid_data = urlread(browserid_url)
- except ConnectionError, e:
- if "404" not in str(e):
- raise
- # The well-known file was not found, try falling back to
- # just "/pk". Not really a good idea, but that's currently
- # the only way to get browserid.org's public key.
- pubkey_url = urljoin(hostname, "/pk")
- key = urlread(urljoin(hostname, pubkey_url))
+ key = json.loads(response.text)['public-key']
+ except (ValueError, KeyError):
+ raise InvalidIssuerError('Host %r has malformed public key '
+ 'document' % hostname)
+ else:
+ # The well-known file was not found, try falling back to
+ # just "/pk".
+ response = _get(urljoin(hostname, '/pk'))
+ if response.status_code == 200:
try:
- key = json.loads(key)
+ key = json.loads(response.text)
except ValueError:
- msg = "Host %r has malformed public key document"
- raise InvalidIssuerError(msg % (hostname,))
+ raise InvalidIssuerError('Host %r has malformed BrowserID '
+ 'metadata document' % hostname)
else:
- # The well-known file was found, it must contain the key
- # data as part of its JSON response.
- try:
- key = json.loads(browserid_data)["public-key"]
- except (ValueError, KeyError):
- msg = "Host %r has malformed BrowserID metadata document"
- raise InvalidIssuerError(msg % (hostname,))
- return key
- except ConnectionError, e:
- if "404" not in str(e):
- raise
- msg = "Host %r does not declare support for BrowserID" % (hostname,)
- raise InvalidIssuerError(msg)
-
+ raise InvalidIssuerError('Host %r does not declare support for '
+ 'BrowserID' % hostname)
-def urlread(url, data=None):
- """Read the given URL, return response as a string."""
- # Anything that goes wrong inside this function will
- # be re-raised as an instance of ConnectionError.
- try:
- resp = secure_urlopen(url, data)
- try:
- info = resp.info()
- except AttributeError:
- info = {}
- content_length = info.get("Content-Length")
- if content_length is None:
- data = resp.read()
- else:
- try:
- data = resp.read(int(content_length))
- except ValueError:
- raise ConnectionError("server sent invalid content-length")
- except Exception, e:
- raise ConnectionError(str(e))
- return data
+ return key
View
81 browserid/tests/test_certificates.py
@@ -0,0 +1,81 @@
+import json
+import unittest
+
+from mock import Mock, patch
+from requests.exceptions import RequestException
+
+from browserid.certificates import fetch_public_key
+from browserid.errors import ConnectionError, InvalidIssuerError
+
+
+# Retrieved from browserid.org on April 3rd 2012
+BROWSERID_PK = (
+ '{"public-key":{"algorithm":"RS","n":"175097498616944944948724600'
+ '5376783505040424585022287992333285107040747762043794156382037076'
+ '4990477086508806099113661589535215545809212436371869690217935480'
+ '2753014948839838403516326408223252892569346143048785710656475684'
+ '9535475273645907806528415369926171343773128172627893352378261396'
+ '0153494025694829910802495907763077221584500090734186210456302804'
+ '6884323084778492418923884368673543934239778647619964884232166051'
+ '7909653959911229288600229842193433562918970249484466937121698566'
+ '1583323059605724956419024024496484812121544425787678170853739436'
+ '5238417167558546493512404073066199364247440288962324288605736789'
+ '20055912798079","e":"65537"}}')
+BROWSERID_PK_PY = json.loads(BROWSERID_PK)
+
+
+class TestFetchPublicKey(unittest.TestCase):
+ @patch('browserid.certificates.requests')
+ def _fetch(self, hostname, requests, well_known_url=None,
+ side_effect=None, response_text='', status_code=200):
+ response = Mock()
+ response.text = response_text
+ response.status_code = status_code
+ requests.get.side_effect = side_effect
+ requests.get.return_value = response
+
+ kwargs = {}
+ if well_known_url is not None:
+ kwargs['well_known_url'] = well_known_url
+
+ return fetch_public_key(hostname, **kwargs)
+
+ def test_connection_error(self):
+ """If there is an error connecting, raise a ConnectionError."""
+ with self.assertRaises(ConnectionError):
+ self._fetch('test.com', side_effect=RequestException)
+
+ @patch('browserid.certificates.fetch_public_key')
+ def test_missing_well_known_document(self, fetch):
+ with self.assertRaises(InvalidIssuerError):
+ self._fetch('test.com', status_code=404)
+
+ def test_malformed_well_known_document(self):
+ response_text = 'I AINT NO JSON, FOOL!'
+ with self.assertRaises(InvalidIssuerError):
+ self._fetch('test.com', response_text=response_text)
+
+ def test_malformed_pub_key_document(self):
+ # We need the first request to raise a 404, so we replace
+ # post with a custom function here.
+ def post(url, data):
+ response = Mock()
+ if not post.called:
+ response.status_code = 404
+ post.called = True
+ response.text = 'I AINT NO JSON, FOOL!'
+ return response
+ post.called = False
+
+ with patch('browserid.certificates.requests') as requests:
+ requests.post = post
+ with self.assertRaises(InvalidIssuerError):
+ fetch_public_key('test.com')
+
+ def test_well_known_doc_with_no_public_key(self):
+ with self.assertRaises(InvalidIssuerError):
+ self._fetch('test.com', response_text='{}')
+
+ def test_successful_fetch(self):
+ key = self._fetch('test.com', response_text=BROWSERID_PK)
+ self.assertEquals(key, BROWSERID_PK_PY['public-key'])
View
32 browserid/tests/test_utils.py
@@ -76,38 +76,6 @@ def shutdown(self):
class TestUtils(unittest.TestCase):
- def test_secure_urlopen(self):
- server = TestingServer()
- server.start()
- try:
- kwds = {"timeout": 1}
- # We don't trust the server's certificate, so this fails
- # if we're doing strong validation.
- try:
- with warnings.catch_warnings(record=True) as w:
- warnings.simplefilter("always")
- secure_urlopen(server.base_url, **kwds)
- except ConnectionError:
- # This means we have a system ca_certs file.
- # The request is unverified and should therefore fail.
- pass
- else:
- # This means we have no system ca_certs file.
- # We issue a warning and forego verification.
- self.assertEquals(len(w), 1) # pragma: nocover
- # The certificate doesn't belong to localhost, so this fails.
- kwds["ca_certs"] = server.certfile
- self.assertRaises(ConnectionError,
- secure_urlopen, server.base_url, **kwds)
- # Set a valid cert for local host, trust it, we succeed.
- server.certfile = _filepath("certs/localhost.crt")
- server.keyfile = _filepath("certs/localhost.key")
- kwds["ca_certs"] = server.certfile
- self.assertEquals(secure_urlopen(server.base_url, **kwds).read(),
- "OK")
- finally:
- server.shutdown()
-
def test_encode_decode_bytes(self):
self.assertEquals("HELLO", decode_bytes(encode_bytes("HELLO")))
self.assertEquals("HELLO", decode_bytes(encode_bytes(u"HELLO")))
View
187 browserid/tests/test_verifiers.py
@@ -3,13 +3,13 @@
# You can obtain one at http://mozilla.org/MPL/2.0/.
import time
-import json
import unittest
import warnings
+from mock import Mock, patch
+
import browserid
-from browserid.tests.support import (patched_urlopen,
- patched_key_fetching,
+from browserid.tests.support import (patched_key_fetching,
get_keypair,
fetch_public_key,
make_assertion)
@@ -22,8 +22,8 @@
ConnectionError,
ExpiredSignatureError,
InvalidSignatureError,
- InvalidIssuerError,
AudienceMismatchError)
+from browserid.tests.test_certificates import BROWSERID_PK_PY
# This is an old assertion I generated on myfavoritebeer.org.
# It's expired and signed with an old private key.
@@ -121,90 +121,6 @@ def setUp(self):
# There should be a warning about using this verifier.
self.assertEquals(w[0].category, FutureWarning)
- def test_error_while_fetching_public_key(self):
- with patched_urlopen(exc=RuntimeError("TESTING")):
- self.assertRaises(ConnectionError,
- self.verifier.verify, EXPIRED_ASSERTION, now=0)
-
- def test_missing_well_known_document(self):
- with patched_urlopen(exc=RuntimeError("404 Not Found")):
- self.assertRaises(InvalidIssuerError,
- self.verifier.verify, EXPIRED_ASSERTION, now=0)
-
- def test_malformed_well_known_document(self):
- # patch urlopen
- def urlopen(url, data):
- class response(object):
- @staticmethod
- def read():
- return "I AINT NO JSON, FOOL!"
- return response
-
- with patched_urlopen(urlopen):
- self.assertRaises(InvalidIssuerError,
- self.verifier.verify, EXPIRED_ASSERTION, now=0)
-
- def test_malformed_pub_key_document(self):
- called = []
-
- def urlopen(url, data):
- # First call must raise 404 so it will look for /pk.
- # Second call must return invalid JSON.
- class response(object):
- @staticmethod
- def read():
- if not called:
- called.append(True)
- raise ValueError("404 Not Found")
- return "I AINT NO JSON, FOOL!"
- return response
-
- with patched_urlopen(urlopen):
- self.assertRaises(InvalidIssuerError,
- self.verifier.verify, EXPIRED_ASSERTION, now=0)
-
- def test_well_known_doc_with_no_public_key(self):
- def urlopen(url, data):
- class response(object):
- @staticmethod
- def read():
- return "{}"
- return response
-
- with patched_urlopen(urlopen):
- self.assertRaises(InvalidIssuerError,
- self.verifier.verify, EXPIRED_ASSERTION, now=0)
-
- def test_well_known_doc_with_public_key(self):
- # The browserid.org server doesn't currently have /.well-known/browserid.
- # This simulates it with a dummy key.
- def urlopen(url, data): # NOQA
- class response(object):
- @staticmethod
- def read():
- key = fetch_public_key("browserid.org")
- return json.dumps({"public-key": key})
- return response
-
- with patched_urlopen(urlopen):
- assertion = make_assertion("t@m.com", "http://e.com")
- self.assertTrue(self.verifier.verify(assertion))
-
- def test_handling_of_invalid_content_length_header_from_server(self):
- def urlopen(url, data):
- class response(object):
- @staticmethod
- def info():
- return {"Content-Length": "forty-two"}
- @staticmethod # NOQA
- def read(size):
- raise RuntimeError # pragma: nocover
- return response
-
- with patched_urlopen(urlopen):
- self.assertRaises(ConnectionError, self.verifier.verify,
- EXPIRED_ASSERTION, now=0)
-
def test_error_handling_in_verify_certificate_chain(self):
self.assertRaises(ValueError,
self.verifier.verify_certificate_chain, [])
@@ -213,76 +129,53 @@ def test_error_handling_in_verify_certificate_chain(self):
self.assertRaises(ExpiredSignatureError,
self.verifier.verify_certificate_chain, certs)
+ @patch('browserid.certificates.fetch_public_key')
+ def test_well_known_doc_with_public_key(self, fetch_public_key):
+ fetch_public_key.return_value = BROWSERID_PK_PY['public-key']
+ assertion = make_assertion("t@m.com", "http://e.com")
+ self.assertTrue(self.verifier.verify(assertion))
+
class TestRemoteVerifier(unittest.TestCase, VerifierTestCases):
def setUp(self):
self.verifier = RemoteVerifier(["*"])
+ @patch('browserid.verifiers.remote.requests')
+ def _verify(self, requests, response_text='', assertion=EXPIRED_ASSERTION,
+ status_code=200):
+ response = Mock()
+ response.text = response_text
+ response.status_code = status_code
+ requests.post.return_value = response
+
+ return self.verifier.verify(assertion)
+
def test_handling_of_valid_response_from_server(self):
- def urlopen(url, data):
- class response(object):
- @staticmethod
- def read():
- return '{"email": "t@m.com", '\
- ' "status": "okay", '\
- ' "audience": "http://myfavoritebeer.org"}'
- return response
-
- with patched_urlopen(urlopen):
- data = self.verifier.verify(EXPIRED_ASSERTION)
- self.assertEquals(data["email"], "t@m.com")
-
- def test_handling_of_invalid_content_length_header_from_server(self):
- def urlopen(url, data):
- class response(object):
- @staticmethod
- def info():
- return {"Content-Length": "forty-two"}
- @staticmethod # NOQA
- def read(size):
- raise RuntimeError # pragma: nocover
- return response
-
- with patched_urlopen(urlopen):
- self.assertRaises(ConnectionError,
- self.verifier.verify, EXPIRED_ASSERTION)
+ response_text = ('{"email": "t@m.com", "status": "okay", '
+ '"audience": "http://myfavoritebeer.org"}')
+ data = self._verify(response_text=response_text)
+ self.assertEquals(data["email"], "t@m.com")
def test_handling_of_invalid_json_from_server(self):
- def urlopen(url, data):
- class response(object):
- @staticmethod
- def read():
- return "SERVER RETURNS SOMETHING THAT ISNT JSON"
- return response
-
- with patched_urlopen(urlopen):
- self.assertRaises(ConnectionError,
- self.verifier.verify, EXPIRED_ASSERTION)
-
- def test_handling_of_incorrect_audience_returned_by_server(self):
- def urlopen(url, data):
- class response(object):
- @staticmethod
- def read():
- return '{"email": "t@m.com", '\
- ' "status": "okay", '\
- '"audience": "WRONG"}'
- return response
-
- with patched_urlopen(urlopen):
- self.assertRaises(AudienceMismatchError,
- self.verifier.verify, EXPIRED_ASSERTION)
-
- def test_handling_of_500_error_from_server(self):
- with patched_urlopen(exc=ConnectionError("500 Server Error")):
- self.assertRaises(ValueError,
- self.verifier.verify, EXPIRED_ASSERTION)
+ with self.assertRaises(ConnectionError):
+ self._verify(response_text='SERVER RETURNS SOMETHING THAT ISNT JSON')
+
+ @patch('browserid.verifiers.remote.requests')
+ def test_handling_of_incorrect_audience_returned_by_server(self, requests):
+ response_text = ('{"email": "t@m.com", "status": "okay", '
+ '"audience": "WRONG"}')
+ with self.assertRaises(AudienceMismatchError):
+ self._verify(response_text=response_text)
+
+ @patch('browserid.verifiers.remote.requests')
+ def test_handling_of_500_error_from_server(self, requests):
+ with self.assertRaises(ValueError):
+ self._verify(status_code=500)
def test_handling_of_503_error_from_server(self):
- with patched_urlopen(exc=ConnectionError("503 Back Off")):
- self.assertRaises(ConnectionError, self.verifier.verify,
- EXPIRED_ASSERTION)
+ with self.assertRaises(ConnectionError):
+ self._verify(status_code=503)
class TestDummyVerifier(unittest.TestCase, VerifierTestCases):
View
3  browserid/verifiers/local.py
@@ -10,8 +10,7 @@
from browserid.certificates import CertificatesManager
from browserid.utils import unbundle_certs_and_assertion
from browserid.errors import (InvalidSignatureError,
- ExpiredSignatureError,
- AudienceMismatchError)
+ ExpiredSignatureError)
DEFAULT_TRUSTED_SECONDARIES = ("browserid.org", "diresworb.org",
View
49 browserid/verifiers/remote.py
@@ -4,10 +4,10 @@
import json
+import requests
+from requests.exceptions import RequestException
+
from browserid.verifiers import Verifier
-from browserid.utils import (secure_urlopen,
- decode_json_bytes,
- unbundle_certs_and_assertion)
from browserid.errors import (InvalidSignatureError,
ConnectionError,
AudienceMismatchError)
@@ -15,6 +15,14 @@
BROWSERID_VERIFIER_URL = "https://browserid.org/verify"
+def _post(url, params):
+ """Send HTTP POST with requests."""
+ try:
+ return requests.post(url, params)
+ except RequestException, e:
+ raise ConnectionError(str(e))
+
+
class RemoteVerifier(Verifier):
"""Class for remote verification of BrowserID identity assertions.
@@ -48,33 +56,20 @@ def verify(self, assertion, audience=None):
# If no explicit audience was given, this will also parse it out
# for inclusion in the request to the remote verifier service.
audience = self.check_audience(assertion, audience)
- # Encode the data into x-www-form-urlencoded.
- post_data = {"assertion": assertion, "audience": audience}
- post_data = "&".join("%s=%s" % item for item in post_data.items())
- # Post it to the verifier.
- try:
- resp = secure_urlopen(self.verifier_url, post_data)
- except ConnectionError, e:
- # BrowserID server sends "500 server error" for broken assertions.
- # For now, just translate that directly. Should check by hand.
- if "500" in str(e):
- raise ValueError("Malformed assertion")
- raise
- # Read the response, being careful to raise an appropriate
- # error if the server does something funny.
+
+ response = _post(self.verifier_url, {'assertion': assertion,
+ 'audience': audience})
+
+ # BrowserID server sends "500 server error" for broken assertions.
+ # For now, just translate that directly. Should check by hand.
+ if response.status_code == 500:
+ raise ValueError('Malformed assertion')
+
try:
- try:
- info = resp.info()
- except AttributeError:
- info = {}
- content_length = info.get("Content-Length")
- if content_length is None:
- data = resp.read()
- else:
- data = resp.read(int(content_length))
- data = json.loads(data)
+ data = json.loads(response.text)
except ValueError:
raise ConnectionError("server returned invalid response")
+
# Did it come back clean?
if data.get('status') != "okay":
raise InvalidSignatureError(str(data))
View
3  requirements/dev.txt
@@ -0,0 +1,3 @@
+-r prod.txt
+
+mock
View
1  requirements/prod.txt
@@ -0,0 +1 @@
+requests
View
2  setup.py
@@ -10,7 +10,7 @@
with open(os.path.join(here, 'CHANGES.txt')) as f:
CHANGES = f.read()
-requires = ['M2Crypto']
+requires = ['M2Crypto', 'requests']
setup(name='PyBrowserID',
version='0.4.0',
Please sign in to comment.
Something went wrong with that request. Please try again.