Permalink
Browse files

Merge pull request #13 from mozilla/rfk/delegated

Rfk/delegated
  • Loading branch information...
2 parents badbf31 + 3300749 commit 47bbed581cb29ff4549b7c0567547f17d3281f24 @almet almet committed Jul 16, 2012
View
@@ -5,6 +5,7 @@
use when M2Crypto is not available.
* Added "from_pem_data" and "to_pem_data" methods to Key objects.
Currently these are only available when M2Crypto is installed.
+ * Added support for delegation of authority; thanks @kylef.
0.6.1 - 2012-06-07
==================
View
@@ -1,3 +0,0 @@
-
- * delegation of authority for a domain
-
@@ -13,14 +13,18 @@
from browserid.errors import (ConnectionError,
InvalidIssuerError)
+DEFAULT_TRUSTED_SECONDARIES = ("browserid.org", "diresworb.org",
+ "dev.diresworb.org")
WELL_KNOWN_URL = "/.well-known/browserid"
+DEFAULT_MAX_DELEGATIONS = 6
-class CertificatesManager(object):
- """A simple certificate handler. It acts like a dictionary of
- certificates. The key being the hostname and the value the certificate
- itself. the certificate manager populates itself automatically, so you
- don't need to fetch the public key when you get a KeyError.
+class SupportDocumentManager(object):
+ """Manager for mapping hostnames to their BrowserID support documents.
+
+ This object handles the association of a hostname with its BrowserID
+ support document, if any. It automatically fetches support documents
+ as required and stores them in a cache for future reference.
"""
def __init__(self, cache=None, verify=None, **kwargs):
@@ -29,25 +33,75 @@ def __init__(self, cache=None, verify=None, **kwargs):
cache = FIFOCache(**kwargs)
self.cache = cache
- def __getitem__(self, hostname):
+ def get_support_document(self, hostname):
+ """Get the BrowserID support document for the given hostname."""
try:
# Use a cached key if available.
- (error, key) = self.cache[hostname]
+ (error, supportdoc) = self.cache[hostname]
except KeyError:
# Fetch the key afresh from the specified server.
# Cache any failures so we're not flooding bad hosts.
- error = key = None
+ error = supportdoc = None
try:
- key = self.fetch_public_key(hostname)
+ supportdoc = self.fetch_support_document(hostname)
except Exception, e: # NOQA
error = e
- self.cache[hostname] = (error, key)
+ self.cache[hostname] = (error, supportdoc)
if error is not None:
raise error
+ return supportdoc
+
+ def get_key(self, hostname):
+ """Get the public key for verifying assertions from the given host."""
+ supportdoc = self.get_support_document(hostname)
+ try:
+ key = supportdoc['public-key']
+ except KeyError:
+ raise InvalidIssuerError(
+ "Host %r doesn't provide a public key" % hostname)
+
return key
- def fetch_public_key(self, hostname):
- return fetch_public_key(hostname, verify=self.verify)
+ def fetch_support_document(self, hostname):
+ """Fetch the BrowserID support document for the given hostname."""
+ return fetch_support_document(hostname, verify=self.verify)
+
+ def is_trusted_issuer(self, hostname, issuer, trusted_secondaries=None,
+ max_delegations=DEFAULT_MAX_DELEGATIONS):
+ """Check whether an issuer is trusted to assert for a given hostname.
+
+ This method checks whether the given issuer is trusted to assert
+ identitiers for the given hostname. There are three cases in which
+ this can be true:
+
+ * The hostname and issuer and the same
+ * The issuer is in the list of trusted secondaries
+ * The hostname delegates authority to the issuer
+
+ You can disable the check for delegated authority by setting the
+ max_delegations argument to 0.
+ """
+ if hostname == issuer:
+ return True
+
+ if trusted_secondaries is None:
+ trusted_secondaries = DEFAULT_TRUSTED_SECONDARIES
+
+ if issuer in trusted_secondaries:
+ return True
+
+ num_delegations = 0
+ while num_delegations < max_delegations:
+ supportdoc = self.get_support_document(hostname)
+ authority = supportdoc.get("authority")
+ if authority is None:
+ break
+ if authority == issuer:
+ return True
+ hostname = authority
+ num_delegations += 1
+
+ return False
class FIFOCache(object):
@@ -74,9 +128,6 @@ def __getitem__(self, key):
This method retrieves the value cached under the given key, evicting
it from the cache if expired.
-
- If the key doesn't exist, it loads it using the fetch_public_key
- method.
"""
(timestamp, value) = self.items_map[key]
if self.cache_timeout:
@@ -152,26 +203,28 @@ def _get(url, verify):
raise ConnectionError(msg)
-def fetch_public_key(hostname, well_known_url=WELL_KNOWN_URL, verify=None):
- """Fetch the BrowserID public key for the given hostname.
+def fetch_support_document(hostname, well_known_url=None, verify=None):
+ """Fetch the BrowserID well-known file for the given hostname.
- This function uses the well-known BrowserID meta-data file to extract
- the public key for the given hostname.
+ This function fetches and parses the well-known BrowserID meta-data file.
:param verify: verify the certificate when requesting ssl resources
"""
+ if well_known_url is None:
+ well_known_url = WELL_KNOWN_URL
+
hostname = 'https://%s' % hostname
- # Try to find the public key. If it can't be found then we
+ # Try to find the support document. If it can't be found then we
# raise an InvalidIssuerError. Any other connection-related
# errors are passed back up to the caller.
response = _get(urljoin(hostname, well_known_url), verify=verify)
if response.status_code == 200:
try:
- key = json.loads(response.text)['public-key']
- except (ValueError, KeyError):
- raise InvalidIssuerError('Host %r has malformed public key '
- 'document' % hostname)
+ data = json.loads(response.text)
+ except ValueError:
+ raise InvalidIssuerError('Host %r has malformed BrowserID '
+ 'support document' % hostname)
else:
# The well-known file was not found, try falling back to
# just "/pk".
@@ -182,8 +235,10 @@ def fetch_public_key(hostname, well_known_url=WELL_KNOWN_URL, verify=None):
except ValueError:
raise InvalidIssuerError('Host %r has malformed BrowserID '
'metadata document' % hostname)
+
+ data = {"public-key": key}
else:
raise InvalidIssuerError('Host %r does not declare support for '
'BrowserID' % hostname)
- return key
+ return data
View
@@ -8,7 +8,7 @@
from browserid.utils import encode_bytes, bundle_certs_and_assertion
-from browserid import certificates
+from browserid import supportdoc
from browserid import jwt
# if unittest2 isn't available, assume that we are python 2.7
@@ -50,14 +50,24 @@ def _hex(value):
return value
-def fetch_public_key(hostname, verify=None):
- """Fetch the BrowserID public key for the given hostname.
+def fetch_support_document(hostname, verify=None):
+ """Fetch the BrowserID support document for the given hostname.
- Actually, this implementation generates the key locally based on
+ Actually, this implementation generates a key locally based on
a hash of the hostname. This lets us exercise all the crypto code
while using predictable local values.
"""
- return get_keypair(hostname)[0]
+
+ if hostname == "redirect.org":
+ return {"authority": "delegated.org"}
+
+ if hostname == "redirect-twice.org":
+ return {"authority": "redirect.org"}
+
+ if hostname == "infinite.org":
+ return {"authority": "infinite.org"}
+
+ return {"public-key": get_keypair(hostname)[0]}
def get_keypair(hostname):
@@ -146,7 +156,7 @@ def make_assertion(email, audience, issuer=None, exp=None,
@contextmanager
-def patched_key_fetching(replacement=None, exc=None):
+def patched_supportdoc_fetching(replacement=None, exc=None):
"""Patch the key fetching mechanism with the given callable.
This is to allow easier testing.
@@ -157,8 +167,18 @@ def raise_exception(*args, **kwargs):
if exc is not None:
replacement = raise_exception
if replacement is None:
- replacement = fetch_public_key
- old_callable = certificates.fetch_public_key
- certificates.fetch_public_key = replacement
+ replacement = fetch_support_document
+ old_callable = supportdoc.fetch_support_document
+ supportdoc.fetch_support_document = replacement
yield
- certificates.fetch_public_key = old_callable
+ supportdoc.fetch_support_document = old_callable
+
+
+def callwith(context):
+ """Decorator to call a function with a context manager."""
+ def decorator(func):
+ def wrapper(*args, **kwds):
+ with context:
+ return func(*args, **kwds)
+ return wrapper
+ return decorator
@@ -3,9 +3,11 @@
from mock import Mock, patch
from requests.exceptions import RequestException
-from browserid.certificates import fetch_public_key
+from browserid.supportdoc import fetch_support_document, SupportDocumentManager
from browserid.errors import ConnectionError, InvalidIssuerError
from browserid.tests.support import unittest
+from browserid.tests.support import (fetch_support_document as
+ patched_support_document)
# Retrieved from browserid.org on April 3rd 2012
@@ -25,7 +27,8 @@
class TestFetchPublicKey(unittest.TestCase):
- @patch('browserid.certificates.requests')
+
+ @patch('browserid.supportdoc.requests')
def _fetch(self, hostname, requests, well_known_url=None,
side_effect=None, response_text='', status_code=200):
response = Mock()
@@ -38,19 +41,27 @@ def _fetch(self, hostname, requests, well_known_url=None,
if well_known_url is not None:
kwargs['well_known_url'] = well_known_url
- return fetch_public_key(hostname, **kwargs)
+ supportdoc = fetch_support_document(hostname, **kwargs)
+
+ try:
+ key = supportdoc['public-key']
+ except KeyError:
+ raise InvalidIssuerError('Host %r has malformed public key '
+ 'document' % hostname)
+
+ return key
def test_connection_error(self):
"""If there is an error connecting, raise a ConnectionError."""
with self.assertRaises(ConnectionError):
self._fetch('test.com', side_effect=RequestException)
- @patch('browserid.certificates.fetch_public_key')
- def test_missing_well_known_document(self, fetch):
+ @patch('browserid.supportdoc.fetch_support_document')
+ def test_missing_support_document(self, fetch):
with self.assertRaises(InvalidIssuerError):
self._fetch('test.com', status_code=404)
- def test_malformed_well_known_document(self):
+ def test_malformed_support_document(self):
response_text = 'I AINT NO JSON, FOOL!'
with self.assertRaises(InvalidIssuerError):
self._fetch('test.com', response_text=response_text)
@@ -67,15 +78,50 @@ def post(url, data):
return response
post.called = False
- with patch('browserid.certificates.requests') as requests:
+ with patch('browserid.supportdoc.requests') as requests:
requests.post = post
with self.assertRaises(InvalidIssuerError):
- fetch_public_key('test.com')
+ fetch_support_document('test.com')
- def test_well_known_doc_with_no_public_key(self):
+ def test_support_document_with_no_public_key(self):
with self.assertRaises(InvalidIssuerError):
self._fetch('test.com', response_text='{}')
def test_successful_fetch(self):
key = self._fetch('test.com', response_text=BROWSERID_PK)
self.assertEquals(key, BROWSERID_PK_PY['public-key'])
+
+
+class TestTrustedIssuers(unittest.TestCase):
+ def setUp(self):
+ self.supportdocmgr = SupportDocumentManager()
+
+ def _is_trusted_issuer(self, *args, **kwds):
+ return self.supportdocmgr.is_trusted_issuer(*args, **kwds)
+
+ def test_trusted_secondaries(self):
+ self.assertTrue(self._is_trusted_issuer('test.com', 'browserid.org'))
+ self.assertFalse(self._is_trusted_issuer('test.com', 'browserid.org',
+ trusted_secondaries=[], max_delegations=0))
+
+ def test_hostname_issuer(self):
+ self.assertTrue(self._is_trusted_issuer('test.com', 'test.com'))
+ self.assertFalse(self._is_trusted_issuer('abc.com', 'test.com',
+ max_delegations=0))
+
+ @patch('browserid.supportdoc.fetch_support_document',
+ patched_support_document)
+ def test_delegated_primary(self):
+ self.assertTrue(self._is_trusted_issuer('redirect.org',
+ 'delegated.org'))
+
+ def test_disabled_delegated_primary(self):
+ self.assertFalse(self._is_trusted_issuer('redirect.org',
+ 'delegated.org', max_delegations=0))
+
+ @patch('browserid.supportdoc.fetch_support_document',
+ patched_support_document)
+ def test_infinite_delegated_primary_recursion(self):
+ self.assertFalse(self._is_trusted_issuer('infinite.org', None))
+ self.assertFalse(self._is_trusted_issuer('infinite.org',
+ 'delegated.org'))
Oops, something went wrong.

0 comments on commit 47bbed5

Please sign in to comment.