Skip to content

Commit

Permalink
Merge pull request #251 from johngian/allow-unsecured-jwts
Browse files Browse the repository at this point in the history
Allow using unsecured JWT token
  • Loading branch information
johngian committed Jul 26, 2018
2 parents 5094dc8 + c6945a1 commit 1261e87
Show file tree
Hide file tree
Showing 5 changed files with 179 additions and 14 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,6 @@ output/*/index.html
# Sphinx
docs/_build
docs/source

# e2e tests log
integration_tests/ghostdriver.log
6 changes: 6 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,14 @@
version: '3'
services:
testprovider:
stdin_open: true
tty: true
image: mozillaparsys/oidc_testprovider
ports:
- "8080:8080"
testrp:
stdin_open: true
tty: true
image: mozillaparsys/oidc_testrp:py${PYTHON_VERSION:-3}
ports:
- "8081:8081"
Expand All @@ -23,6 +27,8 @@ services:
pip install $$DJANGO_VERSION &&
./bin/run.sh"
testrunner:
stdin_open: true
tty: true
image: mozillaparsys/oidc_testrunner
volumes:
- ./integration_tests:/integration_tests
Expand Down
11 changes: 11 additions & 0 deletions docs/settings.rst
Original file line number Diff line number Diff line change
Expand Up @@ -202,3 +202,14 @@ of ``mozilla-django-oidc``.
.. seealso::

https://docs.djangoproject.com/en/2.0/topics/http/urls/#url-namespaces

.. py:attribute:: OIDC_ALLOW_UNSECURED_JWT
:default: ``False``

Controls whether the authentication backend is going to allow unsecured JWT tokens (tokens with header ``{"alg":"none"}``).
This needs to be set to ``True`` if OP is returning unsecured JWT tokens and RP wants to accept them.

.. seealso::

https://tools.ietf.org/html/rfc7519#section-6
39 changes: 26 additions & 13 deletions mozilla_django_oidc/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from django.utils.module_loading import import_string
from django.utils import six

from josepy.b64 import b64decode
from josepy.jwk import JWK
from josepy.jws import JWS, Header

Expand Down Expand Up @@ -153,6 +154,19 @@ def retrieve_matching_jwk(self, token):
raise SuspiciousOperation('Could not find a valid JWKS.')
return key

def get_payload_data(self, token, key):
"""Helper method to get the payload of the JWT token."""
if import_from_settings('OIDC_ALLOW_UNSECURED_JWT', False):
header, payload_data, signature = token.split(b'.')
header = json.loads(smart_text(b64decode(header)))

# If config allows unsecured JWTs check the header and return the decoded payload
if 'alg' in header and header['alg'] == 'none':
return b64decode(payload_data)

# By default fallback to verify JWT signatures
return self._verify_jws(token, key)

def verify_token(self, token, **kwargs):
"""Validate the token signature."""
nonce = kwargs.get('nonce')
Expand All @@ -166,23 +180,22 @@ def verify_token(self, token, **kwargs):
else:
key = self.OIDC_RP_CLIENT_SECRET

# Verify the token
verified_token = self._verify_jws(token, key)
payload_data = self.get_payload_data(token, key)

# The 'verified_token' will always be a byte string since it's
# The 'token' will always be a byte string since it's
# the result of base64.urlsafe_b64decode().
# The payload is always the result of base64.urlsafe_b64decode().
# In Python 3 and 2, that's always a byte string.
# In Python3.6, the json.loads() function can accept a byte string
# as it will automagically decode it to a unicode string before
# deserializing https://bugs.python.org/issue17909
verified_id = json.loads(verified_token.decode('utf-8'))
token_nonce = verified_id.get('nonce')
payload = json.loads(payload_data.decode('utf-8'))
token_nonce = payload.get('nonce')

if import_from_settings('OIDC_USE_NONCE', True) and nonce != token_nonce:
msg = 'JWT Nonce verification failed.'
raise SuspiciousOperation(msg)
return verified_id
return payload

def get_token(self, payload):
"""Return token object as a dictionary."""
Expand All @@ -194,8 +207,8 @@ def get_token(self, payload):
response.raise_for_status()
return response.json()

def get_userinfo(self, access_token, id_token, verified_id):
"""Return user details dictionary. The id_token and verified_id are not used in
def get_userinfo(self, access_token, id_token, payload):
"""Return user details dictionary. The id_token and payload are not used in
the default implementation, but may be used when overriding this method"""

user_response = requests.get(
Expand Down Expand Up @@ -241,12 +254,12 @@ def authenticate(self, **kwargs):
access_token = token_info.get('access_token')

# Validate the token
verified_id = self.verify_token(id_token, nonce=nonce)
payload = self.verify_token(id_token, nonce=nonce)

if verified_id:
if payload:
self.store_tokens(access_token, id_token)
try:
return self.get_or_create_user(access_token, id_token, verified_id)
return self.get_or_create_user(access_token, id_token, payload)
except SuspiciousOperation as exc:
LOGGER.warning('failed to get or create user: %s', exc)
return None
Expand All @@ -263,11 +276,11 @@ def store_tokens(self, access_token, id_token):
if import_from_settings('OIDC_STORE_ID_TOKEN', False):
session['oidc_id_token'] = id_token

def get_or_create_user(self, access_token, id_token, verified_id):
def get_or_create_user(self, access_token, id_token, payload):
"""Returns a User instance if 1 user is found. Creates a user if not found
and configured to do so. Returns nothing if multiple users are matched."""

user_info = self.get_userinfo(access_token, id_token, verified_id)
user_info = self.get_userinfo(access_token, id_token, payload)

email = user_info.get('email')

Expand Down
134 changes: 133 additions & 1 deletion tests/test_auth.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
import json
from mock import Mock, call, patch

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, hmac
from josepy.b64 import b64encode

from django.conf import settings
from django.contrib.auth import get_user_model
from django.core.exceptions import SuspiciousOperation
from django.test import RequestFactory, TestCase, override_settings
from django.utils import six
from django.utils.encoding import force_bytes
from django.utils.encoding import force_bytes, smart_text

from mozilla_django_oidc.auth import (
default_username_algo,
Expand Down Expand Up @@ -75,6 +79,134 @@ def test_invalid_token(self, request_mock, token_mock):
request_mock.post.return_value = post_json_mock
self.assertEqual(self.backend.authenticate(request=auth_request), None)

@override_settings(OIDC_ALLOW_UNSECURED_JWT=True)
def test_allowed_unsecured_token(self):
"""Test payload data from unsecured token (allowed)."""
header = force_bytes(json.dumps({'alg': 'none'}))
payload = force_bytes(json.dumps({'foo': 'bar'}))
signature = ''
token = force_bytes('{}.{}.{}'.format(
smart_text(b64encode(header)),
smart_text(b64encode(payload)),
signature
))

extracted_payload = self.backend.get_payload_data(token, None)
self.assertEqual(payload, extracted_payload)

@override_settings(OIDC_ALLOW_UNSECURED_JWT=False)
def test_disallowed_unsecured_token(self):
"""Test payload data from unsecured token (disallowed)."""
header = force_bytes(json.dumps({'alg': 'none'}))
payload = force_bytes(json.dumps({'foo': 'bar'}))
signature = ''
token = force_bytes('{}.{}.{}'.format(
smart_text(b64encode(header)),
smart_text(b64encode(payload)),
signature
))

with self.assertRaises(KeyError):
self.backend.get_payload_data(token, None)

@override_settings(OIDC_ALLOW_UNSECURED_JWT=True)
def test_allowed_unsecured_valid_token(self):
"""Test payload data from valid secured token (unsecured allowed)."""
header = force_bytes(json.dumps({'alg': 'HS256', 'typ': 'JWT'}))
payload = force_bytes(json.dumps({'foo': 'bar'}))

# Compute signature
key = b'mysupersecuretestkey'
h = hmac.HMAC(key, hashes.SHA256(), backend=default_backend())
msg = '{}.{}'.format(smart_text(b64encode(header)), smart_text(b64encode(payload)))
h.update(force_bytes(msg))
signature = b64encode(h.finalize())

token = '{}.{}.{}'.format(
smart_text(b64encode(header)),
smart_text(b64encode(payload)),
smart_text(signature)
)
token_bytes = force_bytes(token)
key_text = smart_text(key)
output = self.backend.get_payload_data(token_bytes, key_text)
self.assertEqual(output, payload)

@override_settings(OIDC_ALLOW_UNSECURED_JWT=False)
def test_disallowed_unsecured_valid_token(self):
"""Test payload data from valid secure token (unsecured disallowed)."""
header = force_bytes(json.dumps({'alg': 'HS256', 'typ': 'JWT'}))
payload = force_bytes(json.dumps({'foo': 'bar'}))

# Compute signature
key = b'mysupersecuretestkey'
h = hmac.HMAC(key, hashes.SHA256(), backend=default_backend())
msg = '{}.{}'.format(smart_text(b64encode(header)), smart_text(b64encode(payload)))
h.update(force_bytes(msg))
signature = b64encode(h.finalize())

token = '{}.{}.{}'.format(
smart_text(b64encode(header)),
smart_text(b64encode(payload)),
smart_text(signature)
)
token_bytes = force_bytes(token)
key_text = smart_text(key)
output = self.backend.get_payload_data(token_bytes, key_text)
self.assertEqual(output, payload)

@override_settings(OIDC_ALLOW_UNSECURED_JWT=True)
def test_allowed_unsecured_invalid_token(self):
"""Test payload data from invalid secure token (unsecured allowed)."""
header = force_bytes(json.dumps({'alg': 'HS256', 'typ': 'JWT'}))
payload = force_bytes(json.dumps({'foo': 'bar'}))

# Compute signature
key = b'mysupersecuretestkey'
fake_key = b'mysupersecurefaketestkey'
h = hmac.HMAC(key, hashes.SHA256(), backend=default_backend())
msg = '{}.{}'.format(smart_text(b64encode(header)), smart_text(b64encode(payload)))
h.update(force_bytes(msg))
signature = b64encode(h.finalize())

token = '{}.{}.{}'.format(
smart_text(b64encode(header)),
smart_text(b64encode(payload)),
smart_text(signature)
)
token_bytes = force_bytes(token)
key_text = smart_text(fake_key)

with self.assertRaises(SuspiciousOperation) as ctx:
self.backend.get_payload_data(token_bytes, key_text)
self.assertEqual(ctx.exception.args[0], 'JWS token verification failed.')

@override_settings(OIDC_ALLOW_UNSECURED_JWT=False)
def test_disallowed_unsecured_invalid_token(self):
"""Test payload data from invalid secure token (unsecured disallowed)."""
header = force_bytes(json.dumps({'alg': 'HS256', 'typ': 'JWT'}))
payload = force_bytes(json.dumps({'foo': 'bar'}))

# Compute signature
key = b'mysupersecuretestkey'
fake_key = b'mysupersecurefaketestkey'
h = hmac.HMAC(key, hashes.SHA256(), backend=default_backend())
msg = '{}.{}'.format(smart_text(b64encode(header)), smart_text(b64encode(payload)))
h.update(force_bytes(msg))
signature = b64encode(h.finalize())

token = '{}.{}.{}'.format(
smart_text(b64encode(header)),
smart_text(b64encode(payload)),
smart_text(signature)
)
token_bytes = force_bytes(token)
key_text = smart_text(fake_key)

with self.assertRaises(SuspiciousOperation) as ctx:
self.backend.get_payload_data(token_bytes, key_text)
self.assertEqual(ctx.exception.args[0], 'JWS token verification failed.')

def test_get_user(self):
"""Test get_user method with valid user."""

Expand Down

0 comments on commit 1261e87

Please sign in to comment.