Skip to content

Commit

Permalink
csrf: implement transparent token rotation
Browse files Browse the repository at this point in the history
* Adds support for transparently rotating the CSRF token during a grace
  period to avoid clients easily being prompted with CSRF errors.
  • Loading branch information
lnielsen committed Dec 5, 2021
1 parent 3baeed1 commit 3e2a4d0
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 14 deletions.
43 changes: 35 additions & 8 deletions invenio_rest/csrf.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@
import re
import secrets
import string
from datetime import datetime, timedelta, timezone

from flask import Blueprint, abort, current_app, request
from itsdangerous import BadSignature, TimedJSONWebSignatureSerializer
from itsdangerous import BadSignature, SignatureExpired, \
TimedJSONWebSignatureSerializer
from six import string_types
from six.moves.urllib.parse import urlparse

Expand All @@ -34,27 +36,32 @@
REASON_INSECURE_REFERER = (
"Referer checking failed - Referer is insecure while host is secure."
)
REASON_TOKEN_EXPIRED = "CSRF token expired. Try again."


def _get_csrf_serializer():
def _get_csrf_serializer(expires_in=None):
"""Note that this serializer is used to encode/decode the CSRF cookie.
In case you change this implementation bear in mind that the token
generated must be signed so as to avoid any client-side tampering.
"""
expires_in = expires_in or current_app.config['CSRF_TOKEN_EXPIRES_IN']

return TimedJSONWebSignatureSerializer(
current_app.config.get(
'CSRF_SECRET',
current_app.config.get('SECRET_KEY') or 'CHANGE_ME'),
salt=current_app.config['CSRF_SECRET_SALT'])
salt=current_app.config['CSRF_SECRET_SALT'],
expires_in=expires_in,
)


def _get_random_string(length, allowed_chars):
return ''.join(secrets.choice(allowed_chars) for i in range(length))


def _get_new_csrf_token():
csrf_serializer = _get_csrf_serializer()
def _get_new_csrf_token(expires_in=None):
csrf_serializer = _get_csrf_serializer(expires_in=expires_in)
encoded_token = csrf_serializer.dumps(
_get_random_string(
current_app.config['CSRF_TOKEN_LENGTH'],
Expand All @@ -77,16 +84,27 @@ def _decode_csrf(data):
csrf_serializer = _get_csrf_serializer()
try:
return csrf_serializer.loads(data)
except SignatureExpired as e:
grace_period = timedelta(
seconds=current_app.config['CSRF_TOKEN_GRACE_PERIOD'])
if e.date_signed < datetime.now(tz=timezone.utc) - grace_period:
# Grace period for token rotation exceeded.
_abort400(REASON_TOKEN_EXPIRED)
else:
# Accept expired token, but rotate it to a new one.
reset_token()
return e.payload
except BadSignature:
raise RESTCSRFError()
_abort400(REASON_BAD_TOKEN)


def _set_token(response):
response.set_cookie(
current_app.config['CSRF_COOKIE_NAME'],
_get_new_csrf_token(),
max_age=current_app.config.get(
'CSRF_COOKIE_MAX_AGE', 60*60*24*7*52),
# 1 week for cookie (but we rotate the token every day)
'CSRF_COOKIE_MAX_AGE', 60*60*24*7),
domain=current_app.config.get(
'CSRF_COOKIE_DOMAIN',
current_app.session_interface.get_cookie_domain(
Expand Down Expand Up @@ -175,7 +193,7 @@ def init_app(self, app):
:param app: An instance of :class:`flask.Flask`.
"""
app.config.setdefault('CSRF_COOKIE_NAME', 'csrftoken')
app.config.setdefault('CSRF_HEADER', 'HTTP_X_CSRFTOKEN')
app.config.setdefault('CSRF_HEADER', 'X-CSRFToken')
app.config.setdefault(
'CSRF_METHODS', ['POST', 'PUT', 'PATCH', 'DELETE'])
app.config.setdefault('CSRF_TOKEN_LENGTH', 32)
Expand All @@ -186,6 +204,15 @@ def init_app(self, app):
app.config.setdefault(
'CSRF_COOKIE_SAMESITE',
app.config.get('SESSION_COOKIE_SAMESITE') or 'Lax')
# The token last for 24 hours, but the cookie for 7 days. This allows
# us to implement transparent token rotation during those 7 days. Note,
# that the token is automatically rotated on login, thus you can also
# change PERMANENT_SESSION_LIFETIME
app.config.setdefault('CSRF_TOKEN_EXPIRES_IN', 60*60*24)
# We allow usage of an expired CSRF token during this period. This way
# we can rotate the CSRF token without the user getting an CSRF error.
# Align with CSRF_COOKIE_MAX_AGE
app.config.setdefault('CSRF_TOKEN_GRACE_PERIOD', 60*60*24*7)

@app.after_request
def csrf_send(response):
Expand Down
79 changes: 73 additions & 6 deletions tests/test_csrf.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

from invenio_rest.csrf import REASON_BAD_REFERER, REASON_BAD_TOKEN, \
REASON_INSECURE_REFERER, REASON_MALFORMED_REFERER, REASON_NO_CSRF_COOKIE, \
REASON_NO_REFERER, CSRFProtectMiddleware
REASON_NO_REFERER, CSRFProtectMiddleware, _get_new_csrf_token
from invenio_rest.ext import InvenioREST


Expand Down Expand Up @@ -195,27 +195,94 @@ def test_csrf_not_signed_correctly(csrf_app, csrf):
"""Test CSRF malicious attempt with passing malicious cookie and header."""
from itsdangerous import TimedJSONWebSignatureSerializer

from invenio_rest.errors import RESTCSRFError

with csrf_app.test_client() as client:
# try to pass our own signed cookie and header in an attempt to bypass
# the CSRF check
csrf_serializer = TimedJSONWebSignatureSerializer('my_secret')
csrf_serializer = TimedJSONWebSignatureSerializer(
'invalid_secret',
salt='invalid_salt',
)
malicious_cookie = csrf_serializer.dumps(
{'user': 'malicious'}, 'my_secret')
CSRF_COOKIE_NAME = csrf_app.config['CSRF_COOKIE_NAME']
CSRF_HEADER_NAME = csrf_app.config['CSRF_HEADER']
client.set_cookie('localhost', CSRF_COOKIE_NAME, malicious_cookie)

res = client.post(
'/csrf-protected',
data=json.dumps(dict(foo='bar')),
content_type='application/json',
headers={
'X-CSRF-Token': malicious_cookie
CSRF_HEADER_NAME: malicious_cookie
},
)

assert res.json['message'] == RESTCSRFError.description
assert res.json['message'] == REASON_BAD_TOKEN
assert res.status_code == 400


def test_csrf_token_rotation(csrf_app, csrf):
"""Test CSRF token rotation."""
from itsdangerous import TimedJSONWebSignatureSerializer

with csrf_app.test_client() as client:
CSRF_COOKIE_NAME = csrf_app.config['CSRF_COOKIE_NAME']
CSRF_HEADER_NAME = csrf_app.config['CSRF_HEADER']

# Token in grace period - succeeds but token gets rotated
expired_token = _get_new_csrf_token(expires_in=-1)
client.set_cookie('localhost', CSRF_COOKIE_NAME, expired_token)
old_cookie = {
cookie.name: cookie for cookie in client.cookie_jar}['csrftoken']
res = client.post(
'/csrf-protected',
data=json.dumps(dict(foo='bar')),
content_type='application/json',
headers={
CSRF_HEADER_NAME: expired_token
},
)
assert res.status_code == 200
# Token was rotated and new requests succeeds
new_cookie = {
cookie.name: cookie for cookie in client.cookie_jar}['csrftoken']
assert new_cookie.value != old_cookie.value
res = client.post(
'/csrf-protected',
data=json.dumps(dict(foo='bar')),
content_type='application/json',
headers={
CSRF_HEADER_NAME: new_cookie.value
},
)
assert res.status_code == 200
# Subsequent requests doesn't rotate CSRF token
res = client.post(
'/csrf-protected',
data=json.dumps(dict(foo='bar')),
content_type='application/json',
headers={
CSRF_HEADER_NAME: new_cookie.value
},
)
last_cookie = {
cookie.name: cookie for cookie in client.cookie_jar}['csrftoken']
assert new_cookie.value == last_cookie.value
assert res.status_code == 200

# Token outside grace period
# - Hack to have a negative grace period to force the error.
csrf_app.config['CSRF_TOKEN_GRACE_PERIOD'] = -10000
expired_token = _get_new_csrf_token(expires_in=-60*60*24*14)
client.set_cookie('localhost', CSRF_COOKIE_NAME, expired_token)
res = client.post(
'/csrf-protected',
data=json.dumps(dict(foo='bar')),
content_type='application/json',
headers={
CSRF_HEADER_NAME: expired_token
},
)
assert res.status_code == 400


Expand Down

0 comments on commit 3e2a4d0

Please sign in to comment.