-
-
Notifications
You must be signed in to change notification settings - Fork 543
/
discourse.py
90 lines (71 loc) · 3.15 KB
/
discourse.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
import hmac
import time
from base64 import urlsafe_b64decode, urlsafe_b64encode
from hashlib import sha256
from urllib.parse import urlencode
from ..exceptions import AuthException, AuthTokenError
from ..utils import parse_qs
from .base import BaseAuth
class DiscourseAuth(BaseAuth):
    """
    Authentication backend that delegates login to a Discourse server
    through its signed single sign-on endpoint.

    Settings read through ``self.setting``:
        SECRET: shared secret used to HMAC-SHA256 sign outgoing payloads
            and to verify incoming ones.
        SERVER_URL: base URL of the Discourse server; also used as the
            key under which nonces are stored.
    """

    name = "discourse"
    EXTRA_DATA = ["username", "name", "avatar_url"]

    def _sign(self, payload):
        """
        Return the hex HMAC-SHA256 signature of ``payload`` (a str),
        keyed with the ``SECRET`` setting.
        """
        return hmac.new(
            self.setting("SECRET").encode("utf8"),
            payload.encode("utf8"),
            sha256,
        ).hexdigest()

    def auth_url(self):
        """
        Get the URL to which we must redirect in order to authenticate the user
        """
        return_url = self.redirect_uri
        # Remember the nonce so auth_complete can check that the response
        # answers a request issued by this application.
        nonce = self.strategy.random_string(64)
        self.add_nonce(nonce)

        payload = urlencode({"nonce": nonce, "return_sso_url": return_url})
        base_64_payload = urlsafe_b64encode(payload.encode("utf8")).decode("ascii")
        # The signature covers the base64 text, not the raw query string.
        payload_signature = self._sign(base_64_payload)
        encoded_params = urlencode({"sso": base_64_payload, "sig": payload_signature})
        return f"{self.get_idp_url()}?{encoded_params}"

    def get_idp_url(self):
        """
        Return the identity provider endpoint: ``SERVER_URL`` followed by
        ``/session/sso_provider``.
        """
        return self.setting("SERVER_URL") + "/session/sso_provider"

    def get_user_id(self, details, response):
        """
        Return the value used as the unique user id: the ``email`` entry
        of the response. Raises ``KeyError`` when it is absent.
        """
        return response["email"]

    def get_user_details(self, response):
        """
        Build the user details dict from the decoded response.

        ``groups`` is parsed as a comma-separated string; empty entries
        are dropped, so a missing or empty value yields ``[]``.
        ``is_superuser`` is True when ``admin`` equals ``"true"``, and
        ``is_staff`` when either ``admin`` or ``moderator`` does.
        """
        is_admin = response.get("admin") == "true"
        is_moderator = response.get("moderator") == "true"
        raw_groups = response.get("groups") or ""
        return {
            "username": response.get("username"),
            "email": response.get("email"),
            "name": response.get("name"),
            # "".split(",") would give [""], which is not a real group.
            "groups": [group for group in raw_groups.split(",") if group],
            "is_staff": is_admin or is_moderator,
            "is_superuser": is_admin,
        }

    def add_nonce(self, nonce):
        """
        Store ``nonce`` for ``SERVER_URL`` together with the current time.
        """
        self.strategy.storage.nonce.use(self.setting("SERVER_URL"), time.time(), nonce)

    def get_nonce(self, nonce):
        """
        Look up ``nonce`` in the nonce storage under ``SERVER_URL`` and
        return whatever the storage returns for it.
        """
        return self.strategy.storage.nonce.get(self.setting("SERVER_URL"), nonce)

    def delete_nonce(self, nonce):
        """
        Delete ``nonce`` from the nonce storage. ``auth_complete`` passes
        the object returned by ``get_nonce``.
        """
        self.strategy.storage.nonce.delete(nonce)

    def auth_complete(self, *args, **kwargs):
        """
        The user has been redirected back from the IdP and we should
        now log them in, if everything checks out.

        Raises ``AuthException`` when the ``sso``/``sig`` parameters are
        missing or the signature does not match, and ``AuthTokenError``
        when the nonce in the payload is not a stored one.
        """
        request_data = self.strategy.request_data()
        sso_params = request_data.get("sso")
        sso_signature = request_data.get("sig")
        # Both parameters come from the query string and may be absent.
        if not sso_params or not sso_signature:
            raise AuthException(self, "Could not verify discourse login")

        param_signature = self._sign(sso_params)
        # Compare as bytes: compare_digest rejects non-ASCII str input with
        # a TypeError, and ``sig`` is attacker-controlled.
        if not hmac.compare_digest(
            str(sso_signature).encode("utf8"), param_signature.encode("utf8")
        ):
            raise AuthException(self, "Could not verify discourse login")

        decoded_params = urlsafe_b64decode(sso_params.encode("utf8")).decode("ascii")
        response = parse_qs(decoded_params)

        # The signature above guards against tampering; the nonce guards
        # against replay. It must match one stored by auth_url and is
        # deleted on use so the same payload cannot be accepted twice.
        nonce_obj = self.get_nonce(response.get("nonce"))
        if not nonce_obj:
            raise AuthTokenError(self, "Incorrect id_token: nonce")
        self.delete_nonce(nonce_obj)

        # Blank out the raw SSO parameters before handing over to the strategy.
        kwargs.update({"sso": "", "sig": "", "backend": self, "response": response})
        return self.strategy.authenticate(*args, **kwargs)