-
-
Notifications
You must be signed in to change notification settings - Fork 545
/
base.py
259 lines (223 loc) · 9.9 KB
/
base.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
import time
from requests import ConnectionError, request
from ..exceptions import AuthFailed
from ..utils import SSLHttpAdapter, module_member, parse_qs, user_agent
class BaseAuth:
"""A authentication backend that authenticates the user based on
the provider response"""
name = "" # provider name, it's stored in database
supports_inactive_user = False # Django auth
ID_KEY = None
EXTRA_DATA = None
GET_ALL_EXTRA_DATA = False
REQUIRES_EMAIL_VALIDATION = False
SEND_USER_AGENT = False
SSL_PROTOCOL = None
def __init__(self, strategy, redirect_uri=None):
self.strategy = strategy
self.redirect_uri = redirect_uri
self.data = self.strategy.request_data()
self.redirect_uri = self.strategy.absolute_uri(self.redirect_uri)
def setting(self, name, default=None):
"""Return setting value from strategy"""
return self.strategy.setting(name, default=default, backend=self)
def start(self):
if self.uses_redirect():
return self.strategy.redirect(self.auth_url())
else:
return self.strategy.html(self.auth_html())
def complete(self, *args, **kwargs):
return self.auth_complete(*args, **kwargs)
def auth_url(self):
"""Must return redirect URL to auth provider"""
raise NotImplementedError("Implement in subclass")
def auth_html(self):
"""Must return login HTML content returned by provider"""
raise NotImplementedError("Implement in subclass")
def auth_complete(self, *args, **kwargs):
"""Completes login process, must return user instance"""
raise NotImplementedError("Implement in subclass")
def process_error(self, data):
"""Process data for errors, raise exception if needed.
Call this method on any override of auth_complete."""
pass
def authenticate(self, *args, **kwargs):
"""Authenticate user using social credentials
Authentication is made if this is the correct backend, backend
verification is made by kwargs inspection for current backend
name presence.
"""
# Validate backend and arguments. Require that the Social Auth
# response be passed in as a keyword argument, to make sure we
# don't match the username/password calling conventions of
# authenticate.
if (
"backend" not in kwargs
or kwargs["backend"].name != self.name
or "strategy" not in kwargs
or "response" not in kwargs
):
return None
self.strategy = kwargs.get("strategy") or self.strategy
self.redirect_uri = kwargs.get("redirect_uri") or self.redirect_uri
self.data = self.strategy.request_data()
kwargs.setdefault("is_new", False)
pipeline = self.strategy.get_pipeline(self)
args, kwargs = self.strategy.clean_authenticate_args(*args, **kwargs)
return self.pipeline(pipeline, *args, **kwargs)
def pipeline(self, pipeline, pipeline_index=0, *args, **kwargs):
out = self.run_pipeline(pipeline, pipeline_index, *args, **kwargs)
if not isinstance(out, dict):
return out
user = out.get("user")
if user:
user.social_user = out.get("social")
user.is_new = out.get("is_new")
return user
def disconnect(self, *args, **kwargs):
pipeline = self.strategy.get_disconnect_pipeline(self)
kwargs["name"] = self.name
kwargs["user_storage"] = self.strategy.storage.user
return self.run_pipeline(pipeline, *args, **kwargs)
def run_pipeline(self, pipeline, pipeline_index=0, *args, **kwargs):
out = kwargs.copy()
out.setdefault("strategy", self.strategy)
out.setdefault("backend", out.pop(self.name, None) or self)
out.setdefault("request", self.strategy.request_data())
out.setdefault("details", {})
if (
not isinstance(pipeline_index, int)
or pipeline_index < 0
or pipeline_index >= len(pipeline)
):
pipeline_index = 0
for idx, name in enumerate(pipeline[pipeline_index:]):
out["pipeline_index"] = pipeline_index + idx
func = module_member(name)
result = func(*args, **out) or {}
if not isinstance(result, dict):
return result
out.update(result)
return out
def extra_data(self, user, uid, response, details=None, *args, **kwargs):
"""Return default extra data to store in extra_data field"""
data = {
# store the last time authentication toke place
"auth_time": int(time.time())
}
extra_data_entries = []
if self.GET_ALL_EXTRA_DATA or self.setting("GET_ALL_EXTRA_DATA", False):
extra_data_entries = response.keys()
else:
extra_data_entries = (self.EXTRA_DATA or []) + self.setting(
"EXTRA_DATA", []
)
for entry in extra_data_entries:
if not isinstance(entry, (list, tuple)):
entry = (entry,)
size = len(entry)
if size >= 1 and size <= 3:
if size == 3:
name, alias, discard = entry
elif size == 2:
(name, alias), discard = entry, False
elif size == 1:
name = alias = entry[0]
discard = False
value = response.get(name, details.get(name, details.get(alias)))
if discard and not value:
continue
data[alias] = value
return data
def auth_allowed(self, response, details):
"""Return True if the user should be allowed to authenticate, by
default check if email is whitelisted (if there's a whitelist)"""
emails = [email.lower() for email in self.setting("WHITELISTED_EMAILS", [])]
domains = [domain.lower() for domain in self.setting("WHITELISTED_DOMAINS", [])]
email = details.get("email")
allowed = True
if email and (emails or domains):
email = email.lower()
domain = email.split("@", 1)[1]
allowed = email in emails or domain in domains
return allowed
def get_user_id(self, details, response):
"""Return a unique ID for the current user, by default from server
response."""
return response.get(self.ID_KEY)
def get_user_details(self, response):
"""Must return user details in a know internal struct:
{'username': <username if any>,
'email': <user email if any>,
'fullname': <user full name if any>,
'first_name': <user first name if any>,
'last_name': <user last name if any>}
"""
raise NotImplementedError("Implement in subclass")
def get_user_names(self, fullname="", first_name="", last_name=""):
# Avoid None values
fullname = fullname or ""
first_name = first_name or ""
last_name = last_name or ""
if fullname and not (first_name or last_name):
try:
first_name, last_name = fullname.split(" ", 1)
except ValueError:
first_name = first_name or fullname or ""
last_name = last_name or ""
fullname = fullname or " ".join((first_name, last_name))
return fullname.strip(), first_name.strip(), last_name.strip()
def get_user(self, user_id):
"""
Return user with given ID from the User model used by this backend.
This is called by django.contrib.auth.middleware.
"""
return self.strategy.get_user(user_id)
def continue_pipeline(self, partial):
"""Continue previous halted pipeline"""
return self.strategy.authenticate(
self, pipeline_index=partial.next_step, *partial.args, **partial.kwargs
)
def auth_extra_arguments(self):
"""Return extra arguments needed on auth process. The defaults can be
overridden by GET parameters."""
extra_arguments = self.setting("AUTH_EXTRA_ARGUMENTS", {}).copy()
extra_arguments.update(
(key, self.data[key]) for key in extra_arguments if key in self.data
)
return extra_arguments
def uses_redirect(self):
"""Return True if this provider uses redirect url method,
otherwise return false."""
return True
def request(self, url, method="GET", *args, **kwargs):
kwargs.setdefault("headers", {})
if self.setting("PROXIES") is not None:
kwargs.setdefault("proxies", self.setting("PROXIES"))
if self.setting("VERIFY_SSL") is not None:
kwargs.setdefault("verify", self.setting("VERIFY_SSL"))
kwargs.setdefault(
"timeout",
self.setting("REQUESTS_TIMEOUT") or self.setting("URLOPEN_TIMEOUT"),
)
if self.SEND_USER_AGENT and "User-Agent" not in kwargs["headers"]:
kwargs["headers"]["User-Agent"] = self.setting("USER_AGENT") or user_agent()
try:
if self.SSL_PROTOCOL:
session = SSLHttpAdapter.ssl_adapter_session(self.SSL_PROTOCOL)
response = session.request(method, url, *args, **kwargs)
else:
response = request(method, url, *args, **kwargs)
except ConnectionError as err:
raise AuthFailed(self, str(err))
response.raise_for_status()
return response
def get_json(self, url, *args, **kwargs):
return self.request(url, *args, **kwargs).json()
def get_querystring(self, url, *args, **kwargs):
return parse_qs(self.request(url, *args, **kwargs).text)
def get_key_and_secret(self):
"""Return tuple with Consumer Key and Consumer Secret for current
service provider. Must return (key, secret), order *must* be respected.
"""
return self.setting("KEY"), self.setting("SECRET")