Skip to content

Commit

Permalink
Refactor auth backend (#179)
Browse files Browse the repository at this point in the history
* Allow to verify token with other algorithms

* Split up authenticate() method

* Include upstream changes

* Processed PR feedback

* Improved get_userinfo docstring

* Introduced get_verify_key method

* Updated HISTORY.rst

* typo
  • Loading branch information
woutor authored and Peter Bengtsson committed May 9, 2018
1 parent 4e27396 commit 0bd1b2e
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 50 deletions.
1 change: 1 addition & 0 deletions HISTORY.rst
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ History
Thanks `@anlutro`_
* RS256 verification through ``settings.OIDC_OP_JWKS_ENDPOINT``
Thanks `@GermanoGuerrini`_
* Refactor OIDCAuthenticationBackend so that token retrieval methods can be overridden in a subclass when you need to.

Backwards-incompatible changes:

Expand Down
124 changes: 74 additions & 50 deletions mozilla_django_oidc/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,12 +171,36 @@ def verify_token(self, token, **kwargs):
# In Python3.6, the json.loads() function can accept a byte string
# as it will automagically decode it to a unicode string before
# deserializing https://bugs.python.org/issue17909
token_nonce = json.loads(verified_token.decode('utf-8')).get('nonce')
verified_id = json.loads(verified_token.decode('utf-8'))
token_nonce = verified_id.get('nonce')

if import_from_settings('OIDC_USE_NONCE', True) and nonce != token_nonce:
msg = 'JWT Nonce verification failed.'
raise SuspiciousOperation(msg)
return True
return verified_id

def get_token(self, payload):
"""Return token object as a dictionary."""

response = requests.post(
self.OIDC_OP_TOKEN_ENDPOINT,
data=payload,
verify=import_from_settings('OIDC_VERIFY_SSL', True))
response.raise_for_status()
return response.json()

def get_userinfo(self, access_token, id_token, verified_id):
"""Return user details dictionary. The id_token and verified_id are not used in
the default implementation, but may be used when overriding this method"""

user_response = requests.get(
self.OIDC_OP_USER_ENDPOINT,
headers={
'Authorization': 'Bearer {0}'.format(access_token)
},
verify=import_from_settings('OIDC_VERIFY_SSL', True))
user_response.raise_for_status()
return user_response.json()

def authenticate(self, **kwargs):
"""Authenticates a user based on the OIDC code flow."""
Expand All @@ -188,7 +212,6 @@ def authenticate(self, **kwargs):
state = self.request.GET.get('state')
code = self.request.GET.get('code')
nonce = kwargs.pop('nonce', None)
session = self.request.session

if not code or not state:
return None
Expand All @@ -208,57 +231,58 @@ def authenticate(self, **kwargs):
}

# Get the token
response = requests.post(self.OIDC_OP_TOKEN_ENDPOINT,
data=token_payload,
verify=import_from_settings('OIDC_VERIFY_SSL', True))
response.raise_for_status()
token_info = self.get_token(token_payload)
id_token = token_info.get('id_token')
access_token = token_info.get('access_token')

# Validate the token
token_response = response.json()
id_token = token_response.get('id_token')
if self.verify_token(id_token, nonce=nonce):
access_token = token_response.get('access_token')

if import_from_settings('OIDC_STORE_ACCESS_TOKEN', False):
session['oidc_access_token'] = access_token

if import_from_settings('OIDC_STORE_ID_TOKEN', False):
session['oidc_id_token'] = id_token

user_response = requests.get(self.OIDC_OP_USER_ENDPOINT,
headers={
'Authorization': 'Bearer {0}'.format(access_token)
},
verify=import_from_settings('OIDC_VERIFY_SSL', True))
user_response.raise_for_status()

user_info = user_response.json()
email = user_info.get('email')

claims_verified = self.verify_claims(user_info)
if not claims_verified:
LOGGER.debug('Login failed: Claims verification for %s failed.', email)
return None

# email based filtering
users = self.filter_users_by_claims(user_info)

if len(users) == 1:
return self.update_user(users[0], user_info)
elif len(users) > 1:
# In the rare case that two user accounts have the same email address,
# log and bail. Randomly selecting one seems really wrong.
LOGGER.warn('Multiple users with email address %s.', email)
return None
elif import_from_settings('OIDC_CREATE_USER', True):
user = self.create_user(user_info)
return user
else:
LOGGER.debug('Login failed: No user with email %s found, and '
'OIDC_CREATE_USER is False', email)
return None
verified_id = self.verify_token(id_token, nonce=nonce)

if verified_id:
return self.get_or_create_user(access_token, id_token, verified_id)

return None

def get_or_create_user(self, access_token, id_token, verified_id):
"""Returns a User instance if 1 user is found. Creates a user if not found
and configured to do so. Returns nothing if multiple users are matched."""

session = self.request.session

if import_from_settings('OIDC_STORE_ACCESS_TOKEN', False):
session['oidc_access_token'] = access_token

if import_from_settings('OIDC_STORE_ID_TOKEN', False):
session['oidc_id_token'] = id_token

# get userinfo
user_info = self.get_userinfo(access_token, id_token, verified_id)

email = user_info.get('email')

claims_verified = self.verify_claims(user_info)
if not claims_verified:
LOGGER.debug('Login failed: Claims verification for %s failed.', email)
return None

# email based filtering
users = self.filter_users_by_claims(user_info)

if len(users) == 1:
return self.update_user(users[0], user_info)
elif len(users) > 1:
# In the rare case that two user accounts have the same email address,
# log and bail. Randomly selecting one seems really wrong.
LOGGER.warn('Multiple users with email address %s.', email)
return None
elif import_from_settings('OIDC_CREATE_USER', True):
user = self.create_user(user_info)
return user
else:
LOGGER.debug('Login failed: No user with email %s found, and '
'OIDC_CREATE_USER is False', email)
return None

def get_user(self, user_id):
"""Return a user based on the id."""

Expand Down

0 comments on commit 0bd1b2e

Please sign in to comment.