-
Notifications
You must be signed in to change notification settings - Fork 0
/
identityproviders.py
151 lines (120 loc) · 4.54 KB
/
identityproviders.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
import html
import inspect
import re
import sys
from abc import ABC, abstractmethod
from typing import ClassVar, List, Tuple, Type
if sys.version_info >= (3, 8):
from typing import TypedDict
else:
from typing_extensions import TypedDict
from httpx import URL, AsyncClient, Client
from moodle.exceptions import MoodleException
class IDPInfo(TypedDict):
    """Dictionary describing a single identity provider entry.

    All three keys are required and hold strings.
    """

    # Display name; providers match on it to decide whether they are responsible.
    name: str
    # Presumably the URL of the provider's icon; not read anywhere in this module.
    iconurl: str
    # URL that is requested to start the login flow for this provider.
    url: str
class IdentityProvider(ABC):
    """Abstract base for identity-provider login flows.

    Every non-abstract subclass is recorded in ``providers`` when it is
    defined, so ``get_responsible_idp`` can pick the implementation that
    claims a given ``IDPInfo``.

    Subclasses must implement ``is_responsible``. For the login itself they
    can either override ``login`` (used by both default ``sync_login`` and
    ``async_login``) or override ``sync_login`` / ``async_login`` directly.
    """

    # Registry of concrete subclasses, in definition order. It is shared on
    # the base class on purpose: subclasses append to this one list.
    providers: ClassVar[List[Type["IdentityProvider"]]] = []

    def __init_subclass__(cls, **kwargs: object) -> None:
        """Register *cls* in ``providers`` unless it is still abstract.

        Class keyword arguments are forwarded up the MRO so this hook
        cooperates with other bases/mixins that define their own
        ``__init_subclass__`` and consume such arguments.
        """
        super().__init_subclass__(**kwargs)
        if not inspect.isabstract(cls):
            IdentityProvider.providers.append(cls)

    @classmethod
    def get_responsible_idp(
        cls, idp_infos: List[IDPInfo]
    ) -> Tuple[Type["IdentityProvider"], IDPInfo]:
        """Return the first ``(provider class, idp info)`` pair that matches.

        The infos are tried in the given order; for each info the registered
        providers are asked in registration order via ``is_responsible``.

        Raises:
            MoodleException: If no registered provider claims any of the infos
                (this includes an empty ``idp_infos`` list).
        """
        for idp_info in idp_infos:
            for idp_type in IdentityProvider.providers:
                if idp_type.is_responsible(idp_info):
                    return idp_type, idp_info
        raise MoodleException("No responsible identityprovider found")

    def __init__(
        self,
        wwwroot: str,
        username: str,
        password: str,
        idp: IDPInfo,
    ) -> None:
        """Store the site root, the credentials and the provider description.

        Nothing is contacted here; the values are only kept on the instance
        for the login methods to use.
        """
        super().__init__()
        self.wwwroot = wwwroot
        self.username = username
        self.password = password
        self.idp = idp

    @staticmethod
    @abstractmethod
    def is_responsible(idp: IDPInfo) -> bool:
        """Return whether this provider class handles the given *idp* entry."""
        ...

    def login(self, client: Client) -> None:
        """Perform the login using *client*; the base implementation does nothing."""
        return None

    def sync_login(self, client: Client) -> None:
        """Log in with a synchronous client; delegates to ``login`` by default."""
        return self.login(client)

    async def async_login(self, client: AsyncClient) -> None:
        """Log in on behalf of an asynchronous client.

        The default runs ``login`` on a temporary synchronous ``Client`` that
        starts with the async client's cookies, then copies the resulting
        cookies back into *client*.

        NOTE: ``login`` is called inline here, so a ``login`` override that
        performs blocking requests blocks the coroutine for its duration.
        """
        with Client(cookies=client.cookies) as sync_client:
            self.login(sync_client)
            client.cookies.update(sync_client.cookies)
class RWTHSingleSignOn(IdentityProvider):
    """Login flow for the identity provider named "RWTH Single Sign On".

    The flow is the same for the sync and async variants:

    1. GET the provider URL and follow redirects.
    2. If that ends on the Moodle host, stop (treated as already logged in).
    3. POST the credentials to the page the redirects ended on.
    4. Extract the auto-submit form (action URL, ``RelayState``,
       ``SAMLResponse``) from the response and POST it, following redirects.
    """

    requires_response_body = True

    # Matches the form returned after posting the credentials. It is applied
    # to the HTML-unescaped page text and depends on the exact attribute
    # order and spelling of the markup.
    # TODO use python html.parser
    _SAML_FORM_PATTERN = re.compile(
        r'<form action="(?P<form_submit_url>[^"]*)" method="post">'
        r'.*<input type="hidden" name="RelayState" value="(?P<RelayState>[^"]*)"/>'
        r'.*<input type="hidden" name="SAMLResponse" value="(?P<SAMLResponse>[^"]*)"/>',
        flags=re.MULTILINE | re.DOTALL,
    )

    @staticmethod
    def is_responsible(idp: IDPInfo) -> bool:
        """Return True when the entry's name is exactly "RWTH Single Sign On"."""
        return idp["name"] == "RWTH Single Sign On"

    def _landed_on_moodle(self, landing_url: URL) -> bool:
        """Tell whether the redirect chain ended on the Moodle host itself.

        Raises:
            MoodleException: If the response carried no URL.
        """
        if landing_url is None:
            raise MoodleException("URL unexpectedly not set on response")
        return landing_url.netloc == URL(self.wwwroot).netloc

    def _credentials_form(self) -> dict:
        """Build the form fields that are posted to the login page."""
        return {
            "j_username": self.username,
            "j_password": self.password,
            "_eventId_proceed": "",
        }

    @classmethod
    def _parse_saml_form(cls, page_text: str) -> Tuple[str, dict]:
        """Extract the follow-up form from the page returned after login.

        Returns:
            The form's action URL and the fields to post to it.

        Raises:
            MoodleException: If the expected form is not found in the page.
        """
        formdata = cls._SAML_FORM_PATTERN.search(html.unescape(page_text))
        if not formdata:
            raise MoodleException("Unable to parse login form")
        # NOTE(review): groupdict() also contains "form_submit_url", so the
        # action URL is posted as an extra form field. Kept as-is to preserve
        # the existing request payload.
        return formdata["form_submit_url"], formdata.groupdict()

    def sync_login(self, client: Client) -> None:
        """Run the login flow with a synchronous client.

        Raises:
            MoodleException: If the first response has no URL or the
                follow-up form cannot be parsed.
        """
        landing_url = client.get(self.idp["url"], follow_redirects=True).url
        if self._landed_on_moodle(landing_url):
            # We were redirected to Moodle so we are presumably logged in already
            return

        redirect_page = client.post(landing_url, data=self._credentials_form())
        submit_url, fields = self._parse_saml_form(redirect_page.text)
        client.post(submit_url, data=fields, follow_redirects=True)

    async def async_login(self, client: AsyncClient) -> None:
        """Run the login flow with an asynchronous client.

        Raises:
            MoodleException: If the first response has no URL or the
                follow-up form cannot be parsed.
        """
        landing_url = (await client.get(self.idp["url"], follow_redirects=True)).url
        if self._landed_on_moodle(landing_url):
            # We were redirected to Moodle so we are presumably logged in already
            return

        redirect_page = await client.post(landing_url, data=self._credentials_form())
        submit_url, fields = self._parse_saml_form(redirect_page.text)
        await client.post(submit_url, data=fields, follow_redirects=True)