This repository was archived by the owner on Jan 25, 2023. It is now read-only.
generated from labteral/python-package
-
Notifications
You must be signed in to change notification settings - Fork 77
/
Copy pathsessions.py
104 lines (83 loc) · 3.57 KB
/
sessions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
import requests
import tls_client
from urllib.error import HTTPError as HTTPTLSError
from chatgpt.errors import ChatgptError, ChatgptErrorCodes
from chatgpt.utils import random_sleep_time
from tls_client.sessions import TLSClientExeption
class HTTPSessionBase:
DEFAULT_TIMEOUT = 300
DEFAULT_HEADERS = {
""
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
"Accept-Language": "en-US,en;q=0.5",
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36",
'sec-ch-ua': '"Not?A_Brand";v="8", "Chromium";v="107", "Google Chrome";v="107"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Linux"'
}
def __init__(self, timeout=DEFAULT_TIMEOUT,proxy=None, cookies={}):
"""
Args:
timeout (int, optional): Request timeout time. Defaults to 300.
proxy (str, optional): Wether it uses proxy or not. Defaults to None.
cookies (dict, optional): Cookies to introduce. Defaults to {}.
"""
self._timeout = timeout
self._proxy = proxy
self._cookies = cookies
class HTTPTLSSession(HTTPSessionBase):
def __init__(self, timeout=None, proxy=None, cookies=None, headers = {}):
super().__init__(timeout,proxy,cookies)
self._headers = headers
self._session = tls_client.Session(client_identifier="chrome_107")
if cookies is not None:
self._session.cookies.update(cookies)
def request(self, *args, headers={}, **kwargs):
send_headers = {**self.DEFAULT_HEADERS}
send_headers.update(self._headers)
send_headers.update(headers)
try:
response = self._session.execute_request(
*args, headers=send_headers, timeout_seconds=self._timeout, proxy=self._proxy, **kwargs)
except TLSClientExeption as e:
raise ChatgptError(str(e), ChatgptErrorCodes.CONNECTION_ERROR)
random_sleep_time(0.7,1.4)
if response.status_code == 200:
return response
else:
raise HTTPTLSError(args[1], response.status_code,
response.text, response.headers, None)
@property
def cookies(self):
return self._session.cookies.get_dict()
@cookies.setter
def cookies(self, cookies):
return self._session.cookies.update(cookies)
class HTTPSession(HTTPSessionBase):
def __init__(self, timeout=None, proxy=None, cookies=None, stream=True):
super().__init__(timeout,proxy,cookies)
self._session = requests.Session()
self._session.headers.update(self.DEFAULT_HEADERS)
self._session.stream = stream
if cookies is not None:
self._session.cookies.update(cookies)
def request(self, *args, stream =None, **kwargs):
try:
response = self._session.request(
*args, timeout=self._timeout, stream=stream, **kwargs)
response.raise_for_status()
return response
except ConnectionError as ex:
raise ChatgptError(str(ex), ChatgptErrorCodes.CONNECTION_ERROR)
@property
def headers(self):
return self._session.headers
@headers.setter
def headers(self, headers):
self._session.headers.update(headers)
@property
def cookies(self):
return self._session.cookies.get_dict()
@cookies.setter
def headers(self, cookies):
self._session.cookies.update(cookies)