This repository has been archived by the owner on Oct 24, 2018. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 6
/
base.py
168 lines (125 loc) · 4.7 KB
/
base.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
# -*- coding: utf-8 -*-
"""base api"""
from functools import partial
from datetime import datetime, timedelta
import inflection
import jwt as jwt_lib
from werkzeug.security import safe_str_cmp
from flask import current_app, make_response
from flask_security import utils, AnonymousUser
from flask_security.utils import md5
from flask_classy import FlaskView
from ..extensions import jwt, auth_datastore
from ..exceptions import ApplicationException
from .errors import api_exception_handler
from .decorators import token_auth_required, roles_required
from .representations.json import output_json
@jwt.authentication_handler
def jwt_authenticate(email, pwd):
"""Register jwt authentication handler
:param email: the provided email
:param pwd: the provided password
:return None or the matching user
"""
user = auth_datastore.find_users(email=email).first()
if user and utils.verify_and_update_password(pwd, user):
return user
@jwt.identity_handler
def jwt_load_user(payload):
"""Register jwt user handler
:param payload:
"""
user = auth_datastore.read_user(payload['sub'])
if user and safe_str_cmp(md5(user.password), payload['pwd']):
return user
return AnonymousUser()
@jwt.jwt_payload_handler
def jwt_make_payload(user, expiration_delta=None, leeway=None):
"""Register jwt payload handler
:param user:
"""
if expiration_delta:
# timedelta config supported only
max_expiration_delta = current_app.config['JWT_EXPIRATION_DELTA_MAX']
expiration_delta = expiration_delta if expiration_delta <= max_expiration_delta \
else max_expiration_delta
else:
# timedelta config supported only
expiration_delta = current_app.config['JWT_EXPIRATION_DELTA']
leeway = timedelta(seconds=leeway) if leeway else timedelta(
seconds=current_app.config['JWT_LEEWAY']) # int config as seconds
utc_now = datetime.utcnow()
return {
'sub': user.id,
'pwd': md5(user.password),
'iat': utc_now,
'exp': utc_now + expiration_delta + leeway
}
@jwt.jwt_encode_handler
def jwt_encode_payload(payload):
"""Register jwt encode handler
:param payload:
"""
return jwt_lib.encode(payload, current_app.config['JWT_SECRET_KEY'],
algorithm=current_app.config['JWT_ALGORITHM'])
@jwt.jwt_decode_handler
def jwt_decode_token(token):
"""Register jwt decode handler
:param token:
"""
return jwt_lib.decode(token, current_app.config['JWT_SECRET_KEY'],
algorithms=current_app.config['JWT_ALGORITHMS'])
@jwt.jwt_error_handler
def jwt_error(ex):
"""Register jwt error handler
:param ex:
:return:
"""
ex = ApplicationException(ex.error, description=ex.description, status_code=ex.status_code)
return api_exception_handler(ex)
# short cut of flask.make_response for making empty response
#: make_empty_response()
#: make_empty_response(201)
#: make_empty_response(201, {'header-key': 'value'})
make_empty_response = partial(make_response, '')
class Resource(FlaskView):
"""The base Resource class that other REST resources should extend from"""
trailing_slash = False
representations = {'application/json': output_json}
special_methods = {
'get': ['GET'],
'put': ['PUT'],
'patch': ['PATCH'],
'post': ['POST'],
'delete': ['DELETE'],
'index': ['GET'],
'create': ['POST'],
'read': ['GET'],
'update': ['PUT'],
'list': ['GET']
}
@classmethod
def get_route_base(cls):
if cls.route_base is None:
base_name = None
if cls.__name__.endswith('API'):
base_name = cls.__name__[:-3]
elif cls.__name__.endswith('Resource'):
base_name = cls.__name__[:-8]
if base_name is not None:
return inflection.dasherize(inflection.pluralize(inflection.underscore(base_name)))
return super(Resource, cls).get_route_base()
@classmethod
def build_route_name(cls, method_name):
if cls.__name__.endswith('API') or cls.__name__.endswith('Resource'):
if cls.route_base is None:
return cls.get_route_base() + ':%s' % method_name
else:
return cls.route_base + ':%s' % method_name
return super(Resource, cls).build_route_name(method_name)
class TokenRequiredResource(Resource):
"""The base resource class that requires token authentication"""
decorators = [token_auth_required()]
class AdminRoleRequiredResource(Resource):
"""The base resource class that requires admin role"""
decorators = [roles_required('admin'), token_auth_required()]