Skip to content

Commit

Permalink
Support token auth with custom header in MultiAuth class (Fixes #125)
Browse files Browse the repository at this point in the history
  • Loading branch information
miguelgrinberg committed May 1, 2021
1 parent ccf98aa commit 6509081
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 16 deletions.
33 changes: 18 additions & 15 deletions flask_httpauth.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,18 @@ def default_auth_error(status):
self.get_password(default_get_password)
self.error_handler(default_auth_error)

def is_compatible_auth(self, headers):
if self.header is None or self.header == 'Authorization':
try:
scheme, _ = request.headers.get('Authorization', '').split(
None, 1)
except ValueError:
# malformed Authorization header
return False
return scheme == self.scheme
else:
return self.header in headers

def get_password(self, f):
self.get_password_callback = f
return f
Expand Down Expand Up @@ -368,21 +380,12 @@ def login_required(self, f=None, role=None, optional=None):
def login_required_internal(f):
@wraps(f)
def decorated(*args, **kwargs):
selected_auth = None
if 'Authorization' in request.headers:
try:
scheme, creds = request.headers[
'Authorization'].split(None, 1)
except ValueError:
# malformed Authorization header
pass
else:
for auth in self.additional_auth:
if auth.scheme == scheme:
selected_auth = auth
break
if selected_auth is None:
selected_auth = self.main_auth
selected_auth = self.main_auth
if not self.main_auth.is_compatible_auth(request.headers):
for auth in self.additional_auth:
if auth.is_compatible_auth(request.headers):
selected_auth = auth
break
return selected_auth.login_required(role=role,
optional=optional
)(f)(*args, **kwargs)
Expand Down
32 changes: 31 additions & 1 deletion tests/test_multi.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ def setUp(self):

basic_auth = HTTPBasicAuth()
token_auth = HTTPTokenAuth('MyToken')
multi_auth = MultiAuth(basic_auth, token_auth)
custom_token_auth = HTTPTokenAuth(header='X-Token')
multi_auth = MultiAuth(basic_auth, token_auth, custom_token_auth)

@basic_auth.verify_password
def verify_password(username, password):
Expand All @@ -37,6 +38,16 @@ def get_token_role(auth):
def error_handler():
return 'error', 401, {'WWW-Authenticate': 'MyToken realm="Foo"'}

@custom_token_auth.verify_token
def verify_custom_token(token):
return token == 'this-is-the-custom-token!'

@custom_token_auth.get_user_roles
def get_custom_token_role(auth):
if auth['token'] == 'this-is-the-custom-token!':
return 'foo'
return

@app.route('/')
def index():
return 'index'
Expand Down Expand Up @@ -91,6 +102,19 @@ def test_multi_auth_login_invalid_token(self):
self.assertEqual(response.headers['WWW-Authenticate'],
'MyToken realm="Foo"')

def test_multi_auth_login_valid_custom_token(self):
response = self.client.get(
'/protected', headers={'X-Token': 'this-is-the-custom-token!'})
self.assertEqual(response.data.decode('utf-8'), 'access granted:None')

def test_multi_auth_login_invalid_custom_token(self):
response = self.client.get(
'/protected', headers={'X-Token': 'this-is-not-the-token!'})
self.assertEqual(response.status_code, 401)
self.assertTrue('WWW-Authenticate' in response.headers)
self.assertEqual(response.headers['WWW-Authenticate'],
'Bearer realm="Authentication Required"')

def test_multi_auth_login_invalid_scheme(self):
response = self.client.get(
'/protected', headers={'Authorization': 'Foo this-is-the-token!'})
Expand All @@ -116,3 +140,9 @@ def test_multi_auth_login_valid_token_role(self):
'/protected-with-role', headers={'Authorization':
'MyToken this-is-the-token!'})
self.assertEqual(response.data.decode('utf-8'), 'role access granted')

def test_multi_auth_login_valid_custom_token_role(self):
response = self.client.get(
'/protected-with-role', headers={'X-Token':
'this-is-the-custom-token!'})
self.assertEqual(response.data.decode('utf-8'), 'role access granted')

0 comments on commit 6509081

Please sign in to comment.