Skip to content

Loading…

Rfk/delegated #13

Merged
merged 7 commits into from

3 participants

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
View
1 CHANGES.txt
@@ -5,6 +5,7 @@
use when M2Crypto is not available.
* Added "from_pem_data" and "to_pem_data" methods to Key objects.
Currently these are only available when M2Crypto is installed.
+ * Added support for delegation of authority; thanks @kylef.
0.6.1 - 2012-06-07
==================
View
3 TODO.txt
@@ -1,3 +0,0 @@
-
- * delegation of authority for a domain
-
View
105 browserid/certificates.py → browserid/supportdoc.py
@@ -13,14 +13,18 @@
from browserid.errors import (ConnectionError,
InvalidIssuerError)
+DEFAULT_TRUSTED_SECONDARIES = ("browserid.org", "diresworb.org",
+ "dev.diresworb.org")
WELL_KNOWN_URL = "/.well-known/browserid"
+DEFAULT_MAX_DELEGATIONS = 6
-class CertificatesManager(object):
- """A simple certificate handler. It acts like a dictionary of
- certificates. The key being the hostname and the value the certificate
- itself. the certificate manager populates itself automatically, so you
- don't need to fetch the public key when you get a KeyError.
+class SupportDocumentManager(object):
+ """Manager for mapping hostnames to their BrowserID support documents.
+
+ This object handles the association of a hostname with its BrowserID
+ support document, if any. It automatically fetches support documents
+ as required and stores them in a cache for future reference.
"""
def __init__(self, cache=None, verify=None, **kwargs):
@@ -29,25 +33,75 @@ def __init__(self, cache=None, verify=None, **kwargs):
cache = FIFOCache(**kwargs)
self.cache = cache
- def __getitem__(self, hostname):
+ def get_support_document(self, hostname):
+ """Get the BrowserID support document for the given hostname."""
try:
# Use a cached key if available.
- (error, key) = self.cache[hostname]
+ (error, supportdoc) = self.cache[hostname]
except KeyError:
# Fetch the key afresh from the specified server.
# Cache any failures so we're not flooding bad hosts.
- error = key = None
+ error = supportdoc = None
try:
- key = self.fetch_public_key(hostname)
+ supportdoc = self.fetch_support_document(hostname)
except Exception, e: # NOQA
error = e
- self.cache[hostname] = (error, key)
+ self.cache[hostname] = (error, supportdoc)
if error is not None:
raise error
+ return supportdoc
+
+ def get_key(self, hostname):
+ """Get the public key for verifying assertions from the given host."""
+ supportdoc = self.get_support_document(hostname)
+ try:
+ key = supportdoc['public-key']
+ except KeyError:
+ raise InvalidIssuerError(
+ "Host %r doesn't provide a public key" % hostname)
+
return key
- def fetch_public_key(self, hostname):
- return fetch_public_key(hostname, verify=self.verify)
+ def fetch_support_document(self, hostname):
+ """Fetch the BrowserID support document for the given hostname."""
+ return fetch_support_document(hostname, verify=self.verify)
+
+ def is_trusted_issuer(self, hostname, issuer, trusted_secondaries=None,
+ max_delegations=DEFAULT_MAX_DELEGATIONS):
+ """Check whether an issuer is trusted to assert for a given hostname.
+
+ This method checks whether the given issuer is trusted to assert
+ identitiers for the given hostname. There are three cases in which
+ this can be true:
+
+ * The hostname and issuer and the same
+ * The issuer is in the list of trusted secondaries
+ * The hostname delegates authority to the issuer
+
+ You can disable the check for delegated authority by setting the
+ max_delegations argument to 0.
+ """
+ if hostname == issuer:
+ return True
+
+ if trusted_secondaries is None:
+ trusted_secondaries = DEFAULT_TRUSTED_SECONDARIES
+
+ if issuer in trusted_secondaries:
+ return True
+
+ num_delegations = 0
+ while num_delegations < max_delegations:
+ supportdoc = self.get_support_document(hostname)
+ authority = supportdoc.get("authority")
+ if authority is None:
+ break
+ if authority == issuer:
+ return True
+ hostname = authority
+ num_delegations += 1
+
+ return False
class FIFOCache(object):
@@ -74,9 +128,6 @@ def __getitem__(self, key):
This method retrieves the value cached under the given key, evicting
it from the cache if expired.
-
- If the key doesn't exist, it loads it using the fetch_public_key
- method.
"""
(timestamp, value) = self.items_map[key]
if self.cache_timeout:
@@ -152,26 +203,28 @@ def _get(url, verify):
raise ConnectionError(msg)
-def fetch_public_key(hostname, well_known_url=WELL_KNOWN_URL, verify=None):
- """Fetch the BrowserID public key for the given hostname.
+def fetch_support_document(hostname, well_known_url=None, verify=None):
+ """Fetch the BrowserID well-known file for the given hostname.
- This function uses the well-known BrowserID meta-data file to extract
- the public key for the given hostname.
+ This function fetches and parses the well-known BrowserID meta-data file.
:param verify: verify the certificate when requesting ssl resources
"""
+ if well_known_url is None:
+ well_known_url = WELL_KNOWN_URL
+
hostname = 'https://%s' % hostname
- # Try to find the public key. If it can't be found then we
+ # Try to find the support document. If it can't be found then we
# raise an InvalidIssuerError. Any other connection-related
# errors are passed back up to the caller.
response = _get(urljoin(hostname, well_known_url), verify=verify)
if response.status_code == 200:
try:
- key = json.loads(response.text)['public-key']
- except (ValueError, KeyError):
- raise InvalidIssuerError('Host %r has malformed public key '
- 'document' % hostname)
+ data = json.loads(response.text)
+ except ValueError:
+ raise InvalidIssuerError('Host %r has malformed BrowserID '
+ 'support document' % hostname)
else:
# The well-known file was not found, try falling back to
# just "/pk".
@@ -182,8 +235,10 @@ def fetch_public_key(hostname, well_known_url=WELL_KNOWN_URL, verify=None):
except ValueError:
raise InvalidIssuerError('Host %r has malformed BrowserID '
'metadata document' % hostname)
+
+ data = {"public-key": key}
else:
raise InvalidIssuerError('Host %r does not declare support for '
'BrowserID' % hostname)
- return key
+ return data
View
40 browserid/tests/support.py
@@ -8,7 +8,7 @@
from browserid.utils import encode_bytes, bundle_certs_and_assertion
-from browserid import certificates
+from browserid import supportdoc
from browserid import jwt
# if unittest2 isn't available, assume that we are python 2.7
@@ -50,14 +50,24 @@ def _hex(value):
return value
-def fetch_public_key(hostname, verify=None):
- """Fetch the BrowserID public key for the given hostname.
+def fetch_support_document(hostname, verify=None):
+ """Fetch the BrowserID support document for the given hostname.
- Actually, this implementation generates the key locally based on
+ Actually, this implementation generates a key locally based on
a hash of the hostname. This lets us exercise all the crypto code
while using predictable local values.
"""
- return get_keypair(hostname)[0]
+
+ if hostname == "redirect.org":
+ return {"authority": "delegated.org"}
+
+ if hostname == "redirect-twice.org":
+ return {"authority": "redirect.org"}
+
+ if hostname == "infinite.org":
+ return {"authority": "infinite.org"}
+
+ return {"public-key": get_keypair(hostname)[0]}
def get_keypair(hostname):
@@ -146,7 +156,7 @@ def make_assertion(email, audience, issuer=None, exp=None,
@contextmanager
-def patched_key_fetching(replacement=None, exc=None):
+def patched_supportdoc_fetching(replacement=None, exc=None):
"""Patch the key fetching mechanism with the given callable.
This is to allow easier testing.
@@ -157,8 +167,18 @@ def raise_exception(*args, **kwargs):
if exc is not None:
replacement = raise_exception
if replacement is None:
- replacement = fetch_public_key
- old_callable = certificates.fetch_public_key
- certificates.fetch_public_key = replacement
+ replacement = fetch_support_document
+ old_callable = supportdoc.fetch_support_document
+ supportdoc.fetch_support_document = replacement
yield
- certificates.fetch_public_key = old_callable
+ supportdoc.fetch_support_document = old_callable
+
+
+def callwith(context):
+ """Decorator to call a function with a context manager."""
+ def decorator(func):
+ def wrapper(*args, **kwds):
+ with context:
+ return func(*args, **kwds)
+ return wrapper
+ return decorator
View
64 browserid/tests/test_certificates.py → browserid/tests/test_supportdoc.py
@@ -3,9 +3,11 @@
from mock import Mock, patch
from requests.exceptions import RequestException
-from browserid.certificates import fetch_public_key
+from browserid.supportdoc import fetch_support_document, SupportDocumentManager
from browserid.errors import ConnectionError, InvalidIssuerError
from browserid.tests.support import unittest
+from browserid.tests.support import (fetch_support_document as
+ patched_support_document)
# Retrieved from browserid.org on April 3rd 2012
@@ -25,7 +27,8 @@
class TestFetchPublicKey(unittest.TestCase):
- @patch('browserid.certificates.requests')
+
+ @patch('browserid.supportdoc.requests')
def _fetch(self, hostname, requests, well_known_url=None,
side_effect=None, response_text='', status_code=200):
response = Mock()
@@ -38,19 +41,27 @@ def _fetch(self, hostname, requests, well_known_url=None,
if well_known_url is not None:
kwargs['well_known_url'] = well_known_url
- return fetch_public_key(hostname, **kwargs)
+ supportdoc = fetch_support_document(hostname, **kwargs)
+
+ try:
+ key = supportdoc['public-key']
+ except KeyError:
+ raise InvalidIssuerError('Host %r has malformed public key '
+ 'document' % hostname)
+
+ return key
def test_connection_error(self):
"""If there is an error connecting, raise a ConnectionError."""
with self.assertRaises(ConnectionError):
self._fetch('test.com', side_effect=RequestException)
- @patch('browserid.certificates.fetch_public_key')
- def test_missing_well_known_document(self, fetch):
+ @patch('browserid.supportdoc.fetch_support_document')
+ def test_missing_support_document(self, fetch):
with self.assertRaises(InvalidIssuerError):
self._fetch('test.com', status_code=404)
- def test_malformed_well_known_document(self):
+ def test_malformed_support_document(self):
response_text = 'I AINT NO JSON, FOOL!'
with self.assertRaises(InvalidIssuerError):
self._fetch('test.com', response_text=response_text)
@@ -67,15 +78,50 @@ def post(url, data):
return response
post.called = False
- with patch('browserid.certificates.requests') as requests:
+ with patch('browserid.supportdoc.requests') as requests:
requests.post = post
with self.assertRaises(InvalidIssuerError):
- fetch_public_key('test.com')
+ fetch_support_document('test.com')
- def test_well_known_doc_with_no_public_key(self):
+ def test_support_document_with_no_public_key(self):
with self.assertRaises(InvalidIssuerError):
self._fetch('test.com', response_text='{}')
def test_successful_fetch(self):
key = self._fetch('test.com', response_text=BROWSERID_PK)
self.assertEquals(key, BROWSERID_PK_PY['public-key'])
+
+
+class TestTrustedIssuers(unittest.TestCase):
+ def setUp(self):
+ self.supportdocmgr = SupportDocumentManager()
+
+ def _is_trusted_issuer(self, *args, **kwds):
+ return self.supportdocmgr.is_trusted_issuer(*args, **kwds)
+
+ def test_trusted_secondaries(self):
+ self.assertTrue(self._is_trusted_issuer('test.com', 'browserid.org'))
+ self.assertFalse(self._is_trusted_issuer('test.com', 'browserid.org',
+ trusted_secondaries=[], max_delegations=0))
+
+ def test_hostname_issuer(self):
+ self.assertTrue(self._is_trusted_issuer('test.com', 'test.com'))
+ self.assertFalse(self._is_trusted_issuer('abc.com', 'test.com',
+ max_delegations=0))
+
+ @patch('browserid.supportdoc.fetch_support_document',
+ patched_support_document)
+ def test_delegated_primary(self):
+ self.assertTrue(self._is_trusted_issuer('redirect.org',
+ 'delegated.org'))
+
+ def test_disabled_delegated_primary(self):
+ self.assertFalse(self._is_trusted_issuer('redirect.org',
+ 'delegated.org', max_delegations=0))
+
+ @patch('browserid.supportdoc.fetch_support_document',
+ patched_support_document)
+ def test_infinite_delegated_primary_recursion(self):
+ self.assertFalse(self._is_trusted_issuer('infinite.org', None))
+ self.assertFalse(self._is_trusted_issuer('infinite.org',
+ 'delegated.org'))
View
62 browserid/tests/test_verifiers.py
@@ -8,13 +8,12 @@
from mock import Mock, patch
import browserid
-from browserid.tests.support import (patched_key_fetching,
+from browserid.tests.support import (patched_supportdoc_fetching,
get_keypair,
- fetch_public_key,
- make_assertion, unittest)
+ make_assertion, unittest, callwith)
from browserid import jwt
from browserid import RemoteVerifier, LocalVerifier
-from browserid.certificates import FIFOCache, CertificatesManager
+from browserid.supportdoc import FIFOCache, SupportDocumentManager
from browserid.verifiers.workerpool import WorkerPoolVerifier
from browserid.utils import (encode_json_bytes,
decode_json_bytes,
@@ -135,12 +134,24 @@ def test_error_handling_in_verify_certificate_chain(self):
self.assertRaises(ExpiredSignatureError,
self.verifier.verify_certificate_chain, certs)
- @patch('browserid.certificates.fetch_public_key', fetch_public_key)
+ @callwith(patched_supportdoc_fetching())
def test_well_known_doc_with_public_key(self):
assertion = make_assertion("t@m.com", "http://e.com")
self.assertTrue(self.verifier.verify(assertion))
- @patch('browserid.certificates.fetch_public_key', fetch_public_key)
+ @callwith(patched_supportdoc_fetching())
+ def test_delegated_primary(self):
+ assertion = make_assertion("t@redirect.org", "http://persona.org",
+ issuer="delegated.org")
+ self.assertTrue(self.verifier.verify(assertion))
+
+ @callwith(patched_supportdoc_fetching())
+ def test_double_delegated_primary(self):
+ assertion = make_assertion("t@redirect-twice.org",
+ "http://persona.org", issuer="delegated.org")
+ self.assertTrue(self.verifier.verify(assertion))
+
+ @callwith(patched_supportdoc_fetching())
def test_audience_verification(self):
# create an assertion with the audience set to http://persona.org for
@@ -231,7 +242,7 @@ def test_handling_of_503_error_from_server(self):
class TestDummyVerifier(unittest.TestCase, VerifierTestCases):
def setUp(self):
- self.patched = patched_key_fetching()
+ self.patched = patched_supportdoc_fetching()
self.patched.__enter__()
self.verifier = LocalVerifier(["*"], warning=False)
@@ -261,7 +272,7 @@ def test_verification_of_untrusted_issuer(self):
# Assertions for @moz.com addresses can come from moz.com
assertion = make_assertion("test@moz.com", audience, issuer=issuer)
self.assertTrue(self.verifier.verify(assertion, audience))
- # But assertions for other addresses cannot.
+ # But assertions for other addresses cannot (unless they delegated).
assertion = make_assertion("test@example.com", audience,
issuer=issuer)
self.assertRaises(InvalidSignatureError, self.verifier.verify,
@@ -290,57 +301,62 @@ def test_verification_of_dummy_assertion_with_bad_certificate_sig(self):
assertion)
def test_cache_eviction_based_on_time(self):
- certs = CertificatesManager(FIFOCache(cache_timeout=0.1))
- verifier = LocalVerifier(["*"], certs=certs, warning=False)
+ supportdocs = SupportDocumentManager(FIFOCache(cache_timeout=0.1))
+ verifier = LocalVerifier(["*"], supportdocs=supportdocs,
+ warning=False)
# Prime the cache by verifying an assertion.
assertion = make_assertion("test@example.com", "")
self.assertTrue(verifier.verify(assertion))
# Make it error out if re-fetching the keys
- with patched_key_fetching(exc=RuntimeError("key fetch disabled")):
- verifier.fetch_public_key = fetch_public_key
+ exc = RuntimeError("key fetch disabled")
+ with patched_supportdoc_fetching(exc=exc):
# It should be in the cache, so this works fine.
- verifier.verify(assertion)
+ self.assertTrue(verifier.verify(assertion))
# But after sleeping it gets evicted and the error is triggered.
time.sleep(0.1)
self.assertRaises(RuntimeError, verifier.verify, assertion)
def test_cache_eviction_based_on_size(self):
- certs = CertificatesManager(max_size=2)
- verifier = LocalVerifier(["*"], certs=certs, warning=False)
+ supportdocs = SupportDocumentManager(max_size=2)
+ verifier = LocalVerifier(["*"], supportdocs=supportdocs,
+ warning=False)
# Prime the cache by verifying some assertions.
assertion1 = make_assertion("test@1.com", "", "1.com")
self.assertTrue(verifier.verify(assertion1))
assertion2 = make_assertion("test@2.com", "", "2.com")
self.assertTrue(verifier.verify(assertion2))
- self.assertEquals(len(certs.cache), 2)
+ self.assertEquals(len(supportdocs.cache), 2)
# Hitting a third host should evict the first public key.
assertion3 = make_assertion("test@3.com", "", "3.com")
self.assertTrue(verifier.verify(assertion3))
- self.assertEquals(len(certs.cache), 2)
+ self.assertEquals(len(supportdocs.cache), 2)
# Make it error out if re-fetching any keys
- with patched_key_fetching(exc=RuntimeError("key fetch disabled")):
+ exc = RuntimeError("key fetch disabled")
+ with patched_supportdoc_fetching(exc=exc):
# It should have to re-fetch for 1, but not 2.
self.assertTrue(verifier.verify(assertion2))
self.assertRaises(RuntimeError, verifier.verify, assertion1)
def test_cache_eviction_during_write(self):
- certs = CertificatesManager(cache_timeout=0.1)
- verifier = LocalVerifier(["*"], certs=certs, warning=False)
+ supportdocs = SupportDocumentManager(cache_timeout=0.1)
+ verifier = LocalVerifier(["*"], supportdocs=supportdocs,
+ warning=False)
# Prime the cache by verifying an assertion.
assertion1 = make_assertion("test@1.com", "", "1.com")
self.assertTrue(verifier.verify(assertion1))
- self.assertEquals(len(certs.cache), 1)
+ self.assertEquals(len(supportdocs.cache), 1)
# Let that cached key expire
time.sleep(0.1)
# Now grab a different key; caching it should purge the expired key.
assertion2 = make_assertion("test@2.com", "", "2.com")
self.assertTrue(verifier.verify(assertion2))
- self.assertEquals(len(certs.cache), 1)
+ self.assertEquals(len(supportdocs.cache), 1)
# Check that only the second entry is in cache.
- with patched_key_fetching(exc=RuntimeError("key fetch disabled")):
+ exc = RuntimeError("key fetch disabled")
+ with patched_supportdoc_fetching(exc=exc):
self.assertTrue(verifier.verify(assertion2))
self.assertRaises(RuntimeError, verifier.verify, assertion1)
View
29 browserid/verifiers/local.py
@@ -7,17 +7,13 @@
from browserid import jwt
from browserid.verifiers import Verifier
-from browserid.certificates import CertificatesManager
+from browserid.supportdoc import SupportDocumentManager
from browserid.utils import unbundle_certs_and_assertion
from browserid.errors import (InvalidSignatureError,
ExpiredSignatureError,
UnsupportedCertChainError)
-DEFAULT_TRUSTED_SECONDARIES = ("browserid.org", "diresworb.org",
- "dev.diresworb.org")
-
-
class LocalVerifier(Verifier):
"""Class for local verification of BrowserID identity assertions.
@@ -26,17 +22,11 @@ class LocalVerifier(Verifier):
verify() method and let it work its magic.
"""
- def __init__(self, audiences=None, trusted_secondaries=None, certs=None,
- warning=True):
- if trusted_secondaries is None:
- trusted_secondaries = DEFAULT_TRUSTED_SECONDARIES
-
- if certs is None:
- certs = CertificatesManager()
-
+ def __init__(self, audiences=None, trusted_secondaries=None,
+ supportdocs=None, warning=True):
super(LocalVerifier, self).__init__(audiences)
self.trusted_secondaries = trusted_secondaries
- self.certs = certs
+ self.supportdocs = supportdocs or SupportDocumentManager()
if warning:
_emit_warning()
@@ -87,10 +77,11 @@ def verify(self, assertion, audience=None, now=None):
# No point doing all that crypto if we're going to fail out anyway.
email = certificates[-1].payload["principal"]["email"]
root_issuer = certificates[0].payload["iss"]
- if root_issuer not in self.trusted_secondaries:
- if not email.endswith("@" + root_issuer):
- msg = "untrusted root issuer: %s" % (root_issuer,)
- raise InvalidSignatureError(msg)
+ provider = email.split('@')[-1]
+ if not self.supportdocs.is_trusted_issuer(provider,
+ root_issuer, self.trusted_secondaries):
+ msg = "untrusted root issuer: %s" % (root_issuer,)
+ raise InvalidSignatureError(msg)
# Verify the entire chain of certificates.
cert = self.verify_certificate_chain(certificates, now=now)
@@ -126,7 +117,7 @@ def verify_certificate_chain(self, certificates, now=None):
if now is None:
now = int(time.time() * 1000)
root_issuer = certificates[0].payload["iss"]
- root_key = self.certs[root_issuer]
+ root_key = self.supportdocs.get_key(root_issuer)
current_key = root_key
for cert in certificates:
if cert.payload["exp"] < now:
Something went wrong with that request. Please try again.