This repository has been archived by the owner on Sep 28, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 19
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #12 from postatum/91771308_security_schemes
Implement token authn. Add basic views, models for authn.
- Loading branch information
Showing
5 changed files
with
384 additions
and
1 deletion.
There are no files selected for viewing
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,183 @@ | ||
import uuid | ||
import logging | ||
|
||
import cryptacular.bcrypt | ||
from pyramid.security import authenticated_userid, forget | ||
|
||
from nefertari.json_httpexceptions import * | ||
from nefertari import engine as eng | ||
|
||
log = logging.getLogger(__name__) | ||
crypt = cryptacular.bcrypt.BCRYPTPasswordManager() | ||
|
||
|
||
def lower_strip(value): | ||
return (value or '').lower().strip() | ||
|
||
|
||
def crypt_password(password): | ||
""" Crypt :password: if it's not crypted yet """ | ||
if password and not crypt.match(password): | ||
password = unicode(crypt.encode(password)) | ||
return password | ||
|
||
|
||
class AuthUser(eng.BaseDocument): | ||
""" Class that is meant to be User class in Auth system. | ||
Implements basic operations to support Pyramid Ticket-based and custom | ||
ApiKey token-based authentication. | ||
""" | ||
__tablename__ = 'nefertari_authuser' | ||
|
||
id = eng.IdField(primary_key=True) | ||
username = eng.StringField( | ||
min_length=1, max_length=50, unique=True, | ||
required=True, processors=[lower_strip]) | ||
email = eng.StringField( | ||
unique=True, required=True, processors=[lower_strip]) | ||
password = eng.StringField( | ||
min_length=3, required=True, processors=[crypt_password]) | ||
groups = eng.ListField( | ||
item_type=eng.StringField, | ||
choices=['admin', 'user'], default=['user']) | ||
|
||
uid = property(lambda self: str(self.id)) | ||
|
||
def verify_password(self, password): | ||
return crypt.check(self.password, password) | ||
|
||
@classmethod | ||
def get_api_credentials(cls, userid, request): | ||
""" Get username and api token for user with id of :userid: """ | ||
try: | ||
user = cls.get_resource(id=userid) | ||
except Exception as ex: | ||
log.error(unicode(ex)) | ||
forget(request) | ||
if user: | ||
return user.username, user.api_key.token | ||
return None, None | ||
|
||
@classmethod | ||
def authenticate_token(cls, username, token, request): | ||
""" Get user's groups if user with :username: exists and his api key | ||
token equals to :token: | ||
""" | ||
try: | ||
user = cls.get_resource(username=username) | ||
except Exception as ex: | ||
log.error(unicode(ex)) | ||
forget(request) | ||
if user and user.api_key.token == token: | ||
return ['g:%s' % g for g in user.groups] | ||
|
||
@classmethod | ||
def authenticate(cls, params): | ||
""" Authenticate user with login and password from :params: """ | ||
login = params['login'].lower().strip() | ||
key = 'email' if '@' in login else 'username' | ||
try: | ||
user = cls.get_resource(**{key: login}) | ||
except Exception as ex: | ||
log.error(unicode(ex)) | ||
success = False | ||
user = None | ||
|
||
if user: | ||
password = params.get('password', None) | ||
success = (password and user.verify_password(password)) | ||
return success, user | ||
|
||
@classmethod | ||
def groupfinder(cls, userid, request): | ||
""" Return group identifiers of user with id :userid: """ | ||
try: | ||
user = cls.get_resource(id=userid) | ||
except Exception as ex: | ||
log.error(unicode(ex)) | ||
forget(request) | ||
else: | ||
if user: | ||
return ['g:%s' % g for g in user.groups] | ||
|
||
@classmethod | ||
def create_account(cls, params): | ||
""" Create AuthUser instance with data from :params: """ | ||
user_params = dictset(params).subset( | ||
['username', 'email', 'password']) | ||
try: | ||
return cls.get_or_create( | ||
email=user_params['email'], | ||
defaults=user_params) | ||
except JHTTPBadRequest: | ||
raise JHTTPBadRequest('Failed to create account.') | ||
|
||
@classmethod | ||
def get_auth_user_by_id(cls, request): | ||
""" Get user by ID """ | ||
_id = authenticated_userid(request) | ||
if _id: | ||
return cls.get_resource(id=_id) | ||
|
||
@classmethod | ||
def get_auth_user_by_name(cls, request): | ||
""" Get user by username """ | ||
username = authenticated_userid(request) | ||
if username: | ||
return cls.get_resource(username=username) | ||
|
||
|
||
def apikey_token(): | ||
""" Generate ApiKey.token using uuid library. """ | ||
return uuid.uuid4().hex.replace('-', '') | ||
|
||
|
||
def apikey_model(user_model): | ||
""" Generate ApiKey model class and connect it with :user_model:. | ||
ApiKey is generated having relationship to user model class :user_model: | ||
and has One-to-One relationship with backreference. | ||
ApiKey is setup to be auto-generated when new :user_model: is created. | ||
Returns ApiKey document class. If ApiKey is already defined, it is not | ||
generated again. | ||
Arguments: | ||
:user_model: Class that represents user model for which api keys will | ||
be generated and with which ApiKey will have relationship. | ||
""" | ||
try: | ||
return eng.get_document_cls('ApiKey') | ||
except ValueError: | ||
pass | ||
|
||
fk_kwargs = { | ||
'ref_column': None, | ||
} | ||
if hasattr(user_model, '__tablename__'): | ||
fk_kwargs['ref_column'] = '.'.join([user_model.__tablename__, 'id']) | ||
fk_kwargs['ref_column_type'] = eng.IdField | ||
|
||
class ApiKey(eng.BaseDocument): | ||
__tablename__ = 'nefertari_apikey' | ||
|
||
id = eng.IdField(primary_key=True) | ||
token = eng.StringField(default=apikey_token) | ||
user = eng.Relationship( | ||
document=user_model.__name__, | ||
uselist=False, | ||
backref_name='api_key', | ||
backref_uselist=False) | ||
user_id = eng.ForeignKeyField( | ||
ref_document=user_model.__name__, | ||
**fk_kwargs) | ||
|
||
def reset_token(self): | ||
self.update({'token': apikey_token()}) | ||
return self.token | ||
|
||
# Setup ApiKey autogeneration on :user_model: creation | ||
ApiKey.autogenerate_for(user_model, 'user') | ||
|
||
return ApiKey |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,94 @@ | ||
from pyramid.authentication import CallbackAuthenticationPolicy | ||
|
||
from nefertari import engine as eng | ||
from .models import apikey_model | ||
|
||
|
||
class ApiKeyAuthenticationPolicy(CallbackAuthenticationPolicy): | ||
""" ApiKey authentication policy. | ||
Relies of `Authorization` header being used on request, e.g.: | ||
`Authorization: ApiKey username:token` | ||
To use this policy, instantiate it with required arguments, as described | ||
in `__init__` method and register it with Pyramid's `Configurator.set_authentication_policy`. | ||
You may also find usefull `nefertari.authentication.views.TokenAuthenticationView` | ||
view which offers basic functionality to create, claim, reset token. | ||
""" | ||
def __init__(self, user_model, check=None, credentials_callback=None): | ||
""" Init the policy. | ||
Arguments: | ||
:user_model: String name or class of a User model for which ApiKey | ||
model to be generated | ||
:check: A callback passed the username, api_key and the request, | ||
expected to return None if user doesn't exist or a sequence of | ||
principal identifiers (possibly empty) if the user does exist. | ||
If callback is None, the userid will be assumed to exist with | ||
no principals. Optional. | ||
:credentials_callback: A callback passed the userid, expected to | ||
return tuple containing 2 elements: username and user's api key. | ||
Is used to generate 'WWW-Authenticate' header with a value of | ||
valid 'Authorization' request header that should be used to | ||
perform requests. | ||
""" | ||
self.user_model = user_model | ||
if isinstance(self.user_model, basestring): | ||
self.user_model = eng.get_document_cls(self.user_model) | ||
apikey_model(self.user_model) | ||
|
||
self.check = check | ||
self.credentials_callback = credentials_callback | ||
super(ApiKeyAuthenticationPolicy, self).__init__() | ||
|
||
def remember(self, request, userid, **kw): | ||
""" Return 'WWW-Authenticate' header with a value that should be used | ||
in 'Authorization' header. | ||
""" | ||
if self.credentials_callback: | ||
username, token = self.credentials_callback(userid, request) | ||
api_key = 'ApiKey {}:{}'.format(username, token) | ||
return [('WWW-Authenticate', api_key)] | ||
|
||
def forget(self, request): | ||
""" Returns challenge headers. This should be attached to a response | ||
to indicate that credentials are required.""" | ||
return [('WWW-Authenticate', 'ApiKey realm="%s"' % self.realm)] | ||
|
||
def unauthenticated_userid(self, request): | ||
""" Username parsed from the ``Authorization`` request header.""" | ||
credentials = self._get_credentials(request) | ||
if credentials: | ||
return credentials[0] | ||
|
||
def callback(self, username, request): | ||
""" Having :username: return user's identifiers or None. """ | ||
credentials = self._get_credentials(request) | ||
if credentials: | ||
username, api_key = credentials | ||
if self.check: | ||
return self.check(username, api_key, request) | ||
|
||
def _get_credentials(self, request): | ||
""" Extract username and api key token from 'Authorization' header """ | ||
authorization = request.headers.get('Authorization') | ||
if not authorization: | ||
return None | ||
try: | ||
authmeth, authbytes = authorization.split(' ', 1) | ||
except ValueError: # not enough values to unpack | ||
return None | ||
if authmeth.lower() != 'apikey': | ||
return None | ||
|
||
try: | ||
auth = authbytes.decode('utf-8') | ||
except UnicodeDecodeError: | ||
auth = authbytes.decode('latin-1') | ||
|
||
try: | ||
username, api_key = auth.split(':', 1) | ||
except ValueError: # not enough values to unpack | ||
return None | ||
return username, api_key |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,106 @@ | ||
from pyramid.security import remember, forget | ||
|
||
from nefertari.json_httpexceptions import * | ||
from nefertari.view import BaseView | ||
from .models import AuthUser | ||
|
||
|
||
class TicketAuthenticationView(BaseView): | ||
""" View for auth operations to use with Pyramid ticket-based auth. | ||
`login` (POST): Login the user with 'login' and 'password' | ||
`logout`: Logout user | ||
""" | ||
_model_class = AuthUser | ||
|
||
def register(self): | ||
""" Register new user by POSTing all required data. | ||
""" | ||
user, created = self._model_class.create_account(self._params) | ||
|
||
if not created: | ||
raise JHTTPConflict('Looks like you already have an account.') | ||
|
||
headers = remember(self.request, user.uid) | ||
return JHTTPOk('Registered', headers=headers) | ||
|
||
def login(self, **params): | ||
self._params.update(params) | ||
next = self._params.get('next', '') | ||
login_url = self.request.route_url('login') | ||
if next.startswith(login_url): | ||
next = '' # never use the login form itself as next | ||
|
||
unauthorized_url = self._params.get('unauthorized', None) | ||
success, user = self._model_class.authenticate(self._params) | ||
|
||
if success: | ||
headers = remember(self.request, user.uid) | ||
if next: | ||
return JHTTPOk('Logged in', headers=headers) | ||
else: | ||
return JHTTPOk('Logged in', headers=headers) | ||
if user: | ||
if unauthorized_url: | ||
return JHTTPUnauthorized(location=unauthorized_url+'?error=1') | ||
|
||
raise JHTTPUnauthorized('Failed to Login.') | ||
else: | ||
raise JHTTPNotFound('User not found') | ||
|
||
def logout(self): | ||
headers = forget(self.request) | ||
return JHTTPOk('Logged out', headers=headers) | ||
|
||
|
||
class TokenAuthenticationView(BaseView): | ||
""" View for auth operations to use with | ||
`nefertari.authentication.policies.ApiKeyAuthenticationPolicy` | ||
token-based auth. Implements methods: | ||
""" | ||
_model_class = AuthUser | ||
|
||
def register(self): | ||
""" Register new user by POSTing all required data. | ||
User's `Authorization` header value is returned in `WWW-Authenticate` | ||
header. | ||
""" | ||
user, created = self._model_class.create_account(self._params) | ||
|
||
if not created: | ||
raise JHTTPConflict('Looks like you already have an account.') | ||
|
||
headers = remember(self.request, user.uid) | ||
return JHTTPOk('Registered', headers=headers) | ||
|
||
def claim_token(self, **params): | ||
"""Claim current token by POSTing 'login' and 'password'. | ||
User's `Authorization` header value is returned in `WWW-Authenticate` | ||
header. | ||
""" | ||
self._params.update(params) | ||
success, self.user = self._model_class.authenticate(self._params) | ||
|
||
if success: | ||
headers = remember(self.request, self.user.uid) | ||
return JHTTPOk('Token claimed', headers=headers) | ||
if self.user: | ||
raise JHTTPUnauthorized('Wrong login or password') | ||
else: | ||
raise JHTTPNotFound('User not found') | ||
|
||
def token_reset(self, **params): | ||
""" Reset current token by POSTing 'login' and 'password'. | ||
User's `Authorization` header value is returned in `WWW-Authenticate` | ||
header. | ||
""" | ||
response = self.claim_token(**params) | ||
if not self.user: | ||
return response | ||
|
||
self.user.api_key.reset_token() | ||
headers = remember(self.request, self.user.uid) | ||
return JHTTPOk('Registered', headers=headers) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters