Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Rename "WellKnownManager" to "SupportDocumentManager".

  • Loading branch information...
commit fa3d2b6a29455d4526ec9dff6af835d49c76d0f0 1 parent c0c0b8f
@rfk rfk authored
View
68 browserid/wellknown.py → browserid/supportdoc.py
@@ -19,10 +19,12 @@
DEFAULT_MAX_DELEGATIONS = 6
-class WellKnownManager(object):
- """A simple well-known BrowserID file handler. It acts like a dict of
- BrowserID well-known files. This manager populates itself automatically,
- so you don't need to fetch the file when you get a KeyError.
+class SupportDocumentManager(object):
+ """Manager for mapping hostnames to their BrowserID support documents.
+
+ This object handles the association of a hostname with its BrowserID
+ support document, if any. It automatically fetches support documents
+ as required and stores them in a cache for future reference.
"""
def __init__(self, cache=None, verify=None, **kwargs):
@@ -31,47 +33,53 @@ def __init__(self, cache=None, verify=None, **kwargs):
cache = FIFOCache(**kwargs)
self.cache = cache
- def __getitem__(self, hostname):
+ def get_support_document(self, hostname):
+ """Get the BrowserID support document for the given hostname."""
try:
# Use a cached key if available.
- (error, support) = self.cache[hostname]
+ (error, supportdoc) = self.cache[hostname]
except KeyError:
# Fetch the key afresh from the specified server.
# Cache any failures so we're not flooding bad hosts.
- error = support = None
+ error = supportdoc = None
try:
- support = self.fetch_wellknown_file(hostname)
+ supportdoc = self.fetch_support_document(hostname)
except Exception, e: # NOQA
error = e
- self.cache[hostname] = (error, support)
+ self.cache[hostname] = (error, supportdoc)
if error is not None:
raise error
- return support
+ return supportdoc
def get_key(self, hostname):
+ """Get the public key for verifying assertions from the given host."""
+ supportdoc = self.get_support_document(hostname)
try:
- key = self[hostname]['public-key']
+ key = supportdoc['public-key']
except KeyError:
raise InvalidIssuerError(
"Host %r doesn't provide a public key" % hostname)
return key
- def fetch_wellknown_file(self, hostname):
- return fetch_wellknown_file(hostname, verify=self.verify)
+ def fetch_support_document(self, hostname):
+ """Fetch the BrowserID support document for the given hostname."""
+ return fetch_support_document(hostname, verify=self.verify)
- def is_issuer_valid(self, hostname, issuer, trusted_secondaries=None,
+ def is_trusted_issuer(self, hostname, issuer, trusted_secondaries=None,
max_delegations=DEFAULT_MAX_DELEGATIONS):
- """
- This method allows you to check if a hostname is valid for an issuer.
+ """Check whether an issuer is trusted to assert for a given hostname.
+
+ This method checks whether the given issuer is trusted to assert
+ identitiers for the given hostname. There are three cases in which
+ this can be true:
- There are three methods where this will return True:
- * The hostname is the issuer
- * The hostname is in the list of trusted secondaries
- * When the hostname delegates to the issuer
+ * The hostname and issuer and the same
+ * The issuer is in the list of trusted secondaries
+ * The hostname delegates authority to the issuer
- You can disable the check for delegated primaries by setting the
- max_delegations to 0.
+ You can disable the check for delegated authority by setting the
+ max_delegations argument to 0.
"""
if hostname == issuer:
return True
@@ -84,8 +92,8 @@ def is_issuer_valid(self, hostname, issuer, trusted_secondaries=None,
num_delegations = 0
while num_delegations < max_delegations:
- support = self[hostname]
- authority = support.get("authority")
+ supportdoc = self.get_support_document(hostname)
+ authority = supportdoc.get("authority")
if authority is None:
break
if authority == issuer:
@@ -120,9 +128,6 @@ def __getitem__(self, key):
This method retrieves the value cached under the given key, evicting
it from the cache if expired.
-
- If the key doesn't exist, it loads it using the fetch_wellknown_file
- method.
"""
(timestamp, value) = self.items_map[key]
if self.cache_timeout:
@@ -198,16 +203,19 @@ def _get(url, verify):
raise ConnectionError(msg)
-def fetch_wellknown_file(hostname, well_known_url=WELL_KNOWN_URL, verify=None):
+def fetch_support_document(hostname, well_known_url=None, verify=None):
"""Fetch the BrowserID well-known file for the given hostname.
This function fetches and parses the well-known BrowserID meta-data file.
:param verify: verify the certificate when requesting ssl resources
"""
+ if well_known_url is None:
+ well_known_url = WELL_KNOWN_URL
+
hostname = 'https://%s' % hostname
- # Try to find the support file. If it can't be found then we
+ # Try to find the support document. If it can't be found then we
# raise an InvalidIssuerError. Any other connection-related
# errors are passed back up to the caller.
response = _get(urljoin(hostname, well_known_url), verify=verify)
@@ -216,7 +224,7 @@ def fetch_wellknown_file(hostname, well_known_url=WELL_KNOWN_URL, verify=None):
data = json.loads(response.text)
except ValueError:
raise InvalidIssuerError('Host %r has malformed BrowserID '
- 'metadata document' % hostname)
+ 'support document' % hostname)
else:
# The well-known file was not found, try falling back to
# just "/pk".
View
26 browserid/tests/support.py
@@ -8,7 +8,7 @@
from browserid.utils import encode_bytes, bundle_certs_and_assertion
-from browserid import wellknown
+from browserid import supportdoc
from browserid import jwt
# if unittest2 isn't available, assume that we are python 2.7
@@ -50,8 +50,8 @@ def _hex(value):
return value
-def fetch_wellknown_file(hostname, verify=None):
- """Fetch the BrowserID support file for the given hostname.
+def fetch_support_document(hostname, verify=None):
+ """Fetch the BrowserID support document for the given hostname.
Actually, this implementation generates a key locally based on
a hash of the hostname. This lets us exercise all the crypto code
@@ -156,7 +156,7 @@ def make_assertion(email, audience, issuer=None, exp=None,
@contextmanager
-def patched_wellknown_fetching(replacement=None, exc=None):
+def patched_supportdoc_fetching(replacement=None, exc=None):
"""Patch the key fetching mechanism with the given callable.
This is to allow easier testing.
@@ -167,8 +167,18 @@ def raise_exception(*args, **kwargs):
if exc is not None:
replacement = raise_exception
if replacement is None:
- replacement = fetch_wellknown_file
- old_callable = wellknown.fetch_wellknown_file
- wellknown.fetch_wellknown_file = replacement
+ replacement = fetch_support_document
+ old_callable = supportdoc.fetch_support_document
+ supportdoc.fetch_support_document = replacement
yield
- wellknown.fetch_wellknown_file = old_callable
+ supportdoc.fetch_support_document = old_callable
+
+
+def callwith(context):
+ """Decorator to call a function with a context manager."""
+ def decorator(func):
+ def wrapper(*args, **kwds):
+ with context:
+ return func(*args, **kwds)
+ return wrapper
+ return decorator
View
56 browserid/tests/test_wellknown.py → browserid/tests/test_supportdoc.py
@@ -3,11 +3,11 @@
from mock import Mock, patch
from requests.exceptions import RequestException
-from browserid.wellknown import fetch_wellknown_file, WellKnownManager
+from browserid.supportdoc import fetch_support_document, SupportDocumentManager
from browserid.errors import ConnectionError, InvalidIssuerError
from browserid.tests.support import unittest
-from browserid.tests.support import (fetch_wellknown_file as
- patched_wellknown_file)
+from browserid.tests.support import (fetch_support_document as
+ patched_support_document)
# Retrieved from browserid.org on April 3rd 2012
@@ -27,7 +27,8 @@
class TestFetchPublicKey(unittest.TestCase):
- @patch('browserid.wellknown.requests')
+
+ @patch('browserid.supportdoc.requests')
def _fetch(self, hostname, requests, well_known_url=None,
side_effect=None, response_text='', status_code=200):
response = Mock()
@@ -40,10 +41,10 @@ def _fetch(self, hostname, requests, well_known_url=None,
if well_known_url is not None:
kwargs['well_known_url'] = well_known_url
- wellknown = fetch_wellknown_file(hostname, **kwargs)
+ supportdoc = fetch_support_document(hostname, **kwargs)
try:
- key = wellknown['public-key']
+ key = supportdoc['public-key']
except KeyError:
raise InvalidIssuerError('Host %r has malformed public key '
'document' % hostname)
@@ -55,12 +56,12 @@ def test_connection_error(self):
with self.assertRaises(ConnectionError):
self._fetch('test.com', side_effect=RequestException)
- @patch('browserid.wellknown.fetch_wellknown_file')
- def test_missing_well_known_document(self, fetch):
+ @patch('browserid.supportdoc.fetch_support_document')
+ def test_missing_support_document(self, fetch):
with self.assertRaises(InvalidIssuerError):
self._fetch('test.com', status_code=404)
- def test_malformed_well_known_document(self):
+ def test_malformed_support_document(self):
response_text = 'I AINT NO JSON, FOOL!'
with self.assertRaises(InvalidIssuerError):
self._fetch('test.com', response_text=response_text)
@@ -77,12 +78,12 @@ def post(url, data):
return response
post.called = False
- with patch('browserid.wellknown.requests') as requests:
+ with patch('browserid.supportdoc.requests') as requests:
requests.post = post
with self.assertRaises(InvalidIssuerError):
- fetch_wellknown_file('test.com')
+ fetch_support_document('test.com')
- def test_well_known_doc_with_no_public_key(self):
+ def test_support_document_with_no_public_key(self):
with self.assertRaises(InvalidIssuerError):
self._fetch('test.com', response_text='{}')
@@ -91,31 +92,36 @@ def test_successful_fetch(self):
self.assertEquals(key, BROWSERID_PK_PY['public-key'])
-class TestIssuerValidation(unittest.TestCase):
+class TestTrustedIssuers(unittest.TestCase):
def setUp(self):
- self.wellknown = WellKnownManager()
+ self.supportdocmgr = SupportDocumentManager()
+
+ def _is_trusted_issuer(self, *args, **kwds):
+ return self.supportdocmgr.is_trusted_issuer(*args, **kwds)
def test_trusted_secondaries(self):
- self.assertTrue(self.wellknown.is_issuer_valid('test.com', 'browserid.org'))
- self.assertFalse(self.wellknown.is_issuer_valid('test.com',
- 'browserid.org', trusted_secondaries=[], max_delegations=0))
+ self.assertTrue(self._is_trusted_issuer('test.com', 'browserid.org'))
+ self.assertFalse(self._is_trusted_issuer('test.com', 'browserid.org',
+ trusted_secondaries=[], max_delegations=0))
def test_hostname_issuer(self):
- self.assertTrue(self.wellknown.is_issuer_valid('test.com', 'test.com'))
- self.assertFalse(self.wellknown.is_issuer_valid('abc.com', 'test.com',
+ self.assertTrue(self._is_trusted_issuer('test.com', 'test.com'))
+ self.assertFalse(self._is_trusted_issuer('abc.com', 'test.com',
max_delegations=0))
- @patch('browserid.wellknown.fetch_wellknown_file', patched_wellknown_file)
+ @patch('browserid.supportdoc.fetch_support_document',
+ patched_support_document)
def test_delegated_primary(self):
- self.assertTrue(self.wellknown.is_issuer_valid('redirect.org',
+ self.assertTrue(self._is_trusted_issuer('redirect.org',
'delegated.org'))
def test_disabled_delegated_primary(self):
- self.assertFalse(self.wellknown.is_issuer_valid('redirect.org',
+ self.assertFalse(self._is_trusted_issuer('redirect.org',
'delegated.org', max_delegations=0))
- @patch('browserid.wellknown.fetch_wellknown_file', patched_wellknown_file)
+ @patch('browserid.supportdoc.fetch_support_document',
+ patched_support_document)
def test_infinite_delegated_primary_recursion(self):
- self.assertFalse(self.wellknown.is_issuer_valid('infinite.org', None))
- self.assertFalse(self.wellknown.is_issuer_valid('infinite.org',
+ self.assertFalse(self._is_trusted_issuer('infinite.org', None))
+ self.assertFalse(self._is_trusted_issuer('infinite.org',
'delegated.org'))
View
49 browserid/tests/test_verifiers.py
@@ -8,13 +8,12 @@
from mock import Mock, patch
import browserid
-from browserid.tests.support import (patched_wellknown_fetching,
+from browserid.tests.support import (patched_supportdoc_fetching,
get_keypair,
- fetch_wellknown_file,
- make_assertion, unittest)
+ make_assertion, unittest, callwith)
from browserid import jwt
from browserid import RemoteVerifier, LocalVerifier
-from browserid.wellknown import FIFOCache, WellKnownManager
+from browserid.supportdoc import FIFOCache, SupportDocumentManager
from browserid.verifiers.workerpool import WorkerPoolVerifier
from browserid.utils import (encode_json_bytes,
decode_json_bytes,
@@ -135,24 +134,24 @@ def test_error_handling_in_verify_certificate_chain(self):
self.assertRaises(ExpiredSignatureError,
self.verifier.verify_certificate_chain, certs)
- @patch('browserid.wellknown.fetch_wellknown_file', fetch_wellknown_file)
+ @callwith(patched_supportdoc_fetching())
def test_well_known_doc_with_public_key(self):
assertion = make_assertion("t@m.com", "http://e.com")
self.assertTrue(self.verifier.verify(assertion))
- @patch('browserid.wellknown.fetch_wellknown_file', fetch_wellknown_file)
+ @callwith(patched_supportdoc_fetching())
def test_delegated_primary(self):
assertion = make_assertion("t@redirect.org", "http://persona.org",
issuer="delegated.org")
self.assertTrue(self.verifier.verify(assertion))
- @patch('browserid.wellknown.fetch_wellknown_file', fetch_wellknown_file)
+ @callwith(patched_supportdoc_fetching())
def test_double_delegated_primary(self):
assertion = make_assertion("t@redirect-twice.org",
"http://persona.org", issuer="delegated.org")
self.assertTrue(self.verifier.verify(assertion))
- @patch('browserid.wellknown.fetch_wellknown_file', fetch_wellknown_file)
+ @callwith(patched_supportdoc_fetching())
def test_audience_verification(self):
# create an assertion with the audience set to http://persona.org for
@@ -243,7 +242,7 @@ def test_handling_of_503_error_from_server(self):
class TestDummyVerifier(unittest.TestCase, VerifierTestCases):
def setUp(self):
- self.patched = patched_wellknown_fetching()
+ self.patched = patched_supportdoc_fetching()
self.patched.__enter__()
self.verifier = LocalVerifier(["*"], warning=False)
@@ -302,60 +301,62 @@ def test_verification_of_dummy_assertion_with_bad_certificate_sig(self):
assertion)
def test_cache_eviction_based_on_time(self):
- wellknown_manager = WellKnownManager(FIFOCache(cache_timeout=0.1))
- verifier = LocalVerifier(["*"], wellknown_manager=wellknown_manager,
+ supportdoc_mgr = SupportDocumentManager(FIFOCache(cache_timeout=0.1))
+ verifier = LocalVerifier(["*"], supportdoc_manager=supportdoc_mgr,
warning=False)
# Prime the cache by verifying an assertion.
assertion = make_assertion("test@example.com", "")
self.assertTrue(verifier.verify(assertion))
# Make it error out if re-fetching the keys
- with patched_wellknown_fetching(exc=RuntimeError("key fetch disabled")):
- verifier.fetch_wellknown_file = fetch_wellknown_file
+ exc = RuntimeError("key fetch disabled")
+ with patched_supportdoc_fetching(exc=exc):
# It should be in the cache, so this works fine.
- verifier.verify(assertion)
+ self.assertTrue(verifier.verify(assertion))
# But after sleeping it gets evicted and the error is triggered.
time.sleep(0.1)
self.assertRaises(RuntimeError, verifier.verify, assertion)
def test_cache_eviction_based_on_size(self):
- wellknown_manager = WellKnownManager(max_size=2)
- verifier = LocalVerifier(["*"], wellknown_manager=wellknown_manager,
+ supportdoc_mgr = SupportDocumentManager(max_size=2)
+ verifier = LocalVerifier(["*"], supportdoc_manager=supportdoc_mgr,
warning=False)
# Prime the cache by verifying some assertions.
assertion1 = make_assertion("test@1.com", "", "1.com")
self.assertTrue(verifier.verify(assertion1))
assertion2 = make_assertion("test@2.com", "", "2.com")
self.assertTrue(verifier.verify(assertion2))
- self.assertEquals(len(wellknown_manager.cache), 2)
+ self.assertEquals(len(supportdoc_mgr.cache), 2)
# Hitting a third host should evict the first public key.
assertion3 = make_assertion("test@3.com", "", "3.com")
self.assertTrue(verifier.verify(assertion3))
- self.assertEquals(len(wellknown_manager.cache), 2)
+ self.assertEquals(len(supportdoc_mgr.cache), 2)
# Make it error out if re-fetching any keys
- with patched_wellknown_fetching(exc=RuntimeError("key fetch disabled")):
+ exc = RuntimeError("key fetch disabled")
+ with patched_supportdoc_fetching(exc=exc):
# It should have to re-fetch for 1, but not 2.
self.assertTrue(verifier.verify(assertion2))
self.assertRaises(RuntimeError, verifier.verify, assertion1)
def test_cache_eviction_during_write(self):
- wellknown_manager = WellKnownManager(cache_timeout=0.1)
- verifier = LocalVerifier(["*"], wellknown_manager=wellknown_manager,
+ supportdoc_mgr = SupportDocumentManager(cache_timeout=0.1)
+ verifier = LocalVerifier(["*"], supportdoc_manager=supportdoc_mgr,
warning=False)
# Prime the cache by verifying an assertion.
assertion1 = make_assertion("test@1.com", "", "1.com")
self.assertTrue(verifier.verify(assertion1))
- self.assertEquals(len(wellknown_manager.cache), 1)
+ self.assertEquals(len(supportdoc_mgr.cache), 1)
# Let that cached key expire
time.sleep(0.1)
# Now grab a different key; caching it should purge the expired key.
assertion2 = make_assertion("test@2.com", "", "2.com")
self.assertTrue(verifier.verify(assertion2))
- self.assertEquals(len(wellknown_manager.cache), 1)
+ self.assertEquals(len(supportdoc_mgr.cache), 1)
# Check that only the second entry is in cache.
- with patched_wellknown_fetching(exc=RuntimeError("key fetch disabled")):
+ exc = RuntimeError("key fetch disabled")
+ with patched_supportdoc_fetching(exc=exc):
self.assertTrue(verifier.verify(assertion2))
self.assertRaises(RuntimeError, verifier.verify, assertion1)
View
15 browserid/verifiers/local.py
@@ -7,7 +7,7 @@
from browserid import jwt
from browserid.verifiers import Verifier
-from browserid.wellknown import WellKnownManager, DEFAULT_TRUSTED_SECONDARIES
+from browserid.supportdoc import SupportDocumentManager
from browserid.utils import unbundle_certs_and_assertion
from browserid.errors import (InvalidSignatureError,
ExpiredSignatureError,
@@ -23,13 +23,12 @@ class LocalVerifier(Verifier):
"""
def __init__(self, audiences=None, trusted_secondaries=None,
- wellknown_manager=None, warning=True):
- if trusted_secondaries is None:
- trusted_secondaries = DEFAULT_TRUSTED_SECONDARIES
-
+ supportdoc_manager=None, warning=True):
super(LocalVerifier, self).__init__(audiences)
self.trusted_secondaries = trusted_secondaries
- self.wellknown_manager = wellknown_manager or WellKnownManager()
+ if supportdoc_manager is None:
+ supportdoc_manager = SupportDocumentManager()
+ self.supportdoc_manager = supportdoc_manager
if warning:
_emit_warning()
@@ -81,7 +80,7 @@ def verify(self, assertion, audience=None, now=None):
email = certificates[-1].payload["principal"]["email"]
root_issuer = certificates[0].payload["iss"]
provider = email.split('@')[-1]
- if not self.wellknown_manager.is_issuer_valid(provider,
+ if not self.supportdoc_manager.is_trusted_issuer(provider,
root_issuer, self.trusted_secondaries):
msg = "untrusted root issuer: %s" % (root_issuer,)
raise InvalidSignatureError(msg)
@@ -120,7 +119,7 @@ def verify_certificate_chain(self, certificates, now=None):
if now is None:
now = int(time.time() * 1000)
root_issuer = certificates[0].payload["iss"]
- root_key = self.wellknown_manager.get_key(root_issuer)
+ root_key = self.supportdoc_manager.get_key(root_issuer)
current_key = root_key
for cert in certificates:
if cert.payload["exp"] < now:
Please sign in to comment.
Something went wrong with that request. Please try again.