Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 7 additions & 8 deletions flask_jwt_extended/view_decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ def jwt_required(fn):
def wrapper(*args, **kwargs):
jwt_data = _decode_jwt_from_request(request_type='access')
ctx_stack.top.jwt = jwt_data
if not verify_token_claims(jwt_data[config.user_claims]):
raise UserClaimsVerificationError('User claims verification failed')
_load_user(jwt_data[config.identity_claim])
return fn(*args, **kwargs)
return wrapper
Expand All @@ -58,6 +60,8 @@ def wrapper(*args, **kwargs):
try:
jwt_data = _decode_jwt_from_request(request_type='access')
ctx_stack.top.jwt = jwt_data
if not verify_token_claims(jwt_data[config.user_claims]):
raise UserClaimsVerificationError('User claims verification failed')
_load_user(jwt_data[config.identity_claim])
except (NoAuthorizationError, InvalidHeaderError):
pass
Expand All @@ -77,12 +81,12 @@ def fresh_jwt_required(fn):
"""
@wraps(fn)
def wrapper(*args, **kwargs):
# Check if the token is fresh
jwt_data = _decode_jwt_from_request(request_type='access')
ctx_stack.top.jwt = jwt_data
if not jwt_data['fresh']:
raise FreshTokenRequired('Fresh token required')

ctx_stack.top.jwt = jwt_data
if not verify_token_claims(jwt_data[config.user_claims]):
raise UserClaimsVerificationError('User claims verification failed')
_load_user(jwt_data[config.identity_claim])
return fn(*args, **kwargs)
return wrapper
Expand Down Expand Up @@ -214,11 +218,6 @@ def _decode_jwt_from_request(request_type):
if decoded_token['type'] != request_type:
raise WrongTokenError('Only {} tokens can access this endpoint'.format(request_type))

# Check if the custom claims in access tokens are valid
if request_type == 'access':
if not verify_token_claims(decoded_token[config.user_claims]):
raise UserClaimsVerificationError('User claims verification failed')

# If blacklisting is enabled, see if this token has been revoked
if _token_blacklisted(decoded_token, request_type):
raise RevokedTokenError('Token has been revoked')
Expand Down
Empty file added tests/__init__.py
Empty file.
69 changes: 69 additions & 0 deletions tests/test_asymmetric_crypto.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import pytest
from flask import Flask, jsonify, json

from flask_jwt_extended import JWTManager, jwt_required, create_access_token

RSA_PRIVATE = """
-----BEGIN RSA PRIVATE KEY-----
MIICXgIBAAKBgQDN+p9a9oMyqRzkae8yLdJcEK0O0WesH6JiMz+KDrpUwAoAM/KP
DnxFnROJDSBHyHEmPVn5x8GqV5lQ9+6l97jdEEcPo6wkshycM82fgcxOmvtAy4Uo
xq/AeplYqplhcUTGVuo4ZldOLmN8ksGmzhWpsOdT0bkYipHCn5sWZxd21QIDAQAB
AoGBAMJ0++KVXXEDZMpjFDWsOq898xNNMHG3/8ZzmWXN161RC1/7qt/RjhLuYtX9
NV9vZRrzyrDcHAKj5pMhLgUzpColKzvdG2vKCldUs2b0c8HEGmjsmpmgoI1Tdf9D
G1QK+q9pKHlbj/MLr4vZPX6xEwAFeqRKlzL30JPD+O6mOXs1AkEA8UDzfadH1Y+H
bcNN2COvCqzqJMwLNRMXHDmUsjHfR2gtzk6D5dDyEaL+O4FLiQCaNXGWWoDTy/HJ
Clh1Z0+KYwJBANqRtJ+RvdgHMq0Yd45MMyy0ODGr1B3PoRbUK8EdXpyUNMi1g3iJ
tXMbLywNkTfcEXZTlbbkVYwrEl6P2N1r42cCQQDb9UQLBEFSTRJE2RRYQ/CL4yt3
cTGmqkkfyr/v19ii2jEpMBzBo8eQnPL+fdvIhWwT3gQfb+WqxD9v10bzcmnRAkEA
mzTgeHd7wg3KdJRtQYTmyhXn2Y3VAJ5SG+3qbCW466NqoCQVCeFwEh75rmSr/Giv
lcDhDZCzFuf3EWNAcmuMfQJARsWfM6q7v2p6vkYLLJ7+VvIwookkr6wymF5Zgb9d
E6oTM2EeUPSyyrj5IdsU2JCNBH1m3JnUflz8p8/NYCoOZg==
-----END RSA PRIVATE KEY-----
"""

RSA_PUBLIC = """
-----BEGIN RSA PUBLIC KEY-----
MIGJAoGBAM36n1r2gzKpHORp7zIt0lwQrQ7RZ6wfomIzP4oOulTACgAz8o8OfEWd
E4kNIEfIcSY9WfnHwapXmVD37qX3uN0QRw+jrCSyHJwzzZ+BzE6a+0DLhSjGr8B6
mViqmWFxRMZW6jhmV04uY3ySwabOFamw51PRuRiKkcKfmxZnF3bVAgMBAAE=
-----END RSA PUBLIC KEY-----
"""


@pytest.fixture(scope='function')
def app():
app = Flask(__name__)
app.config['JWT_SECRET_KEY'] = 'foobarbaz'
app.config['JWT_PUBLIC_KEY'] = RSA_PUBLIC
app.config['JWT_PRIVATE_KEY'] = RSA_PRIVATE
JWTManager(app)

@app.route('/protected', methods=['GET'])
@jwt_required
def protected():
return jsonify(foo='bar')

return app


def test_asymmetric_cropto(app):
test_client = app.test_client()

with app.test_request_context():
hs256_token = create_access_token('username')
app.config['JWT_ALGORITHM'] = 'RS256'
rs256_token = create_access_token('username')

# Insure the symmetric token does not work now
access_headers = {'Authorization': 'Bearer {}'.format(hs256_token)}
response = test_client.get('/protected', headers=access_headers)
json_data = json.loads(response.get_data(as_text=True))
assert response.status_code == 422
assert json_data == {'msg': 'The specified alg value is not allowed'}

# Insure the asymmetric token does work
access_headers = {'Authorization': 'Bearer {}'.format(rs256_token)}
response = test_client.get('/protected', headers=access_headers)
json_data = json.loads(response.get_data(as_text=True))
assert response.status_code == 200
assert json_data == {'foo': 'bar'}
Loading