-
-
Notifications
You must be signed in to change notification settings - Fork 201
/
auth.py
339 lines (248 loc) · 13.1 KB
/
auth.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
# from __future__ import annotations
# Standard
import re
import ssl
import urllib3
from datetime import datetime, timedelta
from typing import Dict, Optional, Tuple, Any
# Third
import aiohttp
from multidict import MultiDict
# Local
from .local import LocalErrorResponse, ResponseLanguage
from ..errors import AuthenticationError
from ..locale_v2 import ValorantTranslator
vlr_locale = ValorantTranslator()
# disable urllib3 warnings that might arise from making requests to 127.0.0.1
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
def _extract_tokens(data: str) -> str:
"""Extract tokens from data"""
pattern = re.compile('access_token=((?:[a-zA-Z]|\d|\.|-|_)*).*id_token=((?:[a-zA-Z]|\d|\.|-|_)*).*expires_in=(\d*)')
response = pattern.findall(data['response']['parameters']['uri'])[0]
return response
def _extract_tokens_from_uri(URL: str) -> Optional[Tuple[str, Any]]:
try:
accessToken = URL.split("access_token=")[1].split("&scope")[0]
tokenId = URL.split("id_token=")[1].split("&")[0]
return accessToken, tokenId
except IndexError:
raise AuthenticationError('Cookies Invalid')
class ClientSession(aiohttp.ClientSession):
def __init__(self, *args, **kwargs):
ctx = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
ctx.set_ciphers("DEFAULT@SECLEVEL=1")
super().__init__(
*args,
**kwargs,
cookie_jar=aiohttp.CookieJar(),
connector=aiohttp.TCPConnector(ssl=ctx)
)
class Auth:
def __init__(self) -> None:
self._headers: Dict = {
'Content-Type': 'application/json',
'User-Agent': 'RiotClient/51.0.0.4429735.4381201 rso-auth (Windows;10;;Professional, x64)',
'Accept': 'application/json, text/plain, */*',
}
self.user_agent = 'RiotClient/51.0.0.4429735.4381201 rso-auth (Windows;10;;Professional, x64)'
self.locale_code = 'en-US' # default language
self.response = {} # prepare response for local response
def local_response(self) -> LocalErrorResponse:
'''This function is used to check if the local response is enabled.'''
self.response = LocalErrorResponse('AUTH', self.locale_code)
return self.response
async def authenticate(self, username: str, password: str) -> Optional[Dict[str, Any]]:
""" This function is used to authenticate the user. """
# language
local_response = self.local_response()
session = ClientSession()
data = {
"client_id": "play-valorant-web-prod",
"nonce": "1",
"redirect_uri": "https://playvalorant.com/opt_in",
"response_type": "token id_token",
'scope': 'account openid',
}
# headers = {'Content-Type': 'application/json', 'User-Agent': self.user_agent}
r = await session.post('https://auth.riotgames.com/api/v1/authorization', json=data, headers=self._headers)
# prepare cookies for auth request
cookies = {}
cookies['cookie'] = {}
for cookie in r.cookies.items():
cookies['cookie'][cookie[0]] = str(cookie).split('=')[1].split(';')[0]
data = {"type": "auth", "username": username, "password": password, "remember": True}
async with session.put('https://auth.riotgames.com/api/v1/authorization', json=data, headers=self._headers) as r:
data = await r.json()
for cookie in r.cookies.items():
cookies['cookie'][cookie[0]] = str(cookie).split('=')[1].split(';')[0]
# print('Response Status:', r.status)
await session.close()
if data['type'] == 'response':
expiry_token = datetime.now() + timedelta(hours=1)
response = _extract_tokens(data)
access_token = response[0]
token_id = response[1]
expiry_token = datetime.now() + timedelta(minutes=59)
cookies['expiry_token'] = int(datetime.timestamp(expiry_token))
return {'auth': 'response', 'data': {'cookie': cookies, 'access_token': access_token, 'token_id': token_id}}
elif data['type'] == 'multifactor':
if r.status == 429:
raise AuthenticationError(local_response.get('RATELIMIT', 'Please wait a few minutes and try again.'))
label_modal = local_response.get('INPUT_2FA_CODE')
WaitFor2FA = {"auth": "2fa", "cookie": cookies, 'label': label_modal}
if data['multifactor']['method'] == 'email':
WaitFor2FA['message'] = f"{local_response.get('2FA_TO_EMAIL', 'Riot sent a code to')} {data['multifactor']['email']}"
return WaitFor2FA
WaitFor2FA['message'] = local_response.get('2FA_ENABLE', 'You have 2FA enabled!')
return WaitFor2FA
raise AuthenticationError(local_response.get('INVALID_PASSWORD', 'Your username or password may be incorrect!'))
async def get_entitlements_token(self, access_token: str) -> Optional[str]:
""" This function is used to get the entitlements token. """
# language
local_response = self.local_response()
session = ClientSession()
headers = {'Content-Type': 'application/json', 'Authorization': f'Bearer {access_token}'}
async with session.post('https://entitlements.auth.riotgames.com/api/token/v1', headers=headers, json={}) as r:
data = await r.json()
await session.close()
try:
entitlements_token = data['entitlements_token']
except KeyError:
raise AuthenticationError(local_response.get('COOKIES_EXPIRED', 'Cookies is expired, plz /login again!'))
else:
return entitlements_token
async def get_userinfo(self, access_token: str) -> Tuple[str, str, str]:
""" This function is used to get the user info. """
# language
local_response = self.local_response()
session = ClientSession()
headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {access_token}'
}
async with session.post('https://auth.riotgames.com/userinfo', headers=headers, json={}) as r:
data = await r.json()
await session.close()
try:
puuid = data['sub']
name = data['acct']['game_name']
tag = data['acct']['tag_line']
except KeyError:
raise AuthenticationError(local_response.get('NO_NAME_TAG', 'This user hasn\'t created a name or tagline yet.'))
else:
return puuid, name, tag
async def get_region(self, access_token: str, token_id: str) -> str:
""" This function is used to get the region. """
# language
local_response = self.local_response()
session = ClientSession()
headers = {'Content-Type': 'application/json', 'Authorization': f'Bearer {access_token}'}
body = {"id_token": token_id}
async with session.put('https://riot-geo.pas.si.riotgames.com/pas/v1/product/valorant', headers=headers, json=body) as r:
data = await r.json()
await session.close()
try:
region = data['affinities']['live']
except KeyError:
raise AuthenticationError(local_response.get('REGION_NOT_FOUND', 'An unknown error occurred, plz `/login` again'))
else:
return region
async def give2facode(self, twoFAcode: str, cookies: Dict) -> Dict[str, Any]:
""" This function is used to give the 2FA code. """
# language
local_response = self.local_response()
session = ClientSession()
# headers = {'Content-Type': 'application/json', 'User-Agent': self.user_agent}
data = {"type": "multifactor", "code": twoFAcode, "rememberDevice": True}
async with session.put('https://auth.riotgames.com/api/v1/authorization', headers=self._headers, json=data, cookies=cookies['cookie']) as r:
data = await r.json()
await session.close()
if data['type'] == 'response':
cookies = {}
cookies['cookie'] = {}
for cookie in r.cookies.items():
cookies['cookie'][cookie[0]] = str(cookie).split('=')[1].split(';')[0]
uri = data['response']['parameters']['uri']
access_token, token_id = _extract_tokens_from_uri(uri)
return {'auth': 'response', 'data': {'cookie': cookies, 'access_token': access_token, 'token_id': token_id}}
return {'auth': 'failed', 'error': local_response.get('2FA_INVALID_CODE')}
async def redeem_cookies(self, cookies: Dict) -> Tuple[Dict[str, Any], str, str]:
""" This function is used to redeem the cookies. """
# language
local_response = self.local_response()
# cookies = json.loads(cookies)
session = ClientSession()
async with session.get(
"https://auth.riotgames.com/authorize?redirect_uri=https%3A%2F%2Fplayvalorant.com%2Fopt_in&client_id=play-valorant-web-prod&response_type=token%20id_token&scope=account%20openid&nonce=1",
cookies=cookies,
allow_redirects=False
) as r:
data = await r.text()
if r.status != 303:
raise AuthenticationError(local_response.get('COOKIES_EXPIRED'))
if r.headers['Location'].startswith('/login'):
raise AuthenticationError(local_response.get('COOKIES_EXPIRED'))
old_cookie = cookies.copy()
new_cookies = {}
new_cookies['cookie'] = old_cookie
for cookie in r.cookies.items():
new_cookies['cookie'][cookie[0]] = str(cookie).split('=')[1].split(';')[0]
await session.close()
accessToken, tokenId = _extract_tokens_from_uri(data)
entitlements_token = await self.get_entitlements_token(accessToken)
return new_cookies, accessToken, entitlements_token
async def temp_auth(self, username: str, password: str) -> Optional[Dict[str, Any]]:
authenticate = await self.authenticate(username, password)
if authenticate['auth'] == 'response':
access_token = authenticate['data']['access_token']
token_id = authenticate['data']['token_id']
entitlements_token = await self.get_entitlements_token(access_token)
puuid, name, tag = await self.get_userinfo(access_token)
region = await self.get_region(access_token, token_id)
player_name = f'{name}#{tag}' if tag is not None and tag is not None else 'no_username'
headers = {'Content-Type': 'application/json', 'Authorization': f'Bearer {access_token}', 'X-Riot-Entitlements-JWT': entitlements_token}
user_data = {'puuid': puuid, 'region': region, 'headers': headers, 'player_name': player_name}
return user_data
raise AuthenticationError(self.local_response().get('TEMP_LOGIN_NOT_SUPPORT_2FA'))
# next update
async def login_with_cookie(self, cookies: Dict) -> Dict[str, Any]:
""" This function is used to login with cookie. """
# language
local_response = ResponseLanguage('cookies', self.locale_code)
cookie_payload = f'ssid={cookies};' if cookies.startswith('e') else cookies
class CookieClientSession(aiohttp.ClientSession):
def __init__(self, *args, **kwargs):
ctx = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
ctx.set_ciphers("DEFAULT@SECLEVEL=1")
super().__init__(
*args,
**kwargs,
connector=aiohttp.TCPConnector(ssl=ctx),
headers=MultiDict({
"Accept-Language": "en-US,en;q=0.9",
"Accept": "application/json, text/plain, */*",
'cookie': cookie_payload,
}))
session = CookieClientSession()
r = await session.get(
"https://auth.riotgames.com/authorize?redirect_uri=https%3A%2F%2Fplayvalorant.com%2Fopt_in&client_id=play-valorant-web-prod&response_type=token%20id_token&scope=account%20openid&nonce=1",
allow_redirects=False,
)
if r.status != 303:
raise AuthenticationError(local_response.get('FAILED'))
await session.close()
# NEW COOKIE
new_cookies = {'cookie': {}}
for cookie in r.cookies.items():
new_cookies['cookie'][cookie[0]] = str(cookie).split('=')[1].split(';')[0]
accessToken, tokenID = _extract_tokens_from_uri(await r.text())
entitlements_token = await self.get_entitlements_token(accessToken)
data = {
'cookies': new_cookies,
'AccessToken': accessToken,
'token_id': tokenID,
'emt': entitlements_token
}
return data
async def refresh_token(self, cookies: Dict) -> Tuple[Dict[str, Any], str, str]:
return await self.redeem_cookies(cookies)