-
Notifications
You must be signed in to change notification settings - Fork 7
/
requestor.py
212 lines (170 loc) · 7.24 KB
/
requestor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
import base64
import hashlib
import itertools
import logging
from collections import OrderedDict
from typing import Dict
from urllib.parse import quote_plus, urlunparse
from wykop.api.exceptions import WykopAPIError
from wykop.api.api_const import CLIENT_NAME, DOMAIN, PROTOCOL
from wykop.api.models.wykop_connect import WykopConnectLoginInfo
from wykop.core.credentials import Credentials
from wykop.core.parsers import default_parser
from wykop.core.requesters import default_requester
from wykop.utils import force_bytes, force_text, get_version, dictmap, sort_and_remove_none_values, \
validate_named_parameters, validate_api_parameters
log = logging.getLogger(__name__)
class Requestor:
def __init__(self, credentials: Credentials, parser, output='', response_format='json'):
self.credentials = credentials
self.output = output
self.format = response_format
self.userkey = ''
self.requester = default_requester
self.parser = parser
def request(self, rtype, rmethod=None,
named_params=None, api_params=None, post_params=None, file_params=None):
log.debug('Making request')
post_params = sort_and_remove_none_values(post_params)
file_params = file_params or {}
rtype = force_text(rtype)
rmethod = rmethod and force_text(rmethod)
post_params = dictmap(force_bytes, post_params)
url = self.construct_url(
rtype=rtype, rmethod=rmethod, api_params=api_params, named_params=named_params)
headers = self.headers(url, **post_params)
response = self.requester.make_request(
url, post_params, headers, file_params)
if self.parser is None:
return response
return self.parser.parse_response(response)
def authenticate(self, account_key=None, login=None, password=None):
self.credentials.account_key = account_key or self.credentials.account_key
self.credentials.login = login or self.credentials.login
self.credentials.password = password or self.credentials.password
if self.credentials.account_key:
res = self.user_login_with_accountkey(self.credentials.account_key)
elif self.credentials.login and self.credentials.password:
appkey_type = self.credentials.appkey_type()
if appkey_type['official']:
res = self.user_login_with_password(self.credentials.login, self.credentials.password)
else:
# TODO: implement login/connect polyfill
raise WykopAPIError(
0, 'login and password provided on unofficial appkey')
else:
raise WykopAPIError(
0, 'neither accountkey nor login and password are set')
self.userkey = res['userkey']
def user_login_with_accountkey(self, account_key):
post_params = {'accountkey': account_key}
return self.request('login', post_params=post_params)
def user_login_with_password(self, login, password):
post_params = {'login': login, 'password': password}
return self.request('login', post_params=post_params)
def user_login_2fa(self, tfa_token):
post_params = {'code': tfa_token}
return self.request('login', '2fa', post_params=post_params)
def wykop_connect_url(self, redirect_url: str = None):
named_params = self.connect_named_params(redirect_url)
return self.construct_url(rtype='Login', rmethod='Connect', named_params=named_params)
def default_named_params(self):
"""
Gets default named parameters.
"""
return {
'appkey': self.credentials.appkey,
'format': self.format,
'output': self.output,
'userkey': self.userkey,
}
def api_sign(self, url, **post_params):
"""
Gets request api sign.
"""
post_params_values = self.post_params_values(**post_params)
post_params_values_str = ",".join(post_params_values)
post_params_values_bytes = force_bytes(post_params_values_str)
url_bytes = force_bytes(url)
secretkey_bytes = force_bytes(self.credentials.secretkey)
return hashlib.md5(
secretkey_bytes + url_bytes + post_params_values_bytes).hexdigest()
def post_params_values(self, **post_params):
"""
Gets post parameters values list. Required to api sign.
"""
return [force_text(post_params[key])
for key in sorted(post_params.keys())]
def user_agent(self):
"""
Gets User-Agent header.
"""
client_version = get_version()
return '/'.join([CLIENT_NAME, client_version])
def headers(self, url, **post_params):
"""
Gets request headers.
"""
apisign = self.api_sign(url, **post_params)
user_agent = self.user_agent()
return {
'apisign': apisign,
'User-Agent': user_agent,
}
def connect_named_params(self, redirect_url=None):
"""
Gets request api parameters for wykop connect.
"""
apisign = self.api_sign(redirect_url)
named_params = {
'secure': apisign,
}
if redirect_url is not None:
redirect_url_bytes = force_bytes(redirect_url)
redirect_url_encoded = quote_plus(
base64.b64encode(redirect_url_bytes))
named_params.update({
'redirect': redirect_url_encoded,
})
return named_params
def construct_url(self, rtype, rmethod=None, api_params=None, named_params=None):
"""
Constructs request url.
"""
named_params = dictmap(force_text, validate_named_parameters(named_params))
api_params = validate_api_parameters(api_params)
path = self.path(rtype, api_params=api_params,
rmethod=rmethod, named_params=named_params)
urlparts = (PROTOCOL, DOMAIN, path, '', '', '')
return str(urlunparse(urlparts))
def path(self, rtype, api_params, rmethod=None, **named_params):
"""
Gets request path.
"""
pathparts = [rtype]
if rmethod is not None:
pathparts += [rmethod]
if api_params is not None:
pathparts += tuple(api_params)
named_params = self.named_params(**named_params)
if named_params:
pathparts += list(itertools.chain(*named_params.items()))
return '/'.join(pathparts)
def named_params(self, named_params) -> Dict[str, str]:
"""
Gets request method parameters.
"""
params = self.default_named_params()
params.update(named_params)
return {
str(key): str(value)
for key, value in params.items()
if value
}
def valid_sign(self, connect_response: WykopConnectLoginInfo) -> bool:
secret_bytes = force_bytes(self.credentials.secretkey)
app_key_bytes = force_bytes(connect_response.app_key)
login_bytes = force_bytes(connect_response.login)
token_bytes = force_bytes(connect_response.token)
to_hash = secret_bytes + app_key_bytes + login_bytes + token_bytes
return hashlib.md5(to_hash).hexdigest() == connect_response.sign