From f29650f91440a61f0c9ee63b3d739ab202ba5b27 Mon Sep 17 00:00:00 2001 From: George Waters Date: Sun, 31 Dec 2017 17:33:22 -0500 Subject: [PATCH 1/2] Added support for access token freshness date. --- flask_jwt_extended/tokens.py | 11 ++++++++++- flask_jwt_extended/utils.py | 4 +++- flask_jwt_extended/view_decorators.py | 12 ++++++++++-- tests/test_view_decorators.py | 6 ++++++ 4 files changed, 29 insertions(+), 4 deletions(-) diff --git a/flask_jwt_extended/tokens.py b/flask_jwt_extended/tokens.py index 6d253f76..0750bdb9 100644 --- a/flask_jwt_extended/tokens.py +++ b/flask_jwt_extended/tokens.py @@ -1,6 +1,8 @@ import datetime import uuid +from calendar import timegm + import jwt from werkzeug.security import safe_str_cmp @@ -40,7 +42,9 @@ def encode_access_token(identity, secret, algorithm, expires_delta, fresh, :param expires_delta: How far in the future this token should expire (set to False to disable expiration) :type expires_delta: datetime.timedelta or False - :param fresh: If this should be a 'fresh' token or not + :param fresh: If this should be a 'fresh' token or not. If a + datetime.timedelta is given this will indicate how long this + token will remain fresh. :param user_claims: Custom claims to include in this token. This data must be json serializable :param csrf: Whether to include a csrf double submit claim in this token @@ -49,6 +53,11 @@ def encode_access_token(identity, secret, algorithm, expires_delta, fresh, :param user_claims_key: Which key should be used to store the user claims :return: Encoded access token """ + + if isinstance(fresh, datetime.timedelta): + now = datetime.datetime.utcnow() + fresh = timegm((now + fresh).utctimetuple()) + token_data = { identity_claim_key: identity, 'fresh': fresh, diff --git a/flask_jwt_extended/utils.py b/flask_jwt_extended/utils.py index e5f92ae9..7af3f9b2 100644 --- a/flask_jwt_extended/utils.py +++ b/flask_jwt_extended/utils.py @@ -98,7 +98,9 @@ def create_access_token(identity, fresh=False, expires_delta=None): json serializable identity out of the object. :param fresh: If this token should be marked as fresh, and can thus access :func:`~flask_jwt_extended.fresh_jwt_required` endpoints. - Defaults to `False`. + Defaults to `False`. This value can also be a + `datetime.timedelta` in which case it will indicate how long + this token will be considered fresh. :param expires_delta: A `datetime.timedelta` for how long this token should last before it expires. Set to False to disable expiration. If this is None, it will use the diff --git a/flask_jwt_extended/view_decorators.py b/flask_jwt_extended/view_decorators.py index 36ca9303..8454b13a 100644 --- a/flask_jwt_extended/view_decorators.py +++ b/flask_jwt_extended/view_decorators.py @@ -1,4 +1,6 @@ from functools import wraps +from datetime import datetime +from calendar import timegm from flask import request try: @@ -81,8 +83,14 @@ def fresh_jwt_required(fn): def wrapper(*args, **kwargs): jwt_data = _decode_jwt_from_request(request_type='access') ctx_stack.top.jwt = jwt_data - if not jwt_data['fresh']: - raise FreshTokenRequired('Fresh token required') + fresh = jwt_data['fresh'] + if isinstance(fresh, bool): + if not fresh: + raise FreshTokenRequired('Fresh token required') + else: + now = timegm(datetime.utcnow().utctimetuple()) + if fresh < now: + raise FreshTokenRequired('Fresh token required') if not verify_token_claims(jwt_data[config.user_claims_key]): raise UserClaimsVerificationError('User claims verification failed') _load_user(jwt_data[config.identity_claim_key]) diff --git a/tests/test_view_decorators.py b/tests/test_view_decorators.py index 8292bd3e..5d7edc61 100644 --- a/tests/test_view_decorators.py +++ b/tests/test_view_decorators.py @@ -78,6 +78,7 @@ def test_fresh_jwt_required(app): with app.test_request_context(): access_token = create_access_token('username') fresh_access_token = create_access_token('username', fresh=True) + fresh_timed_access_token = create_access_token('username', fresh=timedelta(minutes=-1)) refresh_token = create_refresh_token('username') response = test_client.get(url, headers=make_headers(fresh_access_token)) @@ -90,6 +91,11 @@ def test_fresh_jwt_required(app): assert response.status_code == 401 assert json_data == {'msg': 'Fresh token required'} + response = test_client.get(url, headers=make_headers(fresh_timed_access_token)) + json_data = json.loads(response.get_data(as_text=True)) + assert response.status_code == 401 + assert json_data == {'msg': 'Fresh token required'} + response = test_client.get(url, headers=None) json_data = json.loads(response.get_data(as_text=True)) assert response.status_code == 401 From 5947e47887bd6d2aed4e00fa387b6719997d5c6a Mon Sep 17 00:00:00 2001 From: George Waters Date: Sun, 7 Jan 2018 13:09:51 -0500 Subject: [PATCH 2/2] Added unit test for successful freshness date in access token. --- tests/test_view_decorators.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/tests/test_view_decorators.py b/tests/test_view_decorators.py index 5d7edc61..8ef20890 100644 --- a/tests/test_view_decorators.py +++ b/tests/test_view_decorators.py @@ -78,7 +78,8 @@ def test_fresh_jwt_required(app): with app.test_request_context(): access_token = create_access_token('username') fresh_access_token = create_access_token('username', fresh=True) - fresh_timed_access_token = create_access_token('username', fresh=timedelta(minutes=-1)) + fresh_timed_access_token = create_access_token('username', fresh=timedelta(minutes=5)) + stale_timed_access_token = create_access_token('username', fresh=timedelta(minutes=-1)) refresh_token = create_refresh_token('username') response = test_client.get(url, headers=make_headers(fresh_access_token)) @@ -93,6 +94,11 @@ def test_fresh_jwt_required(app): response = test_client.get(url, headers=make_headers(fresh_timed_access_token)) json_data = json.loads(response.get_data(as_text=True)) + assert response.status_code == 200 + assert json_data == {'foo': 'bar'} + + response = test_client.get(url, headers=make_headers(stale_timed_access_token)) + json_data = json.loads(response.get_data(as_text=True)) assert response.status_code == 401 assert json_data == {'msg': 'Fresh token required'}