diff --git a/flask_jwt_extended/tokens.py b/flask_jwt_extended/tokens.py index 6d253f76..0750bdb9 100644 --- a/flask_jwt_extended/tokens.py +++ b/flask_jwt_extended/tokens.py @@ -1,6 +1,8 @@ import datetime import uuid +from calendar import timegm + import jwt from werkzeug.security import safe_str_cmp @@ -40,7 +42,9 @@ def encode_access_token(identity, secret, algorithm, expires_delta, fresh, :param expires_delta: How far in the future this token should expire (set to False to disable expiration) :type expires_delta: datetime.timedelta or False - :param fresh: If this should be a 'fresh' token or not + :param fresh: If this should be a 'fresh' token or not. If a + datetime.timedelta is given this will indicate how long this + token will remain fresh. :param user_claims: Custom claims to include in this token. This data must be json serializable :param csrf: Whether to include a csrf double submit claim in this token @@ -49,6 +53,11 @@ def encode_access_token(identity, secret, algorithm, expires_delta, fresh, :param user_claims_key: Which key should be used to store the user claims :return: Encoded access token """ + + if isinstance(fresh, datetime.timedelta): + now = datetime.datetime.utcnow() + fresh = timegm((now + fresh).utctimetuple()) + token_data = { identity_claim_key: identity, 'fresh': fresh, diff --git a/flask_jwt_extended/utils.py b/flask_jwt_extended/utils.py index e5f92ae9..7af3f9b2 100644 --- a/flask_jwt_extended/utils.py +++ b/flask_jwt_extended/utils.py @@ -98,7 +98,9 @@ def create_access_token(identity, fresh=False, expires_delta=None): json serializable identity out of the object. :param fresh: If this token should be marked as fresh, and can thus access :func:`~flask_jwt_extended.fresh_jwt_required` endpoints. - Defaults to `False`. + Defaults to `False`. This value can also be a + `datetime.timedelta` in which case it will indicate how long + this token will be considered fresh. :param expires_delta: A `datetime.timedelta` for how long this token should last before it expires. Set to False to disable expiration. If this is None, it will use the diff --git a/flask_jwt_extended/view_decorators.py b/flask_jwt_extended/view_decorators.py index 36ca9303..8454b13a 100644 --- a/flask_jwt_extended/view_decorators.py +++ b/flask_jwt_extended/view_decorators.py @@ -1,4 +1,6 @@ from functools import wraps +from datetime import datetime +from calendar import timegm from flask import request try: @@ -81,8 +83,14 @@ def fresh_jwt_required(fn): def wrapper(*args, **kwargs): jwt_data = _decode_jwt_from_request(request_type='access') ctx_stack.top.jwt = jwt_data - if not jwt_data['fresh']: - raise FreshTokenRequired('Fresh token required') + fresh = jwt_data['fresh'] + if isinstance(fresh, bool): + if not fresh: + raise FreshTokenRequired('Fresh token required') + else: + now = timegm(datetime.utcnow().utctimetuple()) + if fresh < now: + raise FreshTokenRequired('Fresh token required') if not verify_token_claims(jwt_data[config.user_claims_key]): raise UserClaimsVerificationError('User claims verification failed') _load_user(jwt_data[config.identity_claim_key]) diff --git a/tests/test_view_decorators.py b/tests/test_view_decorators.py index 8292bd3e..8ef20890 100644 --- a/tests/test_view_decorators.py +++ b/tests/test_view_decorators.py @@ -78,6 +78,8 @@ def test_fresh_jwt_required(app): with app.test_request_context(): access_token = create_access_token('username') fresh_access_token = create_access_token('username', fresh=True) + fresh_timed_access_token = create_access_token('username', fresh=timedelta(minutes=5)) + stale_timed_access_token = create_access_token('username', fresh=timedelta(minutes=-1)) refresh_token = create_refresh_token('username') response = test_client.get(url, headers=make_headers(fresh_access_token)) @@ -90,6 +92,16 @@ def test_fresh_jwt_required(app): assert response.status_code == 401 assert json_data == {'msg': 'Fresh token required'} + response = test_client.get(url, headers=make_headers(fresh_timed_access_token)) + json_data = json.loads(response.get_data(as_text=True)) + assert response.status_code == 200 + assert json_data == {'foo': 'bar'} + + response = test_client.get(url, headers=make_headers(stale_timed_access_token)) + json_data = json.loads(response.get_data(as_text=True)) + assert response.status_code == 401 + assert json_data == {'msg': 'Fresh token required'} + response = test_client.get(url, headers=None) json_data = json.loads(response.get_data(as_text=True)) assert response.status_code == 401