From 38c56b20185ed84989563283f5855f82ef6d91ae Mon Sep 17 00:00:00 2001 From: Mattia Procopio Date: Tue, 6 Feb 2018 15:47:38 +0100 Subject: [PATCH] Check that the Bearer header is properly formatted --- oauthlib/oauth2/rfc6749/tokens.py | 40 ++++++++------ tests/oauth2/rfc6749/test_tokens.py | 81 +++++++++++++++++++++++++++++ 2 files changed, 106 insertions(+), 15 deletions(-) diff --git a/oauthlib/oauth2/rfc6749/tokens.py b/oauthlib/oauth2/rfc6749/tokens.py index 4ae20e0e..a7491f47 100644 --- a/oauthlib/oauth2/rfc6749/tokens.py +++ b/oauthlib/oauth2/rfc6749/tokens.py @@ -220,6 +220,24 @@ def signed_token_generator(request): return signed_token_generator +def get_token_from_header(request): + """ + Helper function to extract a token from the request header. + :param request: The request object + :return: Return the token or None if the Authorization header is malformed. + """ + token = None + + if 'Authorization' in request.headers: + split_header = request.headers.get('Authorization').split() + if len(split_header) == 2 and split_header[0] == 'Bearer': + token = split_header[1] + else: + token = request.access_token + + return token + + class TokenBase(object): def __call__(self, request, refresh_token=False): @@ -286,16 +304,12 @@ def create_token(self, request, refresh_token=False, save_token=True): return token def validate_request(self, request): - token = None - if 'Authorization' in request.headers: - token = request.headers.get('Authorization')[7:] - else: - token = request.access_token + token = get_token_from_header(request) return self.request_validator.validate_bearer_token( token, request.scopes, request) def estimate_type(self, request): - if request.headers.get('Authorization', '').startswith('Bearer'): + if request.headers.get('Authorization', '').split(' ')[0] == 'Bearer': return 9 elif request.access_token is not None: return 5 @@ -331,17 +345,13 @@ def create_token(self, request, refresh_token=False, save_token=False): return self.request_validator.get_jwt_bearer_token(None, None, request) def validate_request(self, request): - token = None - if 'Authorization' in request.headers: - token = request.headers.get('Authorization')[7:] - else: - token = request.access_token + token = get_token_from_header(request) return self.request_validator.validate_jwt_bearer_token( token, request.scopes, request) def estimate_type(self, request): - token = request.headers.get('Authorization', '')[7:] - if token.startswith('ey') and token.count('.') in (2, 4): + split_header = request.headers.get('Authorization', '').split() + + if len(split_header) == 2 and split_header[0] == 'Bearer' and split_header[1].startswith('ey') and split_header[1].count('.') in (2, 4): return 10 - else: - return 0 + return 0 diff --git a/tests/oauth2/rfc6749/test_tokens.py b/tests/oauth2/rfc6749/test_tokens.py index 570afb01..ecac03e2 100644 --- a/tests/oauth2/rfc6749/test_tokens.py +++ b/tests/oauth2/rfc6749/test_tokens.py @@ -2,6 +2,7 @@ import mock +from oauthlib.common import Request from oauthlib.oauth2.rfc6749.tokens import * from ...unittest import TestCase @@ -61,9 +62,22 @@ class TokenTest(TestCase): bearer_headers = { 'Authorization': 'Bearer vF9dft4qmT' } + fake_bearer_headers = [ + {'Authorization': 'Beaver vF9dft4qmT'}, + {'Authorization': 'BeavervF9dft4qmT'}, + {'Authorization': 'Beaver vF9dft4qmT'}, + {'Authorization': 'BearerF9dft4qmT'}, + {'Authorization': 'Bearer vF9d ft4qmT'}, + ] + valid_header_with_multiple_spaces = {'Authorization': 'Bearer vF9dft4qmT'} bearer_body = 'access_token=vF9dft4qmT' bearer_uri = 'http://server.example.com/resource?access_token=vF9dft4qmT' + def _mocked_validate_bearer_token(self, token, scopes, request): + if not token: + return False + return True + def test_prepare_mac_header(self): """Verify mac signatures correctness @@ -83,8 +97,57 @@ def test_prepare_bearer_request(self): self.assertEqual(prepare_bearer_body(self.token), self.bearer_body) self.assertEqual(prepare_bearer_uri(self.token, uri=self.uri), self.bearer_uri) + def test_fake_bearer_is_not_validated(self): + request_validator = mock.MagicMock() + request_validator.validate_bearer_token = self._mocked_validate_bearer_token + + for fake_header in self.fake_bearer_headers: + request = Request('/', headers=fake_header) + result = BearerToken(request_validator=request_validator).validate_request(request) + + self.assertFalse(result) + + def test_header_with_multispaces_is_validated(self): + request_validator = mock.MagicMock() + request_validator.validate_bearer_token = self._mocked_validate_bearer_token + + request = Request('/', headers=self.valid_header_with_multiple_spaces) + result = BearerToken(request_validator=request_validator).validate_request(request) + + self.assertTrue(result) + + def test_estimate_type_with_fake_header_returns_type_0(self): + request_validator = mock.MagicMock() + request_validator.validate_bearer_token = self._mocked_validate_bearer_token + + for fake_header in self.fake_bearer_headers: + request = Request('/', headers=fake_header) + result = BearerToken(request_validator=request_validator).estimate_type(request) + + if fake_header['Authorization'].count(' ') == 2 and \ + fake_header['Authorization'].split()[0] == 'Bearer': + # If we're dealing with the header containing 2 spaces, it will be recognized + # as a Bearer valid header, the token itself will be invalid by the way. + self.assertEqual(result, 9) + else: + self.assertEqual(result, 0) + class JWTTokenTestCase(TestCase): + fake_bearer_headers = [ + {'Authorization': 'Beaver vF9dft4qmT'}, + {'Authorization': 'BeavervF9dft4qmT'}, + {'Authorization': 'Beaver vF9dft4qmT'}, + {'Authorization': 'BearerF9dft4qmT'}, + {'Authorization': 'Bearer vF9df t4qmT'}, + ] + + valid_header_with_multiple_spaces = {'Authorization': 'Bearer vF9dft4qmT'} + + def _mocked_validate_bearer_token(self, token, scopes, request): + if not token: + return False + return True def test_create_token_callable_expires_in(self): """ @@ -180,6 +243,24 @@ def test_validate_token_from_request(self): request.scopes, request) + def test_fake_bearer_is_not_validated(self): + request_validator = mock.MagicMock() + request_validator.validate_jwt_bearer_token = self._mocked_validate_bearer_token + + for fake_header in self.fake_bearer_headers: + request = Request('/', headers=fake_header) + result = JWTToken(request_validator=request_validator).validate_request(request) + + self.assertFalse(result) + + def test_header_with_multiple_spaces_is_validated(self): + request_validator = mock.MagicMock() + request_validator.validate_jwt_bearer_token = self._mocked_validate_bearer_token + request = Request('/', headers=self.valid_header_with_multiple_spaces) + result = JWTToken(request_validator=request_validator).validate_request(request) + + self.assertTrue(result) + def test_estimate_type(self): """ Estimate type results for a jwt token