-
Notifications
You must be signed in to change notification settings - Fork 111
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Initial implementation of configurable service backends
- Loading branch information
Showing
8 changed files
with
288 additions
and
193 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
from django.utils.module_loading import import_string | ||
|
||
|
||
def _get_backends(): | ||
backends = [] | ||
for backend_path in ['mama_cas.services.backends.SettingsBackend']: | ||
backend = import_string(backend_path)() | ||
backends.append(backend) | ||
return backends | ||
|
||
|
||
def _is_allowed(attr, *args): | ||
for backend in _get_backends(): | ||
try: | ||
if getattr(backend, attr)(*args): | ||
return True | ||
except AttributeError: | ||
continue | ||
return False | ||
|
||
|
||
def get_callbacks(service): | ||
for backend in _get_backends(): | ||
callbacks = backend.get_callbacks(service) | ||
if callbacks: | ||
# TODO merge callback dicts? | ||
return callbacks | ||
return [] | ||
|
||
|
||
def get_logout_url(service): | ||
for backend in _get_backends(): | ||
logout_url = backend.get_logout_url(service) | ||
if logout_url: | ||
return logout_url | ||
return None | ||
|
||
|
||
def logout_allowed(service): | ||
return _is_allowed('logout_allowed', service) | ||
|
||
|
||
def proxy_allowed(service): | ||
return _is_allowed('proxy_allowed', service) | ||
|
||
|
||
def proxy_callback_allowed(service, pgturl): | ||
return _is_allowed('proxy_callback_allowed', service, pgturl) | ||
|
||
|
||
def service_allowed(service): | ||
return _is_allowed('service_allowed', service) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,99 @@ | ||
import re | ||
import warnings | ||
|
||
from django.conf import settings | ||
from django.core.exceptions import ImproperlyConfigured | ||
from django.utils import six | ||
from django.utils.functional import cached_property | ||
|
||
|
||
class ServiceConfig(object): | ||
PROXY_ALLOW_DEFAULT = False | ||
CALLBACKS_DEFAULT = [] | ||
LOGOUT_ALLOW_DEFAULT = False | ||
LOGOUT_URL_DEFAULT = None | ||
|
||
@cached_property | ||
def services(self): | ||
services = [] | ||
|
||
for service in getattr(settings, 'MAMA_CAS_VALID_SERVICES', []): | ||
if isinstance(service, six.string_types): | ||
warnings.warn( | ||
'Service URL configuration is changing. Check the documentation ' | ||
'for the MAMA_CAS_VALID_SERVICES setting.', DeprecationWarning) | ||
match = re.compile(service) | ||
service = {'SERVICE': service} | ||
else: | ||
service = service.copy() | ||
try: | ||
match = re.compile(service['SERVICE']) | ||
except KeyError: | ||
raise ImproperlyConfigured( | ||
'Missing SERVICE key for service configuration. ' | ||
'Check your MAMA_CAS_VALID_SERVICES setting.') | ||
|
||
service['MATCH'] = match | ||
# TODO For transitional backwards compatibility, this defaults to True. | ||
service.setdefault('PROXY_ALLOW', True) | ||
service.setdefault('CALLBACKS', self.CALLBACKS_DEFAULT) | ||
service.setdefault('LOGOUT_ALLOW', self.LOGOUT_ALLOW_DEFAULT) | ||
service.setdefault('LOGOUT_URL', self.LOGOUT_URL_DEFAULT) | ||
try: | ||
service['PROXY_PATTERN'] = re.compile(service['PROXY_PATTERN']) | ||
except KeyError: | ||
pass | ||
services.append(service) | ||
|
||
return services | ||
|
||
def get_service(self, s): | ||
for service in self.services: | ||
if service['MATCH'].match(s): | ||
return service | ||
return {} | ||
|
||
def get_config(self, service, setting): | ||
"""Access the configuration for a given service and setting.""" | ||
try: | ||
return self.get_service(service)[setting] | ||
except KeyError: | ||
return getattr(self, setting + '_DEFAULT') | ||
|
||
def is_valid(self, s): | ||
if not self.services: | ||
return True | ||
return bool(self.get_service(s)) | ||
|
||
|
||
services = ServiceConfig() | ||
|
||
|
||
class SettingsBackend(object): | ||
def get_callbacks(self, service): | ||
return services.get_config(service, 'CALLBACKS') | ||
|
||
def get_logout_url(self, service): | ||
return services.get_config(service, 'LOGOUT_URL') | ||
|
||
def logout_allowed(self, service): | ||
return services.get_config(service, 'LOGOUT_ALLOW') | ||
|
||
def proxy_allowed(self, service): | ||
return services.get_config(service, 'PROXY_ALLOW') | ||
|
||
def proxy_callback_allowed(self, service, pgturl): | ||
""" | ||
""" | ||
try: | ||
return services.get_config(service, 'PROXY_PATTERN').match(pgturl) | ||
except AttributeError: | ||
# TODO For transitional backwards compatibility, check against valid services | ||
return self.service_allowed(pgturl) | ||
|
||
def service_allowed(self, service): | ||
""" | ||
""" | ||
if not service: | ||
return False | ||
return services.is_valid(service) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,115 @@ | ||
from django.core.exceptions import ImproperlyConfigured | ||
from django.test import TestCase | ||
from django.test.utils import modify_settings | ||
from django.test.utils import override_settings | ||
|
||
from mama_cas.services import get_callbacks | ||
from mama_cas.services import get_logout_url | ||
from mama_cas.services import logout_allowed | ||
from mama_cas.services import proxy_allowed | ||
from mama_cas.services import proxy_callback_allowed | ||
from mama_cas.services import service_allowed | ||
from mama_cas.services.backends import services as cached_services | ||
|
||
|
||
class UtilsTests(TestCase): | ||
def tearDown(self): | ||
try: | ||
# Remove cached property so the valid services | ||
# setting can be changed per-test | ||
del cached_services.services | ||
except AttributeError: | ||
pass | ||
|
||
def test_get_callbacks(self): | ||
""" | ||
When callbacks are configured, ``get_callbacks()`` should return | ||
a list of the configured callbacks and an empty list otherwise. | ||
""" | ||
self.assertEqual(get_callbacks('http://www.example.com'), ['mama_cas.callbacks.user_name_attributes']) | ||
self.assertEqual(get_callbacks('http://example.org'), []) | ||
|
||
def test_get_logout_url(self): | ||
""" | ||
When a logout URL is configured, ``get_logout_url()`` should return | ||
the URL and ``None`` otherwise. | ||
""" | ||
self.assertEqual(get_logout_url('http://www.example.com'), 'https://example.com/logout') | ||
self.assertIsNone(get_logout_url('http://example.org')) | ||
|
||
def test_logout_allowed(self): | ||
""" | ||
When logout behavior is enabled, ``logout_allowed()`` should | ||
return ``True`` and ``False`` otherwise. | ||
""" | ||
self.assertTrue(logout_allowed('http://www.example.com')) | ||
self.assertFalse(logout_allowed('http://example.com')) | ||
self.assertFalse(logout_allowed('http://www.example.org')) | ||
|
||
@modify_settings(MAMA_CAS_VALID_SERVICES={ | ||
'append': [{'SERVICE': 'http://example\.org/proxy'}] | ||
}) | ||
def test_proxy_allowed(self): | ||
""" | ||
When proxy behavior is enabled, ``proxy_allowed()`` should | ||
return ``True`` and ``False`` otherwise. If it is not | ||
configured at all, ``True`` should be returned. | ||
""" | ||
self.assertTrue(proxy_allowed('http://www.example.com')) | ||
self.assertTrue(proxy_allowed('http://example.org/proxy')) | ||
self.assertFalse(proxy_allowed('http://example.com')) | ||
self.assertFalse(proxy_allowed('http://www.example.org')) | ||
|
||
def test_proxy_callback_allowed(self): | ||
""" | ||
When a proxy callback is configured, ``proxy_callback_allowed()`` | ||
should return ``True`` if the pgturl matches the pattern and | ||
``False`` otherwise. | ||
""" | ||
self.assertTrue(proxy_callback_allowed('https://www.example.com', 'https://www.example.com')) | ||
self.assertTrue(proxy_callback_allowed('http://example.org', 'https://www.example.com')) | ||
self.assertFalse(proxy_callback_allowed('http://example.org', 'http://example.org')) | ||
|
||
@override_settings(MAMA_CAS_VALID_SERVICES=('http://.*\.example\.com',)) | ||
def test_service_allowed_tuple(self): | ||
""" | ||
When valid services are configured, ``service_allowed()`` | ||
should return ``True`` if the provided URL matches, and | ||
``False`` otherwise. | ||
""" | ||
self.assertTrue(service_allowed('http://www.example.com')) | ||
self.assertFalse(service_allowed('http://www.example.org')) | ||
|
||
def test_service_allowed(self): | ||
""" | ||
When valid services are configured, ``service_allowed()`` | ||
should return ``True`` if the provided URL matches, and | ||
``False`` otherwise. | ||
""" | ||
self.assertTrue(service_allowed('http://www.example.com')) | ||
self.assertFalse(service_allowed('http://www.example.org')) | ||
|
||
@override_settings(MAMA_CAS_VALID_SERVICES=()) | ||
def test_empty_valid_services_tuple(self): | ||
""" | ||
When no valid services are configured, | ||
``service_allowed()`` should return ``True``. | ||
""" | ||
self.assertTrue(service_allowed('http://www.example.com')) | ||
|
||
@override_settings(MAMA_CAS_VALID_SERVICES=[]) | ||
def test_empty_valid_services(self): | ||
""" | ||
When no valid services are configured, | ||
``service_allowed()`` should return ``True``. | ||
""" | ||
self.assertTrue(service_allowed('http://www.example.com')) | ||
|
||
@override_settings(MAMA_CAS_VALID_SERVICES=[{}]) | ||
def test_invalid_valid_services(self): | ||
""" | ||
When invalid services are configured, ``service_allowed()`` | ||
should raise ``ImproperlyConfigured``. | ||
""" | ||
with self.assertRaises(ImproperlyConfigured): | ||
service_allowed('http://www.example.com') |
Oops, something went wrong.