-
Notifications
You must be signed in to change notification settings - Fork 0
/
mixins.py
172 lines (141 loc) · 5.89 KB
/
mixins.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
from __future__ import annotations
from abc import ABCMeta
from functools import wraps
from flask_jwt_extended import jwt_required, get_jwt_identity
from ..base import Identifiable, UserRole
from ..utils import get_or_pop
class AbstractAbortMixin:
    """
    Base interface for objects that can abort a request and document abort responses.

    ``abort`` and ``doc_abort`` are placeholders that always raise
    :class:`NotImplementedError` here; ``doc_aborts`` is implemented on top of ``doc_abort``.
    """

    def abort(
        self,
        error_code: int | str,
        description: str,
        *,
        critical: bool = False,
        **kwargs,
    ):
        """
        Placeholder for aborting with an error response.

        :param error_code: code of the error response
        :param description: text describing the error
        :param critical: (default: False) flag accepted for implementations to interpret
        :raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError

    def doc_abort(
        self,
        error_code: int | str,
        description: str,
        *,
        critical: bool = False,
    ):
        """
        Placeholder for a decorator factory documenting a single error response.

        ``doc_aborts`` calls the returned object with a function and keeps whatever it returns.

        :param error_code: code of the documented response
        :param description: text of the documented response
        :param critical: (default: False) flag accepted for implementations to interpret
        :raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError

    def doc_aborts(
        self,
        *responses: tuple[int | str, str] | tuple[int | str, str, bool],
    ):
        """
        Build a decorator that applies ``doc_abort`` once per given response.

        :param responses: ``(error_code, description)`` or ``(error_code, description, critical)``
            tuples; ``critical`` is taken as False unless the tuple has exactly three items
        :return: a decorator that returns the function after all ``doc_abort`` decorators are applied,
            in the order the responses were given
        """

        def doc_aborts_wrapper(function):
            decorated = function
            for entry in responses:
                # Only a three-item entry carries its own ``critical`` flag.
                is_critical = len(entry) == 3 and entry[2]
                documenter = self.doc_abort(entry[0], entry[1], critical=is_critical)
                decorated = documenter(decorated)
            return decorated

        return doc_aborts_wrapper
class DatabaseSearcherMixin(AbstractAbortMixin, metaclass=ABCMeta):
    """Mixin providing a decorator factory that resolves an incoming id into a database entity."""

    def database_searcher(
        self,
        identifiable: type[Identifiable],
        *,
        input_field_name: str = None,
        result_field_name: str = None,
        check_only: bool = False,
        error_code: int | str = " 404",
    ):
        """
        - Looks up an :class:`Identifiable` via ``identifiable.find_by_id`` using an id from the keyword arguments.
        - If nothing is found, calls ``self.abort`` with ``identifiable.not_found_text``;
          the same response is documented through ``self.doc_abort``.
        - Unless ``check_only`` is set, passes the found entity to the decorated function.

        :param identifiable: identifiable to search for
        :param input_field_name: overrides default name of a parameter to search by
            [default: identifiable.__name__.lower() + "_id"]
        :param result_field_name: overrides default name of found object [default: identifiable.__name__.lower()]
        :param check_only: (default: False) if True, checks if entity exists and passes id to the decorated function
        :param error_code: (default: " 404") an override for the documentation code for a not-found error;
            its ``int()`` value is what gets passed to ``self.abort``
        """
        id_kwarg = (
            identifiable.__name__.lower() + "_id"
            if input_field_name is None
            else input_field_name
        )
        entity_kwarg = (
            identifiable.__name__.lower()
            if result_field_name is None
            else result_field_name
        )

        # int() tolerates surrounding whitespace, so the default " 404" becomes 404 for abort,
        # while doc_abort below still receives the original value.
        # TODO redo doc_abort & abort to handle this automagically
        status_code: int = int(error_code)

        def decorate(function):
            @self.doc_abort(error_code, identifiable.not_found_text, critical=True)
            @wraps(function)
            def find_then_call(*args, **kwargs):
                # NOTE(review): presumably get_or_pop keeps the id in kwargs when check_only
                # is True and pops it otherwise — confirm against utils.get_or_pop
                entity_id: int = get_or_pop(kwargs, id_kwarg, check_only)
                entity = identifiable.find_by_id(entity_id)
                if entity is None:
                    self.abort(status_code, identifiable.not_found_text)
                if not check_only:
                    kwargs[entity_kwarg] = entity
                return function(*args, **kwargs)

            return find_then_call

        return decorate
class JWTAuthorizerMixin(AbstractAbortMixin, metaclass=ABCMeta):
    """Mixin offering JWT-based authorization decorators built on flask-jwt-extended."""

    # (code, description, critical) triples documented on every endpoint wrapped by
    # jwt_authorizer. NOTE(review): the codes are strings with a trailing space —
    # presumably to keep them distinct from other documented 401/422 responses; confirm.
    auth_errors: list[tuple[int | str, str, bool]] = [
        ("401 ", "JWTError", True),
        ("422 ", "InvalidJWT", True),
    ]

    def _get_identity(self) -> dict | None:
        """
        Read the identity of the current JWT.

        :return: the identity if it could be read and is a dict, otherwise None
        """
        try:
            identity = get_jwt_identity()
        except Exception:
            # Deliberate best effort: any failure to read the identity is treated as "no identity".
            return None
        return identity if isinstance(identity, dict) else None

    @staticmethod
    def with_required_jwt(**kwargs):
        """Return ``jwt_required(**kwargs)``."""
        return jwt_required(**kwargs)

    @staticmethod
    def with_optional_jwt(**kwargs):
        """Return ``jwt_required(optional=True, **kwargs)``."""
        return jwt_required(optional=True, **kwargs)

    def jwt_authorizer(
        self,
        role: type[UserRole],
        auth_name: str = "",
        *,
        result_field_name: str | None = None,
        optional: bool = False,
        check_only: bool = False,
    ):
        """
        - Authorizes user by JWT-token.
        - If token is missing or is not processable, falls back on flask-jwt-extended error handlers.
        - If user doesn't exist or doesn't have the role required, sends the corresponding response.
        - All error responses are added to the documentation automatically.
        - Can pass the found user object to the decorated function.

        :param role: role to expect
        :param auth_name: which identity to use for searching. "" is the default for single-auth setups
        :param optional: (default: False) if True, a request without a usable identity is let through
            and None is passed in place of the user object
        :param check_only: (default: False) if True, user object won't be passed to the decorated function
        :param result_field_name: overrides default name of found object [default: role.__name__.lower()]
        """
        # Copy so the class-level list is not mutated by per-endpoint additions.
        auth_errors = self.auth_errors.copy()
        auth_errors.append(role.unauthorized_error + (True,))

        if result_field_name is None:
            result_field_name = role.__name__.lower()

        def authorizer_wrapper(function):
            @self.doc_aborts(*auth_errors)
            @jwt_required(optional=optional)
            @wraps(function)
            def authorizer_inner(*args, **kwargs):
                jwt_identity = self._get_identity()
                identity = None if jwt_identity is None else jwt_identity.get(auth_name)

                if identity is None:
                    if optional:
                        # Use the same keyword (and the same check_only rule) as the
                        # authorized path, so custom result_field_name values are honoured
                        # and check_only endpoints never receive an unexpected keyword.
                        if not check_only:
                            kwargs[result_field_name] = None
                        return function(*args, **kwargs)
                    self.abort(*role.unauthorized_error)

                result = role.find_by_identity(identity)
                if result is None:
                    self.abort(*role.unauthorized_error)

                if not check_only:
                    kwargs[result_field_name] = result
                return function(*args, **kwargs)

            return authorizer_inner

        return authorizer_wrapper