-
-
Notifications
You must be signed in to change notification settings - Fork 785
/
permissions.py
183 lines (140 loc) · 6.43 KB
/
permissions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
import logging
from django.core.exceptions import ImproperlyConfigured
from rest_framework.exceptions import PermissionDenied
from rest_framework.permissions import SAFE_METHODS, BasePermission, IsAuthenticated
from ...settings import oauth2_settings
from .authentication import OAuth2Authentication
log = logging.getLogger("oauth2_provider")
class TokenHasScope(BasePermission):
"""
The request is authenticated as a user and the token used has the right scope
"""
def has_permission(self, request, view):
token = request.auth
if not token:
return False
if hasattr(token, "scope"): # OAuth 2
required_scopes = self.get_scopes(request, view)
log.debug("Required scopes to access resource: {0}".format(required_scopes))
if token.is_valid(required_scopes):
return True
# Provide information about required scope?
include_required_scope = (
oauth2_settings.ERROR_RESPONSE_WITH_SCOPES
and required_scopes
and not token.is_expired()
and not token.allow_scopes(required_scopes)
)
if include_required_scope:
self.message = {
"detail": PermissionDenied.default_detail,
"required_scopes": list(required_scopes),
}
return False
assert False, (
"TokenHasScope requires the"
"`oauth2_provider.rest_framework.OAuth2Authentication` authentication "
"class to be used."
)
def get_scopes(self, request, view):
try:
return getattr(view, "required_scopes")
except AttributeError:
raise ImproperlyConfigured(
"TokenHasScope requires the view to define the required_scopes attribute"
)
class TokenHasReadWriteScope(TokenHasScope):
"""
The request is authenticated as a user and the token used has the right scope
"""
def get_scopes(self, request, view):
try:
required_scopes = super().get_scopes(request, view)
except ImproperlyConfigured:
required_scopes = []
# TODO: code duplication!! see dispatch in ReadWriteScopedResourceMixin
if request.method.upper() in SAFE_METHODS:
read_write_scope = oauth2_settings.READ_SCOPE
else:
read_write_scope = oauth2_settings.WRITE_SCOPE
return required_scopes + [read_write_scope]
class TokenHasResourceScope(TokenHasScope):
"""
The request is authenticated as a user and the token used has the right scope
"""
def get_scopes(self, request, view):
try:
view_scopes = super().get_scopes(request, view)
except ImproperlyConfigured:
view_scopes = []
if request.method.upper() in SAFE_METHODS:
scope_type = oauth2_settings.READ_SCOPE
else:
scope_type = oauth2_settings.WRITE_SCOPE
required_scopes = ["{}:{}".format(scope, scope_type) for scope in view_scopes]
return required_scopes
class IsAuthenticatedOrTokenHasScope(BasePermission):
"""
The user is authenticated using some backend or the token has the right scope
This only returns True if the user is authenticated, but not using a token
or using a token, and the token has the correct scope.
This is useful when combined with the DjangoModelPermissions to allow people browse
the browsable api's if they log in using the a non token bassed middleware,
and let them access the api's using a rest client with a token
"""
def has_permission(self, request, view):
is_authenticated = IsAuthenticated().has_permission(request, view)
oauth2authenticated = False
if is_authenticated:
oauth2authenticated = isinstance(request.successful_authenticator, OAuth2Authentication)
token_has_scope = TokenHasScope()
return (is_authenticated and not oauth2authenticated) or token_has_scope.has_permission(request, view)
class TokenMatchesOASRequirements(BasePermission):
"""
:attr:alternate_required_scopes: dict keyed by HTTP method name with value: iterable alternate scope lists
This fulfills the [Open API Specification (OAS; formerly Swagger)](https://www.openapis.org/)
list of alternative Security Requirements Objects for oauth2 or openIdConnect:
When a list of Security Requirement Objects is defined on the Open API object or Operation Object,
only one of Security Requirement Objects in the list needs to be satisfied to authorize the request.
[1](https://github.com/OAI/OpenAPI-Specification/blob/master/versions/3.0.0.md#securityRequirementObject)
For each method, a list of lists of allowed scopes is tried in order and the first to match succeeds.
@example
required_alternate_scopes = {
'GET': [['read']],
'POST': [['create1','scope2'], ['alt-scope3'], ['alt-scope4','alt-scope5']],
}
TODO: DRY: subclass TokenHasScope and iterate over values of required_scope?
"""
def has_permission(self, request, view):
token = request.auth
if not token:
return False
if hasattr(token, "scope"): # OAuth 2
required_alternate_scopes = self.get_required_alternate_scopes(request, view)
m = request.method.upper()
if m in required_alternate_scopes:
log.debug(
"Required scopes alternatives to access resource: {0}".format(
required_alternate_scopes[m]
)
)
for alt in required_alternate_scopes[m]:
if token.is_valid(alt):
return True
return False
else:
log.warning("no scope alternates defined for method {0}".format(m))
return False
assert False, (
"TokenMatchesOASRequirements requires the"
"`oauth2_provider.rest_framework.OAuth2Authentication` authentication "
"class to be used."
)
def get_required_alternate_scopes(self, request, view):
try:
return getattr(view, "required_alternate_scopes")
except AttributeError:
raise ImproperlyConfigured(
"TokenMatchesOASRequirements requires the view to"
" define the required_alternate_scopes attribute"
)