/
views.py
337 lines (269 loc) · 11.9 KB
/
views.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
import logging
import warnings
try:
from urllib.parse import urljoin, urlencode, urlparse # python 3x
except ImportError:
from urllib import urlencode # python 2x
from urlparse import urljoin, urlparse
from django.conf import settings
from django.http import HttpResponse
from django.utils.decorators import method_decorator
from django.utils.encoding import iri_to_uri
from django.views.decorators.cache import never_cache
from django.views.decorators.csrf import csrf_protect
from social_django.utils import psa, STORAGE
from social_django.views import _do_login as social_auth_login
from social_core.backends.oauth import BaseOAuth1
from social_core.utils import get_strategy, parse_qs, user_is_authenticated, setting_name
from social_core.exceptions import AuthException
from rest_framework.generics import GenericAPIView
from rest_framework.response import Response
from rest_framework import status
from rest_framework.authentication import TokenAuthentication
from rest_framework.permissions import AllowAny
from requests.exceptions import HTTPError
from .serializers import (
JWTPairSerializer,
JWTSerializer,
JWTSlidingSerializer,
KnoxSerializer,
OAuth1InputSerializer,
OAuth2InputSerializer,
TokenSerializer,
UserJWTSerializer,
UserJWTSlidingSerializer,
UserKnoxSerializer,
UserJWTPairSerializer,
UserSerializer,
UserTokenSerializer,
)
# Module-level logger used by BaseSocialAuthView.respond_error / log_exception.
logger = logging.getLogger(__name__)
# Redirect URI handed to the `psa` decorator below; defaults to '/'.
REDIRECT_URI = getattr(settings, 'REST_SOCIAL_OAUTH_REDIRECT_URI', '/')
# When true (the default) and no manual redirect URI is supplied, get_object()
# rebuilds the backend redirect URI from the request's Origin header.
DOMAIN_FROM_ORIGIN = getattr(settings, 'REST_SOCIAL_DOMAIN_FROM_ORIGIN', True)
# When false, AuthException errors are not logged by respond_error();
# other exception types are always logged.
LOG_AUTH_EXCEPTIONS = getattr(settings, 'REST_SOCIAL_LOG_AUTH_EXCEPTIONS', True)
# Strategy identifier passed to get_strategy() by load_strategy();
# overridable through the setting named by setting_name('STRATEGY').
STRATEGY = getattr(settings, setting_name('STRATEGY'), 'rest_social_auth.strategy.DRFStrategy')
def load_strategy(request=None):
    """
    Build the social-auth strategy for ``request``.

    Delegates to ``get_strategy`` with the module-level ``STRATEGY`` and the
    ``STORAGE`` imported from ``social_django.utils``.
    """
    strategy = get_strategy(STRATEGY, STORAGE, request)
    return strategy
@psa(REDIRECT_URI, load_strategy=load_strategy)
def decorate_request(request, backend):
    """
    Intentionally empty: this function exists only so that the ``psa``
    decorator is applied to ``request`` for the given ``backend`` name.

    The views in this module expect ``request.backend`` to be available after
    this call (see the assertion in ``BaseSocialAuthView.oauth_v1``).
    """
class BaseSocialAuthView(GenericAPIView):
    """
    View will login or signin (create) the user from social oauth2.0 provider.

    **Input** (default serializer_class_in):

        {
            "provider": "facebook",
            "code": "AQBPBBTjbdnehj51"
        }

    + optional

        "redirect_uri": "/relative/or/absolute/redirect/uri"

    **Output**:

    user data in serializer_class format
    """
    # Input serializers; get_serializer_class_in() picks one per request
    # depending on whether the resolved backend is an OAuth1 backend.
    oauth1_serializer_class_in = OAuth1InputSerializer
    oauth2_serializer_class_in = OAuth2InputSerializer
    # Output serializer; concrete subclasses are expected to set this.
    serializer_class = None
    permission_classes = (AllowAny, )

    def oauth_v1(self):
        """
        Return True when the backend attached to the request is an OAuth1 backend.

        Must only be called after ``decorate_request`` has attached
        ``request.backend``.
        """
        assert hasattr(self.request, 'backend'), 'Don\'t call this method before decorate_request'
        return isinstance(self.request.backend, BaseOAuth1)

    def get_serializer_class_in(self):
        """Return the input serializer class matching the backend's OAuth version."""
        if self.oauth_v1():
            return self.oauth1_serializer_class_in
        return self.oauth2_serializer_class_in

    def get_serializer_in(self, *args, **kwargs):
        """
        Return the serializer instance that should be used for validating and
        deserializing the input data.

        The serializer context from ``get_serializer_context()`` is always
        injected, overriding any ``context`` keyword passed by the caller.
        """
        serializer_class = self.get_serializer_class_in()
        kwargs['context'] = self.get_serializer_context()
        return serializer_class(*args, **kwargs)

    def get_serializer_in_data(self):
        """
        Compile the incoming data into a form fit for the serializer_in class.

        :return: Data for serializer in the form of a dictionary with 'provider' and 'code' keys.
        """
        # Work on a copy: the redirect_uri key is popped from this data later.
        return self.request.data.copy()

    @method_decorator(never_cache)
    def post(self, request, *args, **kwargs):
        """
        Authenticate against the requested provider and return the user data.

        Responds with HTTP 400 (empty body) when no provider is given or when
        the backend raises ``AuthException`` / ``HTTPError``. For OAuth1
        backends a request without the OAuth token parameter is treated as
        the first stage and answered with the freshly obtained request token.
        """
        input_data = self.get_serializer_in_data()
        provider_name = self.get_provider_name(input_data)
        if not provider_name:
            return self.respond_error("Provider is not specified")
        self.set_input_data(request, input_data)
        decorate_request(request, provider_name)
        serializer_in = self.get_serializer_in(data=input_data)
        if self.oauth_v1() and request.backend.OAUTH_TOKEN_PARAMETER_NAME not in input_data:
            # oauth1 first stage (1st is get request_token, 2nd is get access_token)
            self._apply_manual_redirect_uri()
            request_token = parse_qs(request.backend.set_unauthorized_token())
            return Response(request_token)
        serializer_in.is_valid(raise_exception=True)
        try:
            user = self.get_object()
        except (AuthException, HTTPError) as e:
            return self.respond_error(e)
        if isinstance(user, HttpResponse):
            # error happened and pipeline returned HttpResponse instead of user
            return user
        resp_data = self.get_serializer(instance=user)
        self.do_login(request.backend, user)
        return Response(resp_data.data)

    def _apply_manual_redirect_uri(self):
        """
        Pop 'redirect_uri' from the auth data, resolve it through
        ``get_redirect_uri`` and, if non-empty, set it on the backend.

        :return: True when a redirect URI was applied to the backend.
        """
        manual_redirect_uri = self.request.auth_data.pop('redirect_uri', None)
        manual_redirect_uri = self.get_redirect_uri(manual_redirect_uri)
        if manual_redirect_uri:
            self.request.backend.redirect_uri = manual_redirect_uri
            return True
        return False

    def get_object(self):
        """
        Complete the authentication with the backend and return its result.

        The result is whatever ``backend.complete`` returns; ``post`` handles
        both a user and an ``HttpResponse``.
        """
        user = self.request.user
        if not self._apply_manual_redirect_uri() and DOMAIN_FROM_ORIGIN:
            origin = self.request.strategy.request.META.get('HTTP_ORIGIN')
            if origin:
                # Keep the backend's redirect path but take scheme and host
                # from the Origin header of the incoming request.
                relative_path = urlparse(self.request.backend.redirect_uri).path
                url = urlparse(origin)
                origin_scheme_host = "%s://%s" % (url.scheme, url.netloc)
                location = urljoin(origin_scheme_host, relative_path)
                self.request.backend.redirect_uri = iri_to_uri(location)
        # Only an already authenticated user is forwarded to the backend.
        user = user if user_is_authenticated(user) else None

        # skip checking state by setting following params to False
        # it is responsibility of front-end to check state
        # TODO: maybe create an additional resource, where front-end will
        # store the state before making a call to oauth provider
        # so server can save it in session and consequently check it before
        # sending request to acquire access token.
        # In case of token authentication we need a way to store an anonymous
        # session to do it.
        self.request.backend.REDIRECT_STATE = False
        self.request.backend.STATE_PARAMETER = False

        if self.oauth_v1():
            self.save_token_param_in_session()

        user = self.request.backend.complete(user=user)
        return user

    def save_token_param_in_session(self):
        """
        Save token param in strategy's session.
        This method will allow to use token auth with OAuth1 even if session is not enabled in
        django settings (social_core expects that session is enabled).
        """
        backend = self.request.backend
        # NOTE(review): the attribute spelling is kept exactly as referenced
        # by the original code; presumably it matches the backend class.
        session_token_name = backend.name + backend.UNATHORIZED_TOKEN_SUFIX
        session = self.request.strategy.session
        if isinstance(session, dict):
            # A plain dict has no ``exists`` method, so it must never reach
            # the call below (previously this raised AttributeError when the
            # token was already stored in a dict session).
            token_missing = session_token_name not in session
        else:
            token_missing = not session.exists(session_token_name)
        if token_missing:
            oauth1_token_param = backend.data.get(backend.OAUTH_TOKEN_PARAMETER_NAME)
            session[session_token_name] = [
                urlencode({
                    backend.OAUTH_TOKEN_PARAMETER_NAME: oauth1_token_param,
                    'oauth_token_secret': backend.data.get('oauth_token_secret')
                })
            ]

    def do_login(self, backend, user):
        """
        Do login action here.
        For example in case of session authentication store the session in
        cookies.
        """

    def set_input_data(self, request, auth_data):
        """
        auth_data will be used as request_data in strategy
        """
        request.auth_data = auth_data

    def get_redirect_uri(self, manual_redirect_uri):
        """
        Return ``manual_redirect_uri`` or, when it is empty, the
        ``REST_SOCIAL_OAUTH_ABSOLUTE_REDIRECT_URI`` setting (None if unset).
        """
        if not manual_redirect_uri:
            manual_redirect_uri = getattr(
                settings, 'REST_SOCIAL_OAUTH_ABSOLUTE_REDIRECT_URI', None)
        return manual_redirect_uri

    def get_provider_name(self, input_data):
        """Return the provider from the URL kwargs, falling back to the input data."""
        if self.kwargs.get('provider'):
            return self.kwargs['provider']
        return input_data.get('provider')

    def respond_error(self, error):
        """
        Log ``error`` and return an empty HTTP 400 response.

        Exceptions go through ``log_exception`` unless they are
        ``AuthException`` instances and ``LOG_AUTH_EXCEPTIONS`` is false;
        anything else (e.g. a plain message string) is logged as an error.
        """
        if isinstance(error, Exception):
            if not isinstance(error, AuthException) or LOG_AUTH_EXCEPTIONS:
                self.log_exception(error)
        else:
            logger.error(error)
        return Response(status=status.HTTP_400_BAD_REQUEST)

    def log_exception(self, error):
        """
        Log an exception, including the JSON payload of ``error.response``
        when the exception carries a response whose body parses as JSON.
        """
        err_msg = error.args[0] if error.args else ''
        if getattr(error, 'response', None) is not None:
            try:
                err_data = error.response.json()
            except (ValueError, AttributeError):
                logger.error(u'%s; %s', error, err_msg)
            else:
                logger.error(u'%s; %s; %s', error, err_msg, err_data)
        else:
            # No response attached: log with traceback information.
            logger.exception(u'%s; %s', error, err_msg)
class SocialSessionAuthView(BaseSocialAuthView):
    # Session flavour of the social auth view: answers with UserSerializer
    # data and logs the user in through social_django's login helper.
    serializer_class = UserSerializer

    @method_decorator(csrf_protect)  # just to be sure csrf is not disabled
    def post(self, request, *args, **kwargs):
        """Run the base ``post`` handler with CSRF protection enforced."""
        parent = super(SocialSessionAuthView, self)
        return parent.post(request, *args, **kwargs)

    def do_login(self, backend, user):
        """Log ``user`` in via ``social_auth_login``, passing ``user.social_user``."""
        social_user = user.social_user
        social_auth_login(backend, user, social_user)
class SocialTokenOnlyAuthView(BaseSocialAuthView):
    # Authenticates with DRF TokenAuthentication and renders the result
    # with TokenSerializer.
    authentication_classes = (TokenAuthentication,)
    serializer_class = TokenSerializer
class SocialTokenUserAuthView(BaseSocialAuthView):
    # Authenticates with DRF TokenAuthentication and renders the result
    # with UserTokenSerializer.
    authentication_classes = (TokenAuthentication,)
    serializer_class = UserTokenSerializer
class KnoxAuthMixin(object):
    """Mixin that makes a view authenticate with knox's ``TokenAuthentication``."""

    def get_authenticators(self):
        """
        Return a single-item list with a knox ``TokenAuthentication`` instance.

        knox is imported lazily; if the import fails an ``ImportWarning`` is
        issued and the original ``ImportError`` is re-raised.
        """
        try:
            from knox.auth import TokenAuthentication as KnoxTokenAuthentication
        except ImportError:
            message = 'django-rest-knox must be installed for Knox authentication'
            warnings.warn(message, ImportWarning)
            raise

        authenticator = KnoxTokenAuthentication()
        return [authenticator]
class SocialKnoxOnlyAuthView(KnoxAuthMixin, BaseSocialAuthView):
    # Knox-authenticated social auth view rendering KnoxSerializer output.
    serializer_class = KnoxSerializer
class SocialKnoxUserAuthView(KnoxAuthMixin, BaseSocialAuthView):
    # Knox-authenticated social auth view rendering UserKnoxSerializer output.
    serializer_class = UserKnoxSerializer
class SimpleJWTAuthMixin(object):
    """Mixin that makes a view authenticate with simplejwt's ``JWTAuthentication``."""

    def get_authenticators(self):
        """
        Return a single-item list with a ``JWTAuthentication`` instance.

        rest_framework_simplejwt is imported lazily; if the import fails an
        ``ImportWarning`` is issued and the original ``ImportError`` is
        re-raised.
        """
        try:
            from rest_framework_simplejwt.authentication import (
                JWTAuthentication as SimpleJWTAuthentication,
            )
        except ImportError:
            message = 'django-rest-framework-simplejwt must be installed for JWT authentication'
            warnings.warn(message, ImportWarning)
            raise

        authenticator = SimpleJWTAuthentication()
        return [authenticator]
class SocialJWTPairOnlyAuthView(SimpleJWTAuthMixin, BaseSocialAuthView):
    # simplejwt-authenticated social auth view rendering JWTPairSerializer output.
    serializer_class = JWTPairSerializer
class SocialJWTPairUserAuthView(SimpleJWTAuthMixin, BaseSocialAuthView):
    # simplejwt-authenticated social auth view rendering UserJWTPairSerializer output.
    serializer_class = UserJWTPairSerializer
class SocialJWTSlidingOnlyAuthView(SimpleJWTAuthMixin, BaseSocialAuthView):
    # simplejwt-authenticated social auth view rendering JWTSlidingSerializer output.
    serializer_class = JWTSlidingSerializer
class SocialJWTSlidingUserAuthView(SimpleJWTAuthMixin, BaseSocialAuthView):
    # simplejwt-authenticated social auth view rendering UserJWTSlidingSerializer output.
    serializer_class = UserJWTSlidingSerializer
# Deprecated views
class JWTAuthMixin(object):
    """
    Deprecated mixin that authenticates with djangorestframework-jwt's
    ``JSONWebTokenAuthentication``.
    """

    def get_authenticators(self):
        """
        Return a single-item list with a ``JSONWebTokenAuthentication`` instance.

        Always issues a ``DeprecationWarning`` first. rest_framework_jwt is
        imported lazily; if the import fails an ``ImportWarning`` is issued
        and the original ``ImportError`` is re-raised.
        """
        deprecation_message = (
            'Support of djangorestframework-jwt will be removed in 5.0.0 version. '
            'Use rest_framework_simplejwt instead.'
        )
        warnings.warn(deprecation_message, DeprecationWarning)

        try:
            from rest_framework_jwt.authentication import (
                JSONWebTokenAuthentication as LegacyJWTAuthentication,
            )
        except ImportError:
            message = 'djangorestframework-jwt must be installed for JWT authentication'
            warnings.warn(message, ImportWarning)
            raise

        authenticator = LegacyJWTAuthentication()
        return [authenticator]
class SocialJWTOnlyAuthView(JWTAuthMixin, BaseSocialAuthView):
    # Deprecated: djangorestframework-jwt view rendering JWTSerializer output.
    serializer_class = JWTSerializer
class SocialJWTUserAuthView(JWTAuthMixin, BaseSocialAuthView):
    # Deprecated: djangorestframework-jwt view rendering UserJWTSerializer output.
    serializer_class = UserJWTSerializer