/
requests.py
119 lines (101 loc) · 3.64 KB
/
requests.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
import re
from typing import Any, Optional
import requests
class SingletonRequestSessionMixin:
    """Mixin that shares a single ``requests.Session`` via a class attribute.

    The session is stored as ``_session`` on the class of the instance that
    creates or assigns it, so it is visible to every instance of that class
    and, through normal attribute lookup, to subclasses that have not stored
    a session of their own.

    Subclasses must implement ``_prepare_session``.
    """

    @property
    def session(self) -> requests.Session:
        """Return the shared session, creating and preparing it on first use."""
        if not getattr(self, "_session", False):
            # Stored on the class (not the instance) so the session is shared.
            self.__class__._session = requests.Session()
            self._prepare_session()
        return self._session

    @session.deleter
    def session(self) -> None:
        """Close the shared session and remove it from the class that holds it.

        Raises:
            AttributeError: If no session is currently stored.
        """
        self._session.close()
        # The session may have been stored by a base class and merely be
        # inherited here; deleting it from ``self.__class__`` would then raise
        # AttributeError. Remove it from whichever class in the MRO owns it.
        for klass in self.__class__.__mro__:
            if "_session" in vars(klass):
                del klass._session
                break

    @session.setter
    def session(self, value: Any) -> None:
        """Replace the shared session with ``value``.

        Any session currently visible to this instance is closed and removed
        first. The new session is stored as given; ``_prepare_session`` is not
        called by the setter.

        Raises:
            ValueError: If ``value`` is not a ``requests.Session``.
        """
        if not isinstance(value, requests.Session):
            raise ValueError("Only requests.Session object is allowed.")
        if getattr(self, "_session", False):
            del self.session
        self.__class__._session = value

    def _prepare_session(self):
        """
        Configure session-level settings from predefined instance attributes.

        ex) self.session.headers.update({"Authorization": f"Bearer {self._access_token}"})

        Raises:
            NotImplementedError: Always, until a subclass overrides it.
        """
        raise NotImplementedError

    def refresh_session(self):
        """Swap in a brand-new session (closing the old one) and prepare it."""
        self.session = requests.Session()
        self._prepare_session()

    def request(self, *args, **kwargs) -> requests.Response:
        """Send a request through the shared session and return the response.

        All arguments are forwarded to ``Session.request``.
        ``raise_for_status()`` is called on the response before it is
        returned, so error statuses surface as exceptions.
        """
        response = self.session.request(*args, **kwargs)
        response.raise_for_status()
        return response

    def json_response(self, response: requests.Response) -> dict[str, Any]:
        """Decode the JSON body of ``response``, tolerating stray control characters.

        When decoding fails with an "Invalid control character" error, every
        occurrence of the offending character is removed from the body and
        decoding is retried. This repeats until decoding succeeds or fails for
        a different reason. The response body is rewritten in place (its
        private ``_content`` attribute is replaced).

        Raises:
            requests.exceptions.JSONDecodeError: For any decode error other
                than an invalid control character.
        """
        while True:
            try:
                return response.json()
            except requests.exceptions.JSONDecodeError as e:
                m = re.match(
                    r"^Invalid control character at: line \d+ column \d+ \(char (\d+)\)$",
                    e.args[0],
                )
                if m is None:
                    # Not a control-character problem: propagate unchanged.
                    raise
                encoding = response.encoding or response.apparent_encoding
                text = str(response.content, encoding, errors="replace")
                # The reported char offset indexes into the decoded text.
                offending = response.text[int(m.group(1))]
                response._content = text.replace(offending, "").encode(encoding)
                # NOTE(review): kept from the original; presumably meant to
                # make the rewritten body readable again - confirm against
                # the requests.Response internals.
                response._content_consumed = False
class RequestPaginateMixin:
    """Mixin that collects the instances of a paginated endpoint page by page.

    Subclasses implement ``request_page`` (fetch one page) and
    ``process_response`` (extract the instances and the has-next flag);
    ``paginate_request`` drives the two until the pages run out or the
    requested number of instances has been collected.
    """

    def request_page(
        self,
        method: str,
        url: str,
        former_response: Optional[requests.Response] = None,
        **kwargs,
    ) -> requests.Response:
        """Fetch a single page.

        ``paginate_request`` calls this without ``former_response`` for the
        first page, and with the previously returned response for every
        following page.

        Raises:
            NotImplementedError: Always, until a subclass overrides it.
        """
        raise NotImplementedError

    def process_response(
        self,
        response: requests.Response,
        **kwargs,
    ) -> tuple[list[dict[str, Any]], bool]:
        """
        returns 2-element tuple: (instances, has_next)
        - instances: post-processed instances data from response
        - has_next: boolean indicating response has next page

        Raises:
            NotImplementedError: Always, until a subclass overrides it.
        """
        raise NotImplementedError

    def paginate_request(
        self,
        method: str,
        url: str,
        *,
        max_count: Optional[int] = None,
        **kwargs,
    ) -> list[dict[str, Any]]:
        """Request pages until exhausted and return the collected instances.

        Args:
            method: Forwarded to ``request_page``.
            url: Forwarded to ``request_page``.
            max_count: Upper bound on the number of instances returned;
                ``None`` means no limit. Expected to be non-negative. ``0``
                yields an empty list (the first page is still requested).
            **kwargs: Forwarded to both ``request_page`` and
                ``process_response``.

        Returns:
            A new list with the instances of all visited pages, truncated to
            ``max_count`` when a limit is given.
        """
        limited = max_count is not None  # 0 is a real limit, not "unlimited"

        response = self.request_page(method=method, url=url, **kwargs)
        instances, has_next = self.process_response(response, **kwargs)
        # Copy so the list handed back by process_response is never extended
        # in place.
        collected = list(instances)
        if limited and max_count <= len(collected):
            return collected[:max_count]

        while has_next:
            response = self.request_page(
                method=method, url=url, former_response=response, **kwargs
            )
            instances, has_next = self.process_response(response, **kwargs)
            if limited:
                remaining = max_count - len(collected)
                if remaining <= len(instances):
                    # This page fills the quota; take what is needed and stop.
                    collected.extend(instances[:remaining])
                    break
            collected.extend(instances)
        return collected