diff --git a/mama_cas/cas.py b/mama_cas/cas.py index 9a105cc..2f53deb 100644 --- a/mama_cas/cas.py +++ b/mama_cas/cas.py @@ -7,13 +7,12 @@ from django.utils.module_loading import import_string from django.utils.translation import ugettext_lazy as _ +from mama_cas.exceptions import InvalidTicketSpec +from mama_cas.exceptions import ValidationError from mama_cas.models import ServiceTicket from mama_cas.models import ProxyTicket from mama_cas.models import ProxyGrantingTicket -from mama_cas.exceptions import InvalidTicketSpec -from mama_cas.exceptions import ValidationError -from mama_cas.utils import get_config - +from mama_cas.services import get_callbacks logger = logging.getLogger(__name__) @@ -105,7 +104,7 @@ def get_attributes(user, service): warnings.warn( 'The MAMA_CAS_ATTRIBUTE_CALLBACKS setting is deprecated. Service callbacks ' 'should be configured using MAMA_CAS_VALID_SERVICES.', DeprecationWarning) - callbacks.extend(get_config(service, 'CALLBACKS')) + callbacks.extend(get_callbacks(service)) for path in callbacks: callback = import_string(path) diff --git a/mama_cas/models.py b/mama_cas/models.py index 5f695a1..cbcfed1 100644 --- a/mama_cas/models.py +++ b/mama_cas/models.py @@ -24,12 +24,14 @@ from mama_cas.exceptions import UnauthorizedServiceProxy from mama_cas.exceptions import ValidationError from mama_cas.request import SingleSignOutRequest +from mama_cas.services import get_logout_url +from mama_cas.services import logout_allowed +from mama_cas.services import service_allowed +from mama_cas.services import proxy_allowed +from mama_cas.services import proxy_callback_allowed from mama_cas.utils import add_query_params from mama_cas.utils import clean_service_url -from mama_cas.utils import get_config from mama_cas.utils import is_scheme_https -from mama_cas.utils import is_valid_service -from mama_cas.utils import is_valid_proxy_callback from mama_cas.utils import match_service if gevent: @@ -104,7 +106,7 @@ def validate_ticket(self, ticket, service, renew=False, require_https=False): if require_https and not is_scheme_https(service): raise InvalidService("Service %s is not HTTPS" % service) - if not is_valid_service(service): + if not service_allowed(service): raise InvalidService("Service %s is not a valid %s URL" % (service, t.name)) @@ -266,10 +268,10 @@ def request_sign_out(self): Send a POST request to the ``ServiceTicket``s logout URL to request sign-out. """ - if not get_config(self.service, 'LOGOUT_ALLOW'): + if not logout_allowed(self.service): return request = SingleSignOutRequest(context={'ticket': self}) - url = get_config(self.service, 'LOGOUT_URL') or self.service + url = get_logout_url(self.service) or self.service try: resp = requests.post(url, data={'logoutRequest': request.render_content()}) resp.raise_for_status() @@ -319,13 +321,13 @@ def create_ticket(self, service, pgturl, **kwargs): def validate_callback(self, service, pgturl, pgtid, pgtiou): """Verify the provided proxy callback URL.""" - if not get_config(service, 'PROXY_ALLOW'): + if not proxy_allowed(service): raise UnauthorizedServiceProxy("%s is not authorized to use proxy authentication" % service) if not is_scheme_https(pgturl): raise InvalidProxyCallback("Proxy callback %s is not HTTPS" % pgturl) - if not is_valid_proxy_callback(service, pgturl): + if not proxy_callback_allowed(service, pgturl): raise InvalidProxyCallback("%s is not an authorized proxy callback URL" % pgturl) # Check the proxy callback URL and SSL certificate diff --git a/mama_cas/services/__init__.py b/mama_cas/services/__init__.py new file mode 100644 index 0000000..430b071 --- /dev/null +++ b/mama_cas/services/__init__.py @@ -0,0 +1,52 @@ +from django.utils.module_loading import import_string + + +def _get_backends(): + backends = [] + for backend_path in ['mama_cas.services.backends.SettingsBackend']: + backend = import_string(backend_path)() + backends.append(backend) + return backends + + +def _is_allowed(attr, *args): + for backend in _get_backends(): + try: + if getattr(backend, attr)(*args): + return True + except AttributeError: + continue + return False + + +def get_callbacks(service): + for backend in _get_backends(): + callbacks = backend.get_callbacks(service) + if callbacks: + # TODO merge callback dicts? + return callbacks + return [] + + +def get_logout_url(service): + for backend in _get_backends(): + logout_url = backend.get_logout_url(service) + if logout_url: + return logout_url + return None + + +def logout_allowed(service): + return _is_allowed('logout_allowed', service) + + +def proxy_allowed(service): + return _is_allowed('proxy_allowed', service) + + +def proxy_callback_allowed(service, pgturl): + return _is_allowed('proxy_callback_allowed', service, pgturl) + + +def service_allowed(service): + return _is_allowed('service_allowed', service) diff --git a/mama_cas/services/backends.py b/mama_cas/services/backends.py new file mode 100644 index 0000000..3a31603 --- /dev/null +++ b/mama_cas/services/backends.py @@ -0,0 +1,99 @@ +import re +import warnings + +from django.conf import settings +from django.core.exceptions import ImproperlyConfigured +from django.utils import six +from django.utils.functional import cached_property + + +class ServiceConfig(object): + PROXY_ALLOW_DEFAULT = False + CALLBACKS_DEFAULT = [] + LOGOUT_ALLOW_DEFAULT = False + LOGOUT_URL_DEFAULT = None + + @cached_property + def services(self): + services = [] + + for service in getattr(settings, 'MAMA_CAS_VALID_SERVICES', []): + if isinstance(service, six.string_types): + warnings.warn( + 'Service URL configuration is changing. Check the documentation ' + 'for the MAMA_CAS_VALID_SERVICES setting.', DeprecationWarning) + match = re.compile(service) + service = {'SERVICE': service} + else: + service = service.copy() + try: + match = re.compile(service['SERVICE']) + except KeyError: + raise ImproperlyConfigured( + 'Missing SERVICE key for service configuration. ' + 'Check your MAMA_CAS_VALID_SERVICES setting.') + + service['MATCH'] = match + # TODO For transitional backwards compatibility, this defaults to True. + service.setdefault('PROXY_ALLOW', True) + service.setdefault('CALLBACKS', self.CALLBACKS_DEFAULT) + service.setdefault('LOGOUT_ALLOW', self.LOGOUT_ALLOW_DEFAULT) + service.setdefault('LOGOUT_URL', self.LOGOUT_URL_DEFAULT) + try: + service['PROXY_PATTERN'] = re.compile(service['PROXY_PATTERN']) + except KeyError: + pass + services.append(service) + + return services + + def get_service(self, s): + for service in self.services: + if service['MATCH'].match(s): + return service + return {} + + def get_config(self, service, setting): + """Access the configuration for a given service and setting.""" + try: + return self.get_service(service)[setting] + except KeyError: + return getattr(self, setting + '_DEFAULT') + + def is_valid(self, s): + if not self.services: + return True + return bool(self.get_service(s)) + + +services = ServiceConfig() + + +class SettingsBackend(object): + def get_callbacks(self, service): + return services.get_config(service, 'CALLBACKS') + + def get_logout_url(self, service): + return services.get_config(service, 'LOGOUT_URL') + + def logout_allowed(self, service): + return services.get_config(service, 'LOGOUT_ALLOW') + + def proxy_allowed(self, service): + return services.get_config(service, 'PROXY_ALLOW') + + def proxy_callback_allowed(self, service, pgturl): + """ + """ + try: + return services.get_config(service, 'PROXY_PATTERN').match(pgturl) + except AttributeError: + # TODO For transitional backwards compatibility, check against valid services + return self.service_allowed(pgturl) + + def service_allowed(self, service): + """ + """ + if not service: + return False + return services.is_valid(service) diff --git a/mama_cas/tests/test_services.py b/mama_cas/tests/test_services.py new file mode 100644 index 0000000..092c247 --- /dev/null +++ b/mama_cas/tests/test_services.py @@ -0,0 +1,115 @@ +from django.core.exceptions import ImproperlyConfigured +from django.test import TestCase +from django.test.utils import modify_settings +from django.test.utils import override_settings + +from mama_cas.services import get_callbacks +from mama_cas.services import get_logout_url +from mama_cas.services import logout_allowed +from mama_cas.services import proxy_allowed +from mama_cas.services import proxy_callback_allowed +from mama_cas.services import service_allowed +from mama_cas.services.backends import services as cached_services + + +class UtilsTests(TestCase): + def tearDown(self): + try: + # Remove cached property so the valid services + # setting can be changed per-test + del cached_services.services + except AttributeError: + pass + + def test_get_callbacks(self): + """ + When callbacks are configured, ``get_callbacks()`` should return + a list of the configured callbacks and an empty list otherwise. + """ + self.assertEqual(get_callbacks('http://www.example.com'), ['mama_cas.callbacks.user_name_attributes']) + self.assertEqual(get_callbacks('http://example.org'), []) + + def test_get_logout_url(self): + """ + When a logout URL is configured, ``get_logout_url()`` should return + the URL and ``None`` otherwise. + """ + self.assertEqual(get_logout_url('http://www.example.com'), 'https://example.com/logout') + self.assertIsNone(get_logout_url('http://example.org')) + + def test_logout_allowed(self): + """ + When logout behavior is enabled, ``logout_allowed()`` should + return ``True`` and ``False`` otherwise. + """ + self.assertTrue(logout_allowed('http://www.example.com')) + self.assertFalse(logout_allowed('http://example.com')) + self.assertFalse(logout_allowed('http://www.example.org')) + + @modify_settings(MAMA_CAS_VALID_SERVICES={ + 'append': [{'SERVICE': 'http://example\.org/proxy'}] + }) + def test_proxy_allowed(self): + """ + When proxy behavior is enabled, ``proxy_allowed()`` should + return ``True`` and ``False`` otherwise. If it is not + configured at all, ``True`` should be returned. + """ + self.assertTrue(proxy_allowed('http://www.example.com')) + self.assertTrue(proxy_allowed('http://example.org/proxy')) + self.assertFalse(proxy_allowed('http://example.com')) + self.assertFalse(proxy_allowed('http://www.example.org')) + + def test_proxy_callback_allowed(self): + """ + When a proxy callback is configured, ``proxy_callback_allowed()`` + should return ``True`` if the pgturl matches the pattern and + ``False`` otherwise. + """ + self.assertTrue(proxy_callback_allowed('https://www.example.com', 'https://www.example.com')) + self.assertTrue(proxy_callback_allowed('http://example.org', 'https://www.example.com')) + self.assertFalse(proxy_callback_allowed('http://example.org', 'http://example.org')) + + @override_settings(MAMA_CAS_VALID_SERVICES=('http://.*\.example\.com',)) + def test_service_allowed_tuple(self): + """ + When valid services are configured, ``service_allowed()`` + should return ``True`` if the provided URL matches, and + ``False`` otherwise. + """ + self.assertTrue(service_allowed('http://www.example.com')) + self.assertFalse(service_allowed('http://www.example.org')) + + def test_service_allowed(self): + """ + When valid services are configured, ``service_allowed()`` + should return ``True`` if the provided URL matches, and + ``False`` otherwise. + """ + self.assertTrue(service_allowed('http://www.example.com')) + self.assertFalse(service_allowed('http://www.example.org')) + + @override_settings(MAMA_CAS_VALID_SERVICES=()) + def test_empty_valid_services_tuple(self): + """ + When no valid services are configured, + ``service_allowed()`` should return ``True``. + """ + self.assertTrue(service_allowed('http://www.example.com')) + + @override_settings(MAMA_CAS_VALID_SERVICES=[]) + def test_empty_valid_services(self): + """ + When no valid services are configured, + ``service_allowed()`` should return ``True``. + """ + self.assertTrue(service_allowed('http://www.example.com')) + + @override_settings(MAMA_CAS_VALID_SERVICES=[{}]) + def test_invalid_valid_services(self): + """ + When invalid services are configured, ``service_allowed()`` + should raise ``ImproperlyConfigured``. + """ + with self.assertRaises(ImproperlyConfigured): + service_allowed('http://www.example.com') diff --git a/mama_cas/tests/test_utils.py b/mama_cas/tests/test_utils.py index 2e4860d..db107c1 100644 --- a/mama_cas/tests/test_utils.py +++ b/mama_cas/tests/test_utils.py @@ -1,31 +1,16 @@ # -*- coding: utf-8 -*- -from django.core.exceptions import ImproperlyConfigured from django.test import TestCase -from django.test.utils import modify_settings -from django.test.utils import override_settings -from mama_cas.utils import services as service_config from mama_cas.utils import add_query_params -from mama_cas.utils import get_config from mama_cas.utils import clean_service_url from mama_cas.utils import is_scheme_https -from mama_cas.utils import is_valid_proxy_callback -from mama_cas.utils import is_valid_service from mama_cas.utils import match_service from mama_cas.utils import redirect from mama_cas.utils import to_bool class UtilsTests(TestCase): - def tearDown(self): - try: - # Remove cached property so the valid services - # setting can be changed per-test - del service_config.services - except AttributeError: - pass - def test_add_query_params(self): """ When called with a URL and a dict of parameters, @@ -80,72 +65,6 @@ def test_match_service(self): self.assertFalse(match_service('https://www.example.com:80/', 'https://www.example.com/')) self.assertFalse(match_service('https://www.example.com', 'https://www.example.com/')) - @override_settings(MAMA_CAS_VALID_SERVICES=('http://.*\.example\.com',)) - def test_is_valid_service_tuple(self): - """ - When valid services are configured, ``is_valid_service()`` - should return ``True`` if the provided URL matches, and - ``False`` otherwise. - """ - self.assertTrue(is_valid_service('http://www.example.com')) - self.assertFalse(is_valid_service('http://www.example.org')) - - def test_is_valid_service(self): - """ - When valid services are configured, ``is_valid_service()`` - should return ``True`` if the provided URL matches, and - ``False`` otherwise. - """ - self.assertTrue(is_valid_service('http://www.example.com')) - self.assertFalse(is_valid_service('http://www.example.org')) - - @override_settings(MAMA_CAS_VALID_SERVICES=()) - def test_empty_valid_services_tuple(self): - """ - When no valid services are configured, - ``is_valid_service()`` should return ``True``. - """ - self.assertTrue(is_valid_service('http://www.example.com')) - - @override_settings(MAMA_CAS_VALID_SERVICES=[]) - def test_empty_valid_services(self): - """ - When no valid services are configured, - ``is_valid_service()`` should return ``True``. - """ - self.assertTrue(is_valid_service('http://www.example.com')) - - @override_settings(MAMA_CAS_VALID_SERVICES=[{}]) - def test_invalid_valid_services(self): - """ - When invalid services are configured, ``is_valid_service`` - should raise ``ImproperlyConfigured``. - """ - with self.assertRaises(ImproperlyConfigured): - is_valid_service('http://www.example.com') - - @modify_settings(MAMA_CAS_VALID_SERVICES={ - 'append': [{'SERVICE': 'http://example\.com/proxy', 'PROXY_ALLOW': False}] - }) - def test_get_config(self): - """ - """ - self.assertTrue(get_config('http://www.example.com', 'PROXY_ALLOW')) - self.assertFalse(get_config('http://example.com/proxy', 'PROXY_ALLOW')) - self.assertFalse(get_config('http://example.org', 'PROXY_ALLOW')) - self.assertEqual(get_config('http://www.example.com', 'CALLBACKS'), - ['mama_cas.callbacks.user_name_attributes']) - self.assertEqual(get_config('http://example.org', 'CALLBACKS'), []) - - def test_is_valid_proxy_callback(self): - """ - When a valid pgturl is provided, `is_valid_proxy_callback()` - should return `True`, otherwise it should return `False`. - """ - self.assertTrue(is_valid_proxy_callback('https://www.example.com', 'https://www.example.com')) - self.assertTrue(is_valid_proxy_callback('http://example.org', 'https://www.example.com')) - self.assertFalse(is_valid_proxy_callback('http://example.org', 'http://example.org')) - def test_redirect(self): """ When redirecting, params should be injected on the redirection diff --git a/mama_cas/utils.py b/mama_cas/utils.py index df12fb0..4bd19b1 100644 --- a/mama_cas/utils.py +++ b/mama_cas/utils.py @@ -1,88 +1,21 @@ import logging -import re -import warnings from django.conf import settings -from django.core.exceptions import ImproperlyConfigured from django.core.exceptions import PermissionDenied from django.core import urlresolvers from django.http import HttpResponseRedirect -from django.utils import six from django.utils.encoding import force_bytes -from django.utils.functional import cached_property -from .compat import parse_qsl -from .compat import urlencode -from .compat import urlparse -from .compat import urlunparse +from mama_cas.compat import parse_qsl +from mama_cas.compat import urlencode +from mama_cas.compat import urlparse +from mama_cas.compat import urlunparse +from mama_cas.services import service_allowed logger = logging.getLogger(__name__) -class ServiceConfig(object): - PROXY_ALLOW_DEFAULT = False - CALLBACKS_DEFAULT = [] - LOGOUT_ALLOW_DEFAULT = False - LOGOUT_URL_DEFAULT = None - - @cached_property - def services(self): - services = [] - - for service in getattr(settings, 'MAMA_CAS_VALID_SERVICES', []): - if isinstance(service, six.string_types): - warnings.warn( - 'Service URL configuration is changing. Check the documentation ' - 'for the MAMA_CAS_VALID_SERVICES setting.', DeprecationWarning) - match = re.compile(service) - service = {'SERVICE': service} - else: - service = service.copy() - try: - match = re.compile(service['SERVICE']) - except KeyError: - raise ImproperlyConfigured( - 'Missing SERVICE key for service configuration. ' - 'Check your MAMA_CAS_VALID_SERVICES setting.') - - service['MATCH'] = match - # TODO For transitional backwards compatibility, this defaults to True. - service.setdefault('PROXY_ALLOW', True) - service.setdefault('CALLBACKS', self.CALLBACKS_DEFAULT) - service.setdefault('LOGOUT_ALLOW', self.LOGOUT_ALLOW_DEFAULT) - service.setdefault('LOGOUT_URL', self.LOGOUT_URL_DEFAULT) - try: - service['PROXY_PATTERN'] = re.compile(service['PROXY_PATTERN']) - except KeyError: - pass - services.append(service) - - return services - - def get_service(self, s): - for service in self.services: - if service['MATCH'].match(s): - return service - return {} - - def is_valid(self, s): - if not self.services: - return True - return bool(self.get_service(s)) - - -services = ServiceConfig() - - -def get_config(service, setting): - """Access the configuration for a given service and setting.""" - try: - return services.get_service(service)[setting] - except KeyError: - return getattr(services, setting + '_DEFAULT') - - def add_query_params(url, params): """ Inject additional query parameters into an existing URL. If @@ -130,30 +63,6 @@ def match_service(service1, service2): return False -def is_valid_service(service): - """ - Check the provided service against the configured list of valid - services. - """ - if not service: - return False - return services.is_valid(service) - - -def is_valid_proxy_callback(service, pgturl): - """ - Check the provided proxy callback against the configured allowable - callback pattern. If no pattern is configured, return `True`. - """ - try: - return get_config(service, 'PROXY_PATTERN').match(pgturl) - except AttributeError: - # TODO For transitional backwards compatibility, check against valid services - if is_valid_service(pgturl): - return True - return False - - def redirect(to, *args, **kwargs): """ Similar to the Django ``redirect`` shortcut but with altered @@ -168,7 +77,7 @@ def redirect(to, *args, **kwargs): except urlresolvers.NoReverseMatch: if '/' not in to and '.' not in to: to = urlresolvers.reverse('cas_login') - elif not is_valid_service(to): + elif not service_allowed(to): raise PermissionDenied() if params: diff --git a/mama_cas/views.py b/mama_cas/views.py index 6795c4a..6f2dc2d 100644 --- a/mama_cas/views.py +++ b/mama_cas/views.py @@ -25,9 +25,9 @@ from mama_cas.response import ValidationResponse from mama_cas.response import ProxyResponse from mama_cas.response import SamlValidationResponse +from mama_cas.services import service_allowed from mama_cas.utils import add_query_params from mama_cas.utils import clean_service_url -from mama_cas.utils import is_valid_service from mama_cas.utils import redirect from mama_cas.utils import to_bool @@ -146,7 +146,7 @@ def get(self, request, *args, **kwargs): service = request.GET.get('service') ticket = request.GET.get('ticket') - if not is_valid_service(service): + if not service_allowed(service): return redirect('cas_login') msg = _("Do you want to access %(service)s as %(user)s?") % {