Skip to content

Commit

Permalink
shift to authlib for verification token generate/verify
Browse files Browse the repository at this point in the history
- support generated itsdangerous tokens for now
refs #147
  • Loading branch information
guruofgentoo committed Feb 21, 2022
1 parent 74ff5b2 commit e96ac2e
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 21 deletions.
74 changes: 53 additions & 21 deletions keg_auth/model/__init__.py
Expand Up @@ -2,17 +2,18 @@
import binascii
import hashlib
import json
import time

import arrow
import flask
import itsdangerous
import keg_elements.db.utils as dbutils
import passlib.hash
import passlib.pwd
import shortuuid
import sqlalchemy as sa
import sqlalchemy.orm as sa_orm
import sqlalchemy.sql as sa_sql
from authlib import jose
from blazeutils import tolist
from blazeutils.strings import randchars
from keg.db import db
Expand Down Expand Up @@ -262,20 +263,22 @@ def get_token_salt(self):
self.last_login_utc.to('UTC').isoformat() if self.last_login_utc else None
])

def get_token_serializer(self, expires_in):
"""
Create a JWT serializer using itsdangerous
:param expires_in: seconds from token creation until expiration
:return: TimedJSONWebSignatureSerializer instance
"""
return itsdangerous.TimedJSONWebSignatureSerializer(
flask.current_app.config['SECRET_KEY'],
algorithm_name='HS512',
expires_in=expires_in,
signer_kwargs={'digest_method': hashlib.sha512}
def get_token_signature(self, digest_method=hashlib.sha512):
base_key = (
self.get_token_salt()
+ 'signer'
+ flask.current_app.config.get('SECRET_KEY')
)
return digest_method(base_key.encode()).digest()

def get_token_payload(self, payload, expires_in):
now = int(time.time())
exp = now + expires_in
payload['iat'] = payload.get('iat', now)
payload['exp'] = payload.get('exp', exp)
return payload

def token_verify(self, token):
def token_verify(self, token, _use_legacy=False, _block_legacy=False):
"""
Verify a password reset token. The token is validated for:
* user identity
Expand All @@ -292,11 +295,41 @@ def token_verify(self, token):
if isinstance(token, str):
token = token.encode()

serializer = self.get_token_serializer(None)
"""
We used to use itsdangerous to generate/verify these JWT tokens. In version 2.1,
itsdangerous removed those wrappers, so we switched to authlib. A few key
differences need to be handled (temporarily) to support tokens generated with
itsdangerous:
- digest_method was supposed to be sha512, but due to a bug in ID it fell back to SHA1
- iat/exp claims were in the header generated by ID, not the payload
If we need to fall back to legacy mode due to sha512 not matching signature, we can
assume that the token was generated by ID (or by a test mimicing it).
"""
digest_method = hashlib.sha512 if not _use_legacy else hashlib.sha1

try:
payload = serializer.loads(token, salt=self.get_token_salt())
except itsdangerous.BadSignature:
payload = jose.jwt.decode(token, self.get_token_signature(digest_method))
if _use_legacy:
payload['iat'] = payload.header.get('iat')
payload['exp'] = payload.header.get('exp')
payload.validate()
except (
jose.errors.DecodeError,
jose.errors.ExpiredTokenError
):
return False
except jose.errors.BadSignatureError:
if not _use_legacy and not _block_legacy:
# bad sig could mean it's an itsdangerous token, try legacy mode
return self.token_verify(token, _use_legacy=True)
return False

# authlib treats iat/exp claims as optional. We need to make sure they were in
# the payload, and fail if not
if len({'iat', 'exp'} & set(payload.keys())) != 2:
return False

return payload['user_id'] == self.id

def token_generate(self):
Expand All @@ -306,13 +339,12 @@ def token_generate(self):
The value returned by this function must not be persisted.
:return: a string representation of the generated token
"""
serializer = self.get_token_serializer(
payload = self.get_token_payload(
{'user_id': self.id},
flask.current_app.config['KEGAUTH_TOKEN_EXPIRE_MINS'] * 60
)
payload = {
'user_id': self.id,
}
token = serializer.dumps(payload, salt=self.get_token_salt()).decode()
header = {'alg': 'HS512'}
token = jose.jwt.encode(header, payload, self.get_token_signature()).decode()

# Store the plain text version on this instance for ease of use. It will not get
# pesisted to the db, so no security conern.
Expand Down
21 changes: 21 additions & 0 deletions keg_auth/tests/test_model.py
@@ -1,7 +1,10 @@
# Using unicode_literals instead of adding 'u' prefix to all stings that go to SA.
from __future__ import unicode_literals
import base64
import hashlib
import string
import time
from authlib import jose
import arrow
import flask
from keg.db import db
Expand Down Expand Up @@ -155,6 +158,24 @@ def test_token_validation(self):
assert user.token_verify(token)
assert user.token_verify(user._token_plain)

def test_legacy_token(self):
"""
Mimic an itsdangerous token and validate it only verifies in legacy mode.
- sha1 signature instead of sha512
- iat/exp claims are in header, not payload
"""
user = ents.User.testing_create()
base_key = user.get_token_salt() + 'signer' + flask.current_app.config.get('SECRET_KEY')
signature = hashlib.sha1(base_key.encode()).digest()
now = int(time.time())
exp = now + (flask.current_app.config.get('KEGAUTH_TOKEN_EXPIRE_MINS') * 60)
header = {'alg': 'HS512', 'iat': now, 'exp': exp}
payload = {'user_id': user.id}
token = jose.jwt.encode(header, payload, signature)

assert user.token_verify(token)
assert not user.token_verify(token, _block_legacy=True)

def test_token_salt_info_changed(self):
def check_field(field, new_value):
user = ents.User.testing_create(last_login_utc=None)
Expand Down
1 change: 1 addition & 0 deletions setup.py
Expand Up @@ -31,6 +31,7 @@
include_package_data=True,
zip_safe=False,
install_requires=[
'authlib',
'bcrypt',
'markdown-it-py',

Expand Down

0 comments on commit e96ac2e

Please sign in to comment.