Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 57 additions & 1 deletion api/auth/authproviders.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import urllib
import urlparse

from xml.etree import ElementTree

from . import APIAuthProviderException, APIUnknownUserException, APIRefreshTokenException
from .. import config, util
from ..dao import dbutil
Expand Down Expand Up @@ -189,6 +191,7 @@ def set_user_avatar(self, uid, identity):
# If the user has no avatar set, mark their provider_avatar as their chosen avatar.
config.db.users.update_one({'_id': uid, 'avatar': {'$exists': False}}, {'$set':{'avatar': provider_avatar, 'modified': timestamp}})


class WechatOAuthProvider(AuthProvider):

def __init__(self):
Expand Down Expand Up @@ -278,6 +281,58 @@ def set_user_avatar(self, uid, identity): # pragma: no cover
pass


class CASAuthProvider(AuthProvider):

def __init__(self):
super(CASAuthProvider, self).__init__('cas')

def validate_code(self, code, **kwargs):
uid = self.validate_user(code)
return {
'access_token': code,
'uid': uid,
'auth_type': self.auth_type,
'expires': datetime.datetime.utcnow() + datetime.timedelta(days=14)
}

def validate_user(self, token):
service_url = config.get_item('site', 'redirect_url') + self.config['service_url_state']
r = requests.get(self.config['verify_endpoint'], params={'ticket': token, 'service': service_url})
if not r.ok:
raise APIAuthProviderException('User token not valid')

username = self._parse_xml_response(r.content)
uid = username+'@'+self.config['namespace']

self.ensure_user_exists(uid)
self.set_user_gravatar(uid, uid)

return uid

def _parse_xml_response(self, response):

# parse xml
tree = ElementTree.fromstring(response)

# check to see if xml response labeled request as success
# see also: xml parsing in https://github.com/python-cas/python-cas
if tree[0].tag.endswith('authenticationSuccess'):

try:
# get username from response
namespace = tree.tag[0:tree.tag.index('}')+1]
username = tree[0].find('.//' + namespace + 'user').text
except Exception as e: # pylint: disable=broad-except
config.log.warning(e)
raise APIAuthProviderException('Unable to parse response from CAS provider.')

else:
raise APIAuthProviderException('Ticket verification unsuccessful.')

return username



class APIKeyAuthProvider(AuthProvider):
"""
Uses an API key for authentication.
Expand Down Expand Up @@ -339,5 +394,6 @@ def validate_user_api_key(key):
'google' : GoogleOAuthProvider,
'ldap' : JWTAuthProvider,
'wechat' : WechatOAuthProvider,
'api-key' : APIKeyAuthProvider
'api-key' : APIKeyAuthProvider,
'cas' : CASAuthProvider
}
3 changes: 2 additions & 1 deletion api/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@
'redirect_url': 'https://localhost',
'central_url': 'https://sdmc.scitran.io/api',
'registered': False,
'ssl_cert': None
'ssl_cert': None,
'inactivity_timeout': None
},
'queue': {
'max_retries': 3,
Expand Down
21 changes: 21 additions & 0 deletions api/web/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,27 @@ def authenticate_user_token(self, session_token):
if cached_token:
self.request.logger.debug('looked up cached token in %dms', ((datetime.datetime.utcnow() - timestamp).total_seconds() * 1000.))

# Check if site has inactivity timeout
try:
inactivity_timeout = config.get_item('site', 'inactivity_timeout')
except KeyError:
inactivity_timeout = None

if inactivity_timeout:
last_seen = cached_token.get('last_seen')

# If now - last_seen is greater than inactivity timeout, clear out session
if last_seen and (timestamp - last_seen).total_seconds() > inactivity_timeout:

# Token expired and no refresh token, remove and deny request
config.db.authtokens.delete_one({'_id': cached_token['_id']})
config.db.refreshtokens.delete({'uid': cached_token['uid'], 'auth_type': cached_token['auth_type']})
self.abort(401, 'Inactivity timeout')

# set last_seen to now
config.db.authtokens.update_one({'_id': cached_token['_id']}, {'$set': {'last_seen': timestamp}})
Copy link
Contributor

@kofalt kofalt Sep 19, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would have assumed that the user's last_seen is always getting updated on every request. Are you sure it needs to be set here? If so, was it not being updated elsewhere?



# Check if token is expired
if cached_token.get('expires') and timestamp > cached_token['expires']:

Expand Down
1 change: 1 addition & 0 deletions sample.config
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#SCITRAN_CORE_DRONE_SECRET=""

#SCITRAN_SITE_ID=""
#SCITRAN_SITE_INACTIVITY_TIMEOUT=3600
#SCITRAN_SITE_NAME=""
#SCITRAN_SITE_URL=""
#SCITRAN_SITE_API_URL=""
Expand Down
89 changes: 89 additions & 0 deletions test/unit_tests/python/test_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,95 @@ def test_jwt_auth(config, as_drone, as_public, api_db):
api_db.users.delete_one({'_id': uid})


def test_cas_auth(config, as_drone, as_public, api_db):
# try to login w/ unconfigured auth provider
r = as_public.post('/login', json={'auth_type': 'cas', 'code': 'test'})
assert r.status_code == 400

# inject cas auth config
config['auth']['cas'] = dict(
service_url_state='?state=cas',
auth_endpoint='http://cas.test/cas/login',
verify_endpoint='http://cas.test/cas/serviceValidate',
namespace='cas.test',
display_string='CAS Auth')

username = 'cas'
uid = username+'@'+config.auth.cas.namespace

with requests_mock.Mocker() as m:
# try to log in w/ cas and invalid token (=code)
m.get(config.auth.cas.verify_endpoint, status_code=400)
r = as_public.post('/login', json={'auth_type': 'cas', 'code': 'test'})
assert r.status_code == 401

xml_response_unsuccessful = """
<cas:serviceResponse xmlns:cas='http://www.yale.edu/tp/cas'>
<cas:authenticationFailure>
</cas:authenticationFailure>
</cas:serviceResponse>
"""

# try to log in w/ cas - pretend provider doesn't return with success
m.get(config.auth.cas.verify_endpoint, content=xml_response_unsuccessful)
r = as_public.post('/login', json={'auth_type': 'cas', 'code': 'test'})
assert r.status_code == 401

xml_response_malformed = """
<cas:serviceResponse xmlns:cas='http://www.yale.edu/tp/cas'>
<cas:authenticationSuccess>
<cas:bad_key>cas</cas:bad_key>
</cas:authenticationSuccess>
</cas:serviceResponse>
"""

# try to log in w/ cas - pretend provider doesn't return valid username response
m.get(config.auth.cas.verify_endpoint, content=xml_response_malformed)
r = as_public.post('/login', json={'auth_type': 'cas', 'code': 'test'})
assert r.status_code == 401

xml_response_successful = """
<cas:serviceResponse xmlns:cas='http://www.yale.edu/tp/cas'>
<cas:authenticationSuccess>
<cas:user>cas</cas:user>
</cas:authenticationSuccess>
</cas:serviceResponse>
"""

# try to log in w/ cas - user not in db (yet)
m.get(config.auth.cas.verify_endpoint, content=xml_response_successful)
r = as_public.post('/login', json={'auth_type': 'cas', 'code': 'test'})
assert r.status_code == 402

# try to log in w/ cas - user added but disabled
assert as_drone.post('/users', json={
'_id': uid, 'disabled': True, 'firstname': 'test', 'lastname': 'test'}).ok
r = as_public.post('/login', json={'auth_type': 'cas', 'code': 'test'})
assert r.status_code == 402

# log in w/ cas (also mock gravatar 404)
m.head(re.compile('https://gravatar.com/avatar'), status_code=404)
as_drone.put('/users/' + uid, json={'disabled': False})
r = as_public.post('/login', json={'auth_type': 'cas', 'code': 'test'})
assert r.ok
assert 'gravatar' not in api_db.users.find_one({'_id': uid})['avatars']
token = r.json['token']

# access api w/ valid token
r = as_public.get('', headers={'Authorization': token})
assert r.ok

# log in w/ cas (now w/ existing gravatar)
m.head(re.compile('https://gravatar.com/avatar'))
r = as_public.post('/login', json={'auth_type': 'cas', 'code': 'test'})
assert r.ok
assert 'gravatar' in api_db.users.find_one({'_id': uid})['avatars']

# clean up
api_db.authtokens.delete_one({'_id': token})
api_db.users.delete_one({'_id': uid})


def test_google_auth(config, as_drone, as_public, api_db):
# inject google auth client_secret into config
config['auth']['google']['client_secret'] = 'test'
Expand Down