Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion flask_jwt_extended/tokens.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import datetime
import uuid

from calendar import timegm

import jwt
from werkzeug.security import safe_str_cmp

Expand Down Expand Up @@ -40,7 +42,9 @@ def encode_access_token(identity, secret, algorithm, expires_delta, fresh,
:param expires_delta: How far in the future this token should expire
(set to False to disable expiration)
:type expires_delta: datetime.timedelta or False
:param fresh: If this should be a 'fresh' token or not
:param fresh: If this should be a 'fresh' token or not. If a
datetime.timedelta is given this will indicate how long this
token will remain fresh.
:param user_claims: Custom claims to include in this token. This data must
be json serializable
:param csrf: Whether to include a csrf double submit claim in this token
Expand All @@ -49,6 +53,11 @@ def encode_access_token(identity, secret, algorithm, expires_delta, fresh,
:param user_claims_key: Which key should be used to store the user claims
:return: Encoded access token
"""

if isinstance(fresh, datetime.timedelta):
now = datetime.datetime.utcnow()
fresh = timegm((now + fresh).utctimetuple())

token_data = {
identity_claim_key: identity,
'fresh': fresh,
Expand Down
4 changes: 3 additions & 1 deletion flask_jwt_extended/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,9 @@ def create_access_token(identity, fresh=False, expires_delta=None):
json serializable identity out of the object.
:param fresh: If this token should be marked as fresh, and can thus access
:func:`~flask_jwt_extended.fresh_jwt_required` endpoints.
Defaults to `False`.
Defaults to `False`. This value can also be a
`datetime.timedelta` in which case it will indicate how long
this token will be considered fresh.
:param expires_delta: A `datetime.timedelta` for how long this token should
last before it expires. Set to False to disable
expiration. If this is None, it will use the
Expand Down
12 changes: 10 additions & 2 deletions flask_jwt_extended/view_decorators.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from functools import wraps
from datetime import datetime
from calendar import timegm

from flask import request
try:
Expand Down Expand Up @@ -81,8 +83,14 @@ def fresh_jwt_required(fn):
def wrapper(*args, **kwargs):
jwt_data = _decode_jwt_from_request(request_type='access')
ctx_stack.top.jwt = jwt_data
if not jwt_data['fresh']:
raise FreshTokenRequired('Fresh token required')
fresh = jwt_data['fresh']
if isinstance(fresh, bool):
if not fresh:
raise FreshTokenRequired('Fresh token required')
else:
now = timegm(datetime.utcnow().utctimetuple())
if fresh < now:
raise FreshTokenRequired('Fresh token required')
if not verify_token_claims(jwt_data[config.user_claims_key]):
raise UserClaimsVerificationError('User claims verification failed')
_load_user(jwt_data[config.identity_claim_key])
Expand Down
12 changes: 12 additions & 0 deletions tests/test_view_decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ def test_fresh_jwt_required(app):
with app.test_request_context():
access_token = create_access_token('username')
fresh_access_token = create_access_token('username', fresh=True)
fresh_timed_access_token = create_access_token('username', fresh=timedelta(minutes=5))
stale_timed_access_token = create_access_token('username', fresh=timedelta(minutes=-1))
refresh_token = create_refresh_token('username')

response = test_client.get(url, headers=make_headers(fresh_access_token))
Expand All @@ -90,6 +92,16 @@ def test_fresh_jwt_required(app):
assert response.status_code == 401
assert json_data == {'msg': 'Fresh token required'}

response = test_client.get(url, headers=make_headers(fresh_timed_access_token))
json_data = json.loads(response.get_data(as_text=True))
assert response.status_code == 200
assert json_data == {'foo': 'bar'}

response = test_client.get(url, headers=make_headers(stale_timed_access_token))
json_data = json.loads(response.get_data(as_text=True))
assert response.status_code == 401
assert json_data == {'msg': 'Fresh token required'}

response = test_client.get(url, headers=None)
json_data = json.loads(response.get_data(as_text=True))
assert response.status_code == 401
Expand Down