Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Turn the certificate manager into a manager for well-known files

  • Loading branch information...
commit d7a6414bb00d726f58e34301a43fa8666a3d262a 1 parent badbf31
Kyle Fuller kylef authored rfk committed
20 browserid/tests/support.py
View
@@ -8,7 +8,7 @@
from browserid.utils import encode_bytes, bundle_certs_and_assertion
-from browserid import certificates
+from browserid import wellknown
from browserid import jwt
# if unittest2 isn't available, assume that we are python 2.7
@@ -50,14 +50,14 @@ def _hex(value):
return value
-def fetch_public_key(hostname, verify=None):
- """Fetch the BrowserID public key for the given hostname.
+def fetch_wellknown_file(hostname, verify=None):
+ """Fetch the BrowserID support file for the given hostname.
- Actually, this implementation generates the key locally based on
+ Actually, this implementation generates a key locally based on
a hash of the hostname. This lets us exercise all the crypto code
while using predictable local values.
"""
- return get_keypair(hostname)[0]
+ return {"public-key": get_keypair(hostname)[0]}
def get_keypair(hostname):
@@ -146,7 +146,7 @@ def make_assertion(email, audience, issuer=None, exp=None,
@contextmanager
-def patched_key_fetching(replacement=None, exc=None):
+def patched_wellknown_fetching(replacement=None, exc=None):
"""Patch the key fetching mechanism with the given callable.
This is to allow easier testing.
@@ -157,8 +157,8 @@ def raise_exception(*args, **kwargs):
if exc is not None:
replacement = raise_exception
if replacement is None:
- replacement = fetch_public_key
- old_callable = certificates.fetch_public_key
- certificates.fetch_public_key = replacement
+ replacement = fetch_wellknown_file
+ old_callable = wellknown.fetch_wellknown_file
+ wellknown.fetch_wellknown_file = replacement
yield
- certificates.fetch_public_key = old_callable
+ wellknown.fetch_wellknown_file = old_callable
45 browserid/tests/test_verifiers.py
View
@@ -8,13 +8,13 @@
from mock import Mock, patch
import browserid
-from browserid.tests.support import (patched_key_fetching,
+from browserid.tests.support import (patched_wellknown_fetching,
get_keypair,
- fetch_public_key,
+ fetch_wellknown_file,
make_assertion, unittest)
from browserid import jwt
from browserid import RemoteVerifier, LocalVerifier
-from browserid.certificates import FIFOCache, CertificatesManager
+from browserid.wellknown import FIFOCache, WellKnownManager
from browserid.verifiers.workerpool import WorkerPoolVerifier
from browserid.utils import (encode_json_bytes,
decode_json_bytes,
@@ -135,12 +135,12 @@ def test_error_handling_in_verify_certificate_chain(self):
self.assertRaises(ExpiredSignatureError,
self.verifier.verify_certificate_chain, certs)
- @patch('browserid.certificates.fetch_public_key', fetch_public_key)
+ @patch('browserid.wellknown.fetch_wellknown_file', fetch_wellknown_file)
def test_well_known_doc_with_public_key(self):
assertion = make_assertion("t@m.com", "http://e.com")
self.assertTrue(self.verifier.verify(assertion))
- @patch('browserid.certificates.fetch_public_key', fetch_public_key)
+ @patch('browserid.wellknown.fetch_wellknown_file', fetch_wellknown_file)
def test_audience_verification(self):
# create an assertion with the audience set to http://persona.org for
@@ -231,7 +231,7 @@ def test_handling_of_503_error_from_server(self):
class TestDummyVerifier(unittest.TestCase, VerifierTestCases):
def setUp(self):
- self.patched = patched_key_fetching()
+ self.patched = patched_wellknown_fetching()
self.patched.__enter__()
self.verifier = LocalVerifier(["*"], warning=False)
@@ -261,7 +261,7 @@ def test_verification_of_untrusted_issuer(self):
# Assertions for @moz.com addresses can come from moz.com
assertion = make_assertion("test@moz.com", audience, issuer=issuer)
self.assertTrue(self.verifier.verify(assertion, audience))
- # But assertions for other addresses cannot.
+ # But assertions for other addresses cannot (unless they delegated).
assertion = make_assertion("test@example.com", audience,
issuer=issuer)
self.assertRaises(InvalidSignatureError, self.verifier.verify,
@@ -290,15 +290,16 @@ def test_verification_of_dummy_assertion_with_bad_certificate_sig(self):
assertion)
def test_cache_eviction_based_on_time(self):
- certs = CertificatesManager(FIFOCache(cache_timeout=0.1))
- verifier = LocalVerifier(["*"], certs=certs, warning=False)
+ wellknown_manager = WellKnownManager(FIFOCache(cache_timeout=0.1))
+ verifier = LocalVerifier(["*"], wellknown_manager=wellknown_manager,
+ warning=False)
# Prime the cache by verifying an assertion.
assertion = make_assertion("test@example.com", "")
self.assertTrue(verifier.verify(assertion))
# Make it error out if re-fetching the keys
- with patched_key_fetching(exc=RuntimeError("key fetch disabled")):
- verifier.fetch_public_key = fetch_public_key
+ with patched_wellknown_fetching(exc=RuntimeError("key fetch disabled")):
+ verifier.fetch_wellknown_file = fetch_wellknown_file
# It should be in the cache, so this works fine.
verifier.verify(assertion)
# But after sleeping it gets evicted and the error is triggered.
@@ -306,41 +307,43 @@ def test_cache_eviction_based_on_time(self):
self.assertRaises(RuntimeError, verifier.verify, assertion)
def test_cache_eviction_based_on_size(self):
- certs = CertificatesManager(max_size=2)
- verifier = LocalVerifier(["*"], certs=certs, warning=False)
+ wellknown_manager = WellKnownManager(max_size=2)
+ verifier = LocalVerifier(["*"], wellknown_manager=wellknown_manager,
+ warning=False)
# Prime the cache by verifying some assertions.
assertion1 = make_assertion("test@1.com", "", "1.com")
self.assertTrue(verifier.verify(assertion1))
assertion2 = make_assertion("test@2.com", "", "2.com")
self.assertTrue(verifier.verify(assertion2))
- self.assertEquals(len(certs.cache), 2)
+ self.assertEquals(len(wellknown_manager.cache), 2)
# Hitting a third host should evict the first public key.
assertion3 = make_assertion("test@3.com", "", "3.com")
self.assertTrue(verifier.verify(assertion3))
- self.assertEquals(len(certs.cache), 2)
+ self.assertEquals(len(wellknown_manager.cache), 2)
# Make it error out if re-fetching any keys
- with patched_key_fetching(exc=RuntimeError("key fetch disabled")):
+ with patched_wellknown_fetching(exc=RuntimeError("key fetch disabled")):
# It should have to re-fetch for 1, but not 2.
self.assertTrue(verifier.verify(assertion2))
self.assertRaises(RuntimeError, verifier.verify, assertion1)
def test_cache_eviction_during_write(self):
- certs = CertificatesManager(cache_timeout=0.1)
- verifier = LocalVerifier(["*"], certs=certs, warning=False)
+ wellknown_manager = WellKnownManager(cache_timeout=0.1)
+ verifier = LocalVerifier(["*"], wellknown_manager=wellknown_manager,
+ warning=False)
# Prime the cache by verifying an assertion.
assertion1 = make_assertion("test@1.com", "", "1.com")
self.assertTrue(verifier.verify(assertion1))
- self.assertEquals(len(certs.cache), 1)
+ self.assertEquals(len(wellknown_manager.cache), 1)
# Let that cached key expire
time.sleep(0.1)
# Now grab a different key; caching it should purge the expired key.
assertion2 = make_assertion("test@2.com", "", "2.com")
self.assertTrue(verifier.verify(assertion2))
- self.assertEquals(len(certs.cache), 1)
+ self.assertEquals(len(wellknown_manager.cache), 1)
# Check that only the second entry is in cache.
- with patched_key_fetching(exc=RuntimeError("key fetch disabled")):
+ with patched_wellknown_fetching(exc=RuntimeError("key fetch disabled")):
self.assertTrue(verifier.verify(assertion2))
self.assertRaises(RuntimeError, verifier.verify, assertion1)
20 browserid/tests/test_certificates.py → browserid/tests/test_wellknown.py
View
@@ -3,7 +3,7 @@
from mock import Mock, patch
from requests.exceptions import RequestException
-from browserid.certificates import fetch_public_key
+from browserid.wellknown import fetch_wellknown_file
from browserid.errors import ConnectionError, InvalidIssuerError
from browserid.tests.support import unittest
@@ -25,7 +25,7 @@
class TestFetchPublicKey(unittest.TestCase):
- @patch('browserid.certificates.requests')
+ @patch('browserid.wellknown.requests')
def _fetch(self, hostname, requests, well_known_url=None,
side_effect=None, response_text='', status_code=200):
response = Mock()
@@ -38,14 +38,22 @@ def _fetch(self, hostname, requests, well_known_url=None,
if well_known_url is not None:
kwargs['well_known_url'] = well_known_url
- return fetch_public_key(hostname, **kwargs)
+ wellknown = fetch_wellknown_file(hostname, **kwargs)
+
+ try:
+ key = wellknown['public-key']
+ except KeyError:
+ raise InvalidIssuerError('Host %r has malformed public key '
+ 'document' % hostname)
+
+ return key
def test_connection_error(self):
"""If there is an error connecting, raise a ConnectionError."""
with self.assertRaises(ConnectionError):
self._fetch('test.com', side_effect=RequestException)
- @patch('browserid.certificates.fetch_public_key')
+ @patch('browserid.wellknown.fetch_wellknown_file')
def test_missing_well_known_document(self, fetch):
with self.assertRaises(InvalidIssuerError):
self._fetch('test.com', status_code=404)
@@ -67,10 +75,10 @@ def post(url, data):
return response
post.called = False
- with patch('browserid.certificates.requests') as requests:
+ with patch('browserid.wellknown.requests') as requests:
requests.post = post
with self.assertRaises(InvalidIssuerError):
- fetch_public_key('test.com')
+ fetch_wellknown_file('test.com')
def test_well_known_doc_with_no_public_key(self):
with self.assertRaises(InvalidIssuerError):
13 browserid/verifiers/local.py
View
@@ -7,7 +7,7 @@
from browserid import jwt
from browserid.verifiers import Verifier
-from browserid.certificates import CertificatesManager
+from browserid.wellknown import WellKnownManager
from browserid.utils import unbundle_certs_and_assertion
from browserid.errors import (InvalidSignatureError,
ExpiredSignatureError,
@@ -26,17 +26,14 @@ class LocalVerifier(Verifier):
verify() method and let it work its magic.
"""
- def __init__(self, audiences=None, trusted_secondaries=None, certs=None,
- warning=True):
+ def __init__(self, audiences=None, trusted_secondaries=None,
+ wellknown_manager=None, warning=True):
if trusted_secondaries is None:
trusted_secondaries = DEFAULT_TRUSTED_SECONDARIES
- if certs is None:
- certs = CertificatesManager()
-
super(LocalVerifier, self).__init__(audiences)
self.trusted_secondaries = trusted_secondaries
- self.certs = certs
+ self.wellknown_manager = wellknown_manager or WellKnownManager()
if warning:
_emit_warning()
@@ -126,7 +123,7 @@ def verify_certificate_chain(self, certificates, now=None):
if now is None:
now = int(time.time() * 1000)
root_issuer = certificates[0].payload["iss"]
- root_key = self.certs[root_issuer]
+ root_key = self.wellknown_manager.get_key(root_issuer)
current_key = root_key
for cert in certificates:
if cert.payload["exp"] < now:
53 browserid/certificates.py → browserid/wellknown.py
View
@@ -16,11 +16,10 @@
WELL_KNOWN_URL = "/.well-known/browserid"
-class CertificatesManager(object):
- """A simple certificate handler. It acts like a dictionary of
- certificates. The key being the hostname and the value the certificate
- itself. the certificate manager populates itself automatically, so you
- don't need to fetch the public key when you get a KeyError.
+class WellKnownManager(object):
+ """A simple well-known BrowserID file handler. It acts like a dict of
+ BrowserID well-known files. This manager populates itself automatically,
+ so you don't need to fetch the file when you get a KeyError.
"""
def __init__(self, cache=None, verify=None, **kwargs):
@@ -32,22 +31,31 @@ def __init__(self, cache=None, verify=None, **kwargs):
def __getitem__(self, hostname):
try:
# Use a cached key if available.
- (error, key) = self.cache[hostname]
+ (error, support) = self.cache[hostname]
except KeyError:
# Fetch the key afresh from the specified server.
# Cache any failures so we're not flooding bad hosts.
- error = key = None
+ error = support = None
try:
- key = self.fetch_public_key(hostname)
+ support = self.fetch_wellknown_file(hostname)
except Exception, e: # NOQA
error = e
- self.cache[hostname] = (error, key)
+ self.cache[hostname] = (error, support)
if error is not None:
raise error
+ return support
+
+ def get_key(self, hostname):
+ try:
+ key = self[hostname]['public-key']
+ except KeyError:
+ raise InvalidIssuerError(
+ "Host %r doesn't provide a public key" % hostname)
+
return key
- def fetch_public_key(self, hostname):
- return fetch_public_key(hostname, verify=self.verify)
+ def fetch_wellknown_file(self, hostname):
+ return fetch_wellknown_file(hostname, verify=self.verify)
class FIFOCache(object):
@@ -75,7 +83,7 @@ def __getitem__(self, key):
This method retrieves the value cached under the given key, evicting
it from the cache if expired.
- If the key doesn't exist, it loads it using the fetch_public_key
+ If the key doesn't exist, it loads it using the fetch_wellknown_file
method.
"""
(timestamp, value) = self.items_map[key]
@@ -152,26 +160,25 @@ def _get(url, verify):
raise ConnectionError(msg)
-def fetch_public_key(hostname, well_known_url=WELL_KNOWN_URL, verify=None):
- """Fetch the BrowserID public key for the given hostname.
+def fetch_wellknown_file(hostname, well_known_url=WELL_KNOWN_URL, verify=None):
+ """Fetch the BrowserID well-known file for the given hostname.
- This function uses the well-known BrowserID meta-data file to extract
- the public key for the given hostname.
+ This function fetches and parses the well-known BrowserID meta-data file.
:param verify: verify the certificate when requesting ssl resources
"""
hostname = 'https://%s' % hostname
- # Try to find the public key. If it can't be found then we
+ # Try to find the support file. If it can't be found then we
# raise an InvalidIssuerError. Any other connection-related
# errors are passed back up to the caller.
response = _get(urljoin(hostname, well_known_url), verify=verify)
if response.status_code == 200:
try:
- key = json.loads(response.text)['public-key']
- except (ValueError, KeyError):
- raise InvalidIssuerError('Host %r has malformed public key '
- 'document' % hostname)
+ data = json.loads(response.text)
+ except ValueError:
+ raise InvalidIssuerError('Host %r has malformed BrowserID '
+ 'metadata document' % hostname)
else:
# The well-known file was not found, try falling back to
# just "/pk".
@@ -182,8 +189,10 @@ def fetch_public_key(hostname, well_known_url=WELL_KNOWN_URL, verify=None):
except ValueError:
raise InvalidIssuerError('Host %r has malformed BrowserID '
'metadata document' % hostname)
+
+ data = {"public-key": key}
else:
raise InvalidIssuerError('Host %r does not declare support for '
'BrowserID' % hostname)
- return key
+ return data
Please sign in to comment.
Something went wrong with that request. Please try again.