From d17d3ea81beff69871e5d9faa06e6bdaf233c58b Mon Sep 17 00:00:00 2001 From: Federico Frenguelli Date: Thu, 15 Jan 2015 17:58:04 +0100 Subject: [PATCH] added support for querystring in redirection uris --- oauth2_provider/compat.py | 4 +- oauth2_provider/models.py | 18 ++- .../tests/test_authorization_code.py | 142 ++++++++++++++++++ oauth2_provider/tests/test_implicit.py | 40 +++++ 4 files changed, 200 insertions(+), 4 deletions(-) diff --git a/oauth2_provider/compat.py b/oauth2_provider/compat.py index e82e4ef97..86acb03e3 100644 --- a/oauth2_provider/compat.py +++ b/oauth2_provider/compat.py @@ -10,9 +10,9 @@ # urlparse in python3 has been renamed to urllib.parse try: - from urlparse import urlparse, parse_qs, urlunparse + from urlparse import urlparse, parse_qs, parse_qsl, urlunparse except ImportError: - from urllib.parse import urlparse, parse_qs, urlunparse + from urllib.parse import urlparse, parse_qs, parse_qsl, urlunparse try: from urllib import urlencode, unquote_plus diff --git a/oauth2_provider/models.py b/oauth2_provider/models.py index 688d64834..261276ecb 100644 --- a/oauth2_provider/models.py +++ b/oauth2_provider/models.py @@ -14,7 +14,7 @@ from django.core.exceptions import ImproperlyConfigured from .settings import oauth2_settings -from .compat import AUTH_USER_MODEL +from .compat import AUTH_USER_MODEL, parse_qsl, urlparse from .generators import generate_client_secret, generate_client_id from .validators import validate_uris @@ -94,7 +94,21 @@ def redirect_uri_allowed(self, uri): :param uri: Url to check """ - return uri in self.redirect_uris.split() + for allowed_uri in self.redirect_uris.split(): + parsed_allowed_uri = urlparse(allowed_uri) + parsed_uri = urlparse(uri) + + if (parsed_allowed_uri.scheme == parsed_uri.scheme and + parsed_allowed_uri.netloc == parsed_uri.netloc and + parsed_allowed_uri.path == parsed_uri.path): + + aqs_set = set(parse_qsl(parsed_allowed_uri.query)) + uqs_set = set(parse_qsl(parsed_uri.query)) + + if aqs_set.issubset(uqs_set): + return True + + return False def clean(self): from django.core.exceptions import ValidationError diff --git a/oauth2_provider/tests/test_authorization_code.py b/oauth2_provider/tests/test_authorization_code.py index af2d94aa7..9277226e7 100644 --- a/oauth2_provider/tests/test_authorization_code.py +++ b/oauth2_provider/tests/test_authorization_code.py @@ -403,6 +403,46 @@ def test_code_post_auth_deny_custom_redirect_uri_scheme(self): self.assertIn('custom-scheme://example.com?', response['Location']) self.assertIn("error=access_denied", response['Location']) + def test_code_post_auth_redirection_uri_with_querystring(self): + """ + Tests that a redirection uri with query string is allowed + and query string is retained on redirection. + See http://tools.ietf.org/html/rfc6749#section-3.1.2 + """ + self.client.login(username="test_user", password="123456") + + form_data = { + 'client_id': self.application.client_id, + 'state': 'random_state_string', + 'scope': 'read write', + 'redirect_uri': 'http://example.com?foo=bar', + 'response_type': 'code', + 'allow': True, + } + + response = self.client.post(reverse('oauth2_provider:authorize'), data=form_data) + self.assertEqual(response.status_code, 302) + self.assertIn("http://example.com?foo=bar", response['Location']) + self.assertIn("code=", response['Location']) + + def test_code_post_auth_fails_when_redirect_uri_path_is_invalid(self): + """ + Tests that a redirection uri is matched using scheme + netloc + path + """ + self.client.login(username="test_user", password="123456") + + form_data = { + 'client_id': self.application.client_id, + 'state': 'random_state_string', + 'scope': 'read write', + 'redirect_uri': 'http://example.com/a?foo=bar', + 'response_type': 'code', + 'allow': True, + } + + response = self.client.post(reverse('oauth2_provider:authorize'), data=form_data) + self.assertEqual(response.status_code, 400) + class TestAuthorizationCodeTokenView(BaseTest): def get_auth(self): @@ -759,6 +799,108 @@ def test_malicious_redirect_uri(self): response = self.client.post(reverse('oauth2_provider:token'), data=token_request_data) self.assertEqual(response.status_code, 401) + def test_code_exchange_succeed_when_redirect_uri_match(self): + """ + Tests code exchange succeed when redirect uri matches the one used for code request + """ + self.client.login(username="test_user", password="123456") + + # retrieve a valid authorization code + authcode_data = { + 'client_id': self.application.client_id, + 'state': 'random_state_string', + 'scope': 'read write', + 'redirect_uri': 'http://example.it?foo=bar', + 'response_type': 'code', + 'allow': True, + } + response = self.client.post(reverse('oauth2_provider:authorize'), data=authcode_data) + query_dict = parse_qs(urlparse(response['Location']).query) + authorization_code = query_dict['code'].pop() + + # exchange authorization code for a valid access token + token_request_data = { + 'grant_type': 'authorization_code', + 'code': authorization_code, + 'redirect_uri': 'http://example.it?foo=bar' + } + auth_headers = self.get_basic_auth_header(self.application.client_id, self.application.client_secret) + + response = self.client.post(reverse('oauth2_provider:token'), data=token_request_data, **auth_headers) + self.assertEqual(response.status_code, 200) + + content = json.loads(response.content.decode("utf-8")) + self.assertEqual(content['token_type'], "Bearer") + self.assertEqual(content['scope'], "read write") + self.assertEqual(content['expires_in'], oauth2_settings.ACCESS_TOKEN_EXPIRE_SECONDS) + + def test_code_exchange_fails_when_redirect_uri_does_not_match(self): + """ + Tests code exchange fails when redirect uri does not match the one used for code request + """ + self.client.login(username="test_user", password="123456") + + # retrieve a valid authorization code + authcode_data = { + 'client_id': self.application.client_id, + 'state': 'random_state_string', + 'scope': 'read write', + 'redirect_uri': 'http://example.it?foo=bar', + 'response_type': 'code', + 'allow': True, + } + response = self.client.post(reverse('oauth2_provider:authorize'), data=authcode_data) + query_dict = parse_qs(urlparse(response['Location']).query) + authorization_code = query_dict['code'].pop() + + # exchange authorization code for a valid access token + token_request_data = { + 'grant_type': 'authorization_code', + 'code': authorization_code, + 'redirect_uri': 'http://example.it?foo=baraa' + } + auth_headers = self.get_basic_auth_header(self.application.client_id, self.application.client_secret) + + response = self.client.post(reverse('oauth2_provider:token'), data=token_request_data, **auth_headers) + self.assertEqual(response.status_code, 401) + + def test_code_exchange_succeed_when_redirect_uri_match_with_multiple_query_params(self): + """ + Tests code exchange succeed when redirect uri matches the one used for code request + """ + self.client.login(username="test_user", password="123456") + self.application.redirect_uris = "http://localhost http://example.com?foo=bar" + self.application.save() + + # retrieve a valid authorization code + authcode_data = { + 'client_id': self.application.client_id, + 'state': 'random_state_string', + 'scope': 'read write', + 'redirect_uri': 'http://example.com?bar=baz&foo=bar', + 'response_type': 'code', + 'allow': True, + } + response = self.client.post(reverse('oauth2_provider:authorize'), data=authcode_data) + query_dict = parse_qs(urlparse(response['Location']).query) + authorization_code = query_dict['code'].pop() + + # exchange authorization code for a valid access token + token_request_data = { + 'grant_type': 'authorization_code', + 'code': authorization_code, + 'redirect_uri': 'http://example.com?bar=baz&foo=bar' + } + auth_headers = self.get_basic_auth_header(self.application.client_id, self.application.client_secret) + + response = self.client.post(reverse('oauth2_provider:token'), data=token_request_data, **auth_headers) + self.assertEqual(response.status_code, 200) + + content = json.loads(response.content.decode("utf-8")) + self.assertEqual(content['token_type'], "Bearer") + self.assertEqual(content['scope'], "read write") + self.assertEqual(content['expires_in'], oauth2_settings.ACCESS_TOKEN_EXPIRE_SECONDS) + class TestAuthorizationCodeProtectedResource(BaseTest): def test_resource_access_allowed(self): diff --git a/oauth2_provider/tests/test_implicit.py b/oauth2_provider/tests/test_implicit.py index 936e96674..9293616d7 100644 --- a/oauth2_provider/tests/test_implicit.py +++ b/oauth2_provider/tests/test_implicit.py @@ -185,6 +185,46 @@ def test_token_post_auth_deny(self): self.assertEqual(response.status_code, 302) self.assertIn("error=access_denied", response['Location']) + def test_implicit_redirection_uri_with_querystring(self): + """ + Tests that a redirection uri with query string is allowed + and query string is retained on redirection. + See http://tools.ietf.org/html/rfc6749#section-3.1.2 + """ + self.client.login(username="test_user", password="123456") + + form_data = { + 'client_id': self.application.client_id, + 'state': 'random_state_string', + 'scope': 'read write', + 'redirect_uri': 'http://example.com?foo=bar', + 'response_type': 'token', + 'allow': True, + } + + response = self.client.post(reverse('oauth2_provider:authorize'), data=form_data) + self.assertEqual(response.status_code, 302) + self.assertIn("http://example.com?foo=bar", response['Location']) + self.assertIn("access_token=", response['Location']) + + def test_implicit_fails_when_redirect_uri_path_is_invalid(self): + """ + Tests that a redirection uri is matched using scheme + netloc + path + """ + self.client.login(username="test_user", password="123456") + + form_data = { + 'client_id': self.application.client_id, + 'state': 'random_state_string', + 'scope': 'read write', + 'redirect_uri': 'http://example.com/a?foo=bar', + 'response_type': 'code', + 'allow': True, + } + + response = self.client.post(reverse('oauth2_provider:authorize'), data=form_data) + self.assertEqual(response.status_code, 400) + class TestImplicitTokenView(BaseTest): def test_resource_access_allowed(self):