Skip to content

Commit

Permalink
Role authorization
Browse files Browse the repository at this point in the history
  • Loading branch information
gemerden authored and miguelgrinberg committed Apr 19, 2020
1 parent 35a0c45 commit 8178f6d
Show file tree
Hide file tree
Showing 4 changed files with 245 additions and 41 deletions.
128 changes: 89 additions & 39 deletions flask_httpauth.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ def __init__(self, scheme=None, realm=None):
self.scheme = scheme
self.realm = realm or "Authentication Required"
self.get_password_callback = None
self.get_user_roles_callback = None
self.auth_error_callback = None

def default_get_password(username):
Expand All @@ -38,6 +39,10 @@ def get_password(self, f):
self.get_password_callback = f
return f

def get_user_roles(self, f):
self.get_user_roles_callback = f
return f

def error_handler(self, f):
@wraps(f)
def decorated(*args, **kwargs):
Expand Down Expand Up @@ -85,25 +90,61 @@ def get_auth_password(self, auth):

return password

def login_required(self, f):
@wraps(f)
def decorated(*args, **kwargs):
auth = self.get_auth()

# Flask normally handles OPTIONS requests on its own, but in the
# case it is configured to forward those to the application, we
# need to ignore authentication headers and let the request through
# to avoid unwanted interactions with CORS.
if request.method != 'OPTIONS': # pragma: no cover
password = self.get_auth_password(auth)

if not self.authenticate(auth, password):
# Clear TCP receive buffer of any pending data
request.data
return self.auth_error_callback()

return f(*args, **kwargs)
return decorated
def authorize(self, role, user, auth):
if role is None:
return True
if isinstance(role, (list, tuple)):
roles = role
else:
roles = [role]
if user is True:
user = auth
if self.get_user_roles_callback is None: # pragma: no cover
raise ValueError('get_user_roles callback is not defined')
user_roles = self.get_user_roles_callback(user)
if user_roles is None:
user_roles = {}
elif not isinstance(user_roles, (list, tuple)):
user_roles = {user_roles}
else:
user_roles = set(user_roles)
for role in roles:
if isinstance(role, (list, tuple)):
role = set(role)
if role & user_roles == role:
return True
elif role in user_roles:
return True

def login_required(self, f=None, role=None):
if f is not None and role is not None: # pragma: no cover
raise ValueError('role is the only supported argument')

def login_required_internal(f):
@wraps(f)
def decorated(*args, **kwargs):
auth = self.get_auth()

# Flask normally handles OPTIONS requests on its own, but in
# the case it is configured to forward those to the
# application, we need to ignore authentication headers and
# let the request through to avoid unwanted interactions with
# CORS.
if request.method != 'OPTIONS': # pragma: no cover
password = self.get_auth_password(auth)

user = self.authenticate(auth, password)
if not user or not self.authorize(role, user, auth):
# Clear TCP receive buffer of any pending data
request.data
return self.auth_error_callback()

return f(*args, **kwargs)
return decorated

if f:
return login_required_internal(f)
return login_required_internal

def username(self):
if not request.authorization:
Expand Down Expand Up @@ -271,23 +312,32 @@ def __init__(self, main_auth, *args):
self.main_auth = main_auth
self.additional_auth = args

def login_required(self, f):
@wraps(f)
def decorated(*args, **kwargs):
selected_auth = None
if 'Authorization' in request.headers:
try:
scheme, creds = request.headers['Authorization'].split(
None, 1)
except ValueError:
# malformed Authorization header
pass
else:
for auth in self.additional_auth:
if auth.scheme == scheme:
selected_auth = auth
break
if selected_auth is None:
selected_auth = self.main_auth
return selected_auth.login_required(f)(*args, **kwargs)
return decorated
def login_required(self, f=None, role=None):
if f is not None and role is not None: # pragma: no cover
raise ValueError('role is the only supported argument')

def login_required_internal(f):
@wraps(f)
def decorated(*args, **kwargs):
selected_auth = None
if 'Authorization' in request.headers:
try:
scheme, creds = request.headers[
'Authorization'].split(None, 1)
except ValueError:
# malformed Authorization header
pass
else:
for auth in self.additional_auth:
if auth.scheme == scheme:
selected_auth = auth
break
if selected_auth is None:
selected_auth = self.main_auth
return selected_auth.login_required(role=role)(f)(
*args, **kwargs)
return decorated

if f:
return login_required_internal(f)
return login_required_internal
32 changes: 31 additions & 1 deletion tests/test_multi.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,24 @@ def setUp(self):

@basic_auth.verify_password
def verify_password(username, password):
return username == 'john' and password == 'hello'
if username == 'john' and password == 'hello':
return 'john'

@basic_auth.get_user_roles
def get_basic_role(username):
if username == 'john':
return ['foo', 'bar']

@token_auth.verify_token
def verify_token(token):
return token == 'this-is-the-token!'

@token_auth.get_user_roles
def get_token_role(auth):
if auth['token'] == 'this-is-the-token!':
return 'foo'
return

@token_auth.error_handler
def error_handler():
return 'error', 401, {'WWW-Authenticate': 'MyToken realm="Foo"'}
Expand All @@ -34,6 +46,11 @@ def index():
def auth_route():
return 'access granted'

@app.route('/protected-with-role')
@multi_auth.login_required(role='foo')
def auth_role_route():
return 'role access granted'

self.app = app
self.client = app.test_client()

Expand Down Expand Up @@ -86,3 +103,16 @@ def test_multi_malformed_header(self):
response = self.client.get(
'/protected', headers={'Authorization': 'token-without-scheme'})
self.assertEqual(response.status_code, 401)

def test_multi_auth_login_valid_basic_role(self):
creds = base64.b64encode(b'john:hello').decode('utf-8')
response = self.client.get(
'/protected-with-role', headers={'Authorization':
'Basic ' + creds})
self.assertEqual(response.data.decode('utf-8'), 'role access granted')

def test_multi_auth_login_valid_token_role(self):
response = self.client.get(
'/protected-with-role', headers={'Authorization':
'MyToken this-is-the-token!'})
self.assertEqual(response.data.decode('utf-8'), 'role access granted')
121 changes: 121 additions & 0 deletions tests/test_roles.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
import unittest
import base64
from flask import Flask, g
from flask_httpauth import HTTPBasicAuth


class HTTPAuthTestCase(unittest.TestCase):
def setUp(self):
app = Flask(__name__)
app.config['SECRET_KEY'] = 'my secret'

roles_auth = HTTPBasicAuth()

@roles_auth.verify_password
def roles_auth_verify_password(username, password):
g.anon = False
if username == 'john':
return password == 'hello'
elif username == 'susan':
return password == 'bye'
elif username == '':
g.anon = True
return True
return False

@roles_auth.get_user_roles
def get_user_roles(auth):
username = auth.username
if username == 'john':
return 'normal'
elif username == 'susan':
return ('normal', 'special')

@roles_auth.error_handler
def error_handler():
return 'error', 403 # use a custom error status

@app.route('/')
def index():
return 'index'

@app.route('/normal')
@roles_auth.login_required(role='normal')
def roles_auth_route_normal():
return 'normal:' + roles_auth.username()

@app.route('/special')
@roles_auth.login_required(role='special')
def roles_auth_route_special():
return 'special:' + roles_auth.username()

@app.route('/normal-or-special')
@roles_auth.login_required(role=('normal', 'special'))
def roles_auth_route_normal_or_special():
return 'normal_or_special:' + roles_auth.username()

@app.route('/normal-and-special')
@roles_auth.login_required(role=(('normal', 'special'),))
def roles_auth_route_normal_and_special():
return 'normal_and_special:' + roles_auth.username()

self.app = app
self.roles_auth = roles_auth
self.client = app.test_client()

def test_verify_roles_valid_normal_1(self):
creds = base64.b64encode(b'susan:bye').decode('utf-8')
response = self.client.get(
'/normal', headers={'Authorization': 'Basic ' + creds})
self.assertEqual(response.data, b'normal:susan')

def test_verify_roles_valid_normal_2(self):
creds = base64.b64encode(b'john:hello').decode('utf-8')
response = self.client.get(
'/normal', headers={'Authorization': 'Basic ' + creds})
self.assertEqual(response.data, b'normal:john')

def test_verify_auth_login_valid_special(self):
creds = base64.b64encode(b'susan:bye').decode('utf-8')
response = self.client.get(
'/special', headers={'Authorization': 'Basic ' + creds})
self.assertEqual(response.data, b'special:susan')

def test_verify_auth_login_invalid_special(self):
creds = base64.b64encode(b'john:hello').decode('utf-8')
response = self.client.get(
'/special', headers={'Authorization': 'Basic ' + creds})
self.assertEqual(response.status_code, 403)
self.assertTrue('WWW-Authenticate' in response.headers)

def test_verify_auth_login_valid_normal_or_special_1(self):
creds = base64.b64encode(b'susan:bye').decode('utf-8')
response = self.client.get(
'/normal-or-special', headers={'Authorization': 'Basic ' + creds})
self.assertEqual(response.data, b'normal_or_special:susan')

def test_verify_auth_login_valid_normal_or_special_2(self):
creds = base64.b64encode(b'john:hello').decode('utf-8')
response = self.client.get(
'/normal-or-special', headers={'Authorization': 'Basic ' + creds})
self.assertEqual(response.data, b'normal_or_special:john')

def test_verify_auth_login_valid_normal_and_special_1(self):
creds = base64.b64encode(b'susan:bye').decode('utf-8')
response = self.client.get(
'/normal-and-special', headers={'Authorization': 'Basic ' + creds})
self.assertEqual(response.data, b'normal_and_special:susan')

def test_verify_auth_login_valid_normal_and_special_2(self):
creds = base64.b64encode(b'john:hello').decode('utf-8')
response = self.client.get(
'/normal-and-special', headers={'Authorization': 'Basic ' + creds})
self.assertEqual(response.status_code, 403)
self.assertTrue('WWW-Authenticate' in response.headers)

def test_verify_auth_login_invalid_password(self):
creds = base64.b64encode(b'john:bye').decode('utf-8')
response = self.client.get(
'/normal', headers={'Authorization': 'Basic ' + creds})
self.assertEqual(response.status_code, 403)
self.assertTrue('WWW-Authenticate' in response.headers)
5 changes: 4 additions & 1 deletion tox.ini
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[tox]
envlist=flake8,py27,py35,py36,py37,pypy,pypy3,docs,coverage
envlist=flake8,py27,py35,py36,py37,py38,pypy,pypy3,docs,coverage
skip_missing_interpreters=True

[testenv]
Expand Down Expand Up @@ -29,6 +29,9 @@ basepython=python3.6
[testenv:py37]
basepython=python3.7

[testenv:py38]
basepython=python3.8

[testenv:pypy]
basepython=pypy

Expand Down

0 comments on commit 8178f6d

Please sign in to comment.