-
Notifications
You must be signed in to change notification settings - Fork 1.1k
/
base.py
215 lines (173 loc) · 7.43 KB
/
base.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
import time
import random
import hashlib
from social.utils import setting_name, to_setting_name
from social.exceptions import NotAllowedToDisconnect
from social.store import OpenIdStore
class BaseTemplateStrategy(object):
    """Base class for the rendering helper attached to a strategy.

    Subclasses supply ``render_template`` and ``render_string``; ``render``
    only decides which of the two to dispatch to.
    """

    def __init__(self, strategy):
        # Keep a handle on the owning strategy for subclasses to use.
        self.strategy = strategy

    def render(self, tpl=None, html=None, context=None):
        """Render a named template or a raw html string.

        ``tpl`` takes precedence when both ``tpl`` and ``html`` are given.
        A falsy ``context`` is replaced with an empty dict.

        Raises ``ValueError`` when neither ``tpl`` nor ``html`` is supplied.
        """
        if not (tpl or html):
            raise ValueError('Missing template or html parameters')
        if not context:
            context = {}
        if not tpl:
            return self.render_string(html, context)
        return self.render_template(tpl, context)

    def render_template(self, tpl, context):
        """Render the template named ``tpl`` with ``context``."""
        raise NotImplementedError('Implement in subclass')

    def render_string(self, html, context):
        """Render the raw ``html`` string with ``context``."""
        raise NotImplementedError('Implement in subclass')
class BaseStrategy(object):
    """Base class tying together a backend, a storage and a request.

    Concrete strategies subclass this and implement the methods listed at
    the bottom of the class (``redirect``, ``get_setting``, ``html``,
    ``request_data``, ``request_host``, ``session_get``, ``session_set``,
    ``session_pop`` and ``build_absolute_uri``). Everything else here is
    written in terms of those primitives.
    """
    ALLOWED_CHARS = 'abcdefghijklmnopqrstuvwxyz' \
                    'ABCDEFGHIJKLMNOPQRSTUVWXYZ' \
                    '0123456789'

    # Sentinel used to tell "key missing from the session" apart from a
    # stored value of None.
    _MISSING = object()

    def __init__(self, backend=None, storage=None, request=None, tpl=None,
                 backends=None, *args, **kwargs):
        """Store collaborators and instantiate the backend, if any.

        ``tpl`` may be a ``BaseTemplateStrategy`` instance or a callable
        (typically a class) that is invoked with this strategy to build
        one; it defaults to ``BaseTemplateStrategy``. ``backend``, when
        truthy, is called with ``strategy=self`` plus any extra positional
        and keyword arguments.
        """
        tpl = tpl or BaseTemplateStrategy
        if not isinstance(tpl, BaseTemplateStrategy):
            tpl = tpl(self)
        self.tpl = tpl
        self.request = request
        self.storage = storage
        self.backends = backends
        if backend:
            self.backend_name = backend.name
            self.backend = backend(strategy=self, *args, **kwargs)
        else:
            self.backend_name = None
            self.backend = backend

    def setting(self, name, default=None):
        """Return the value of setting ``name`` or ``default``.

        Several candidate names are tried in order through
        ``get_setting``: two built by ``setting_name`` (with and without
        the backend name), one built by ``to_setting_name`` and finally
        the bare ``name``. A candidate for which ``get_setting`` raises
        ``AttributeError`` or ``KeyError`` is skipped.
        """
        candidates = (setting_name(self.backend_name, name),
                      setting_name(name),
                      to_setting_name(self.backend_name, name),
                      name)
        for candidate in candidates:
            try:
                return self.get_setting(candidate)
            except (AttributeError, KeyError):
                pass
        return default

    def start(self):
        """Begin the auth process with a redirect or an html response."""
        # Clean any partial pipeline info before starting the process
        self.clean_partial_pipeline()
        if self.backend.uses_redirect():
            return self.redirect(self.backend.auth_url())
        else:
            return self.html(self.backend.auth_html())

    def complete(self, *args, **kwargs):
        """Delegate to the backend's ``auth_complete``."""
        return self.backend.auth_complete(*args, **kwargs)

    def continue_pipeline(self, *args, **kwargs):
        """Delegate to the backend's ``continue_pipeline``."""
        return self.backend.continue_pipeline(*args, **kwargs)

    def disconnect(self, user, association_id=None, *args, **kwargs):
        """Delegate to the backend's ``disconnect`` for ``user``."""
        return self.backend.disconnect(
            user=user, association_id=association_id,
            *args, **kwargs
        )

    def authenticate(self, *args, **kwargs):
        """Call the backend's ``authenticate``.

        ``strategy``, ``storage`` and ``backend`` keyword arguments are
        always set from this instance, overriding any passed by the caller.
        """
        kwargs['strategy'] = self
        kwargs['storage'] = self.storage
        kwargs['backend'] = self.backend
        return self.backend.authenticate(*args, **kwargs)

    def create_user(self, *args, **kwargs):
        """Delegate to ``storage.user.create_user``."""
        return self.storage.user.create_user(*args, **kwargs)

    def get_user(self, *args, **kwargs):
        """Delegate to ``storage.user.get_user``."""
        return self.storage.user.get_user(*args, **kwargs)

    def session_setdefault(self, name, value):
        """Return the session value for ``name``, storing ``value`` first
        only when the key is not already present.

        An existing session value is never overwritten.
        """
        current = self.session_get(name, self._MISSING)
        if current is not self._MISSING:
            return current
        self.session_set(name, value)
        # Read back through the session so the caller gets whatever the
        # session implementation actually holds for the key.
        return self.session_get(name)

    def openid_session_dict(self, name):
        """Return the session entry ``name``, defaulting it to ``{}``."""
        return self.session_setdefault(name, {})

    def to_session_value(self, val):
        """Convert ``val`` before it is stored in the session (identity
        here; hook for subclasses)."""
        return val

    def from_session_value(self, val):
        """Convert ``val`` after it is read from the session (identity
        here; hook for subclasses)."""
        return val

    def to_session(self, next, backend, request=None, *args, **kwargs):
        """Build the dict describing a paused pipeline.

        ``request`` is accepted and ignored, so it never ends up among the
        stored ``kwargs``. Every positional and keyword value goes through
        ``to_session_value``.
        """
        return {
            'next': next,
            'backend': backend.name,
            'args': tuple(map(self.to_session_value, args)),
            'kwargs': dict((key, self.to_session_value(val))
                           for key, val in kwargs.items())
        }

    def from_session(self, session):
        """Unpack a dict built by ``to_session``.

        Returns a ``(next, backend_name, args, kwargs)`` tuple where
        ``args`` is a list and every value went through
        ``from_session_value``.
        """
        return (
            session['next'],
            session['backend'],
            list(map(self.from_session_value, session['args'])),
            dict((key, self.from_session_value(val))
                 for key, val in session['kwargs'].items())
        )

    def clean_partial_pipeline(self, name='partial_pipeline'):
        """Pop the partial pipeline entry ``name`` from the session."""
        self.session_pop(name)

    def openid_store(self):
        """Return an ``OpenIdStore`` bound to this strategy."""
        return OpenIdStore(self)

    def get_pipeline(self):
        """Return the ``PIPELINE`` setting or the default pipeline."""
        return self.setting('PIPELINE', (
            'social.pipeline.social_auth.social_details',
            'social.pipeline.social_auth.social_uid',
            'social.pipeline.social_auth.auth_allowed',
            'social.pipeline.social_auth.social_user',
            'social.pipeline.user.get_username',
            # 'social.pipeline.social_auth.associate_by_email',
            'social.pipeline.user.create_user',
            'social.pipeline.social_auth.associate_user',
            'social.pipeline.social_auth.load_extra_data',
            'social.pipeline.user.user_details'))

    def get_disconnect_pipeline(self):
        """Return the ``DISCONNECT_PIPELINE`` setting or the default one."""
        return self.setting('DISCONNECT_PIPELINE', (
            'social.pipeline.disconnect.allowed_to_disconnect',
            'social.pipeline.disconnect.get_entries',
            'social.pipeline.disconnect.revoke_tokens',
            'social.pipeline.disconnect.disconnect'))

    def random_string(self, length=12, chars=ALLOWED_CHARS):
        """Return a random string of ``length`` characters from ``chars``.

        Characters are drawn from ``random.SystemRandom`` (OS entropy).
        Only when the platform has no such source is the module-level
        generator used, after reseeding it from its own state, the
        current time and the ``SECRET_KEY`` setting.
        """
        # Implementation borrowed from django 1.4
        try:
            rng = random.SystemRandom()
            # Constructing SystemRandom does not touch the OS source;
            # draw once so a missing source is detected here, not later.
            rng.random()
        except NotImplementedError:
            key = self.setting('SECRET_KEY', '')
            seed = '{0}{1}{2}'.format(random.getstate(), time.time(), key)
            random.seed(hashlib.sha256(seed.encode()).digest())
            rng = random
        return ''.join([rng.choice(chars) for _ in range(length)])

    def is_integrity_error(self, exception):
        """Delegate to ``storage.is_integrity_error``."""
        return self.storage.is_integrity_error(exception)

    def absolute_uri(self, path=None):
        """Return the absolute URI for ``path``.

        When the ``ON_HTTPS`` setting is truthy, a leading ``http://``
        scheme is upgraded to ``https://``. Only the scheme prefix is
        touched, so URLs embedded later in the URI are left intact, and a
        falsy result from ``build_absolute_uri`` is returned unchanged.
        """
        uri = self.build_absolute_uri(path)
        if uri and self.setting('ON_HTTPS') and uri.startswith('http://'):
            uri = 'https://' + uri[len('http://'):]
        return uri

    def get_language(self):
        """Return current language"""
        return ''

    # Implement the following methods on strategies sub-classes

    def redirect(self, url):
        """Return a response redirect to the given URL"""
        raise NotImplementedError('Implement in subclass')

    def get_setting(self, name):
        """Return value for given setting name"""
        raise NotImplementedError('Implement in subclass')

    def html(self, content):
        """Return HTTP response with given content"""
        raise NotImplementedError('Implement in subclass')

    def render_html(self, tpl=None, html=None, context=None):
        """Render given template or raw html with given context"""
        return self.tpl.render(tpl, html, context)

    def request_data(self, merge=True):
        """Return current request data (POST or GET)"""
        raise NotImplementedError('Implement in subclass')

    def request_host(self):
        """Return current host value"""
        raise NotImplementedError('Implement in subclass')

    def session_get(self, name, default=None):
        """Return session value for given key"""
        raise NotImplementedError('Implement in subclass')

    def session_set(self, name, value):
        """Set session value for given key"""
        raise NotImplementedError('Implement in subclass')

    def session_pop(self, name):
        """Pop session value for given key"""
        raise NotImplementedError('Implement in subclass')

    def build_absolute_uri(self, path=None):
        """Build absolute URI with given (optional) path"""
        raise NotImplementedError('Implement in subclass')