-
Notifications
You must be signed in to change notification settings - Fork 17
/
Copy pathhttpclient.py
136 lines (119 loc) · 4.22 KB
/
httpclient.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
"""Code generated by Speakeasy (https://speakeasy.com). DO NOT EDIT."""
# pyright: reportReturnType = false
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing_extensions import Protocol, runtime_checkable
import httpx
from typing import Any, Optional, Union
@runtime_checkable
class HttpClient(Protocol):
def send(
self,
request: httpx.Request,
*,
stream: bool = False,
auth: Union[
httpx._types.AuthTypes, httpx._client.UseClientDefault, None
] = httpx.USE_CLIENT_DEFAULT,
follow_redirects: Union[
bool, httpx._client.UseClientDefault
] = httpx.USE_CLIENT_DEFAULT,
) -> httpx.Response:
pass
def build_request(
self,
method: str,
url: httpx._types.URLTypes,
*,
content: Optional[httpx._types.RequestContent] = None,
data: Optional[httpx._types.RequestData] = None,
files: Optional[httpx._types.RequestFiles] = None,
json: Optional[Any] = None,
params: Optional[httpx._types.QueryParamTypes] = None,
headers: Optional[httpx._types.HeaderTypes] = None,
cookies: Optional[httpx._types.CookieTypes] = None,
timeout: Union[
httpx._types.TimeoutTypes, httpx._client.UseClientDefault
] = httpx.USE_CLIENT_DEFAULT,
extensions: Optional[httpx._types.RequestExtensions] = None,
) -> httpx.Request:
pass
def close(self) -> None:
pass
@runtime_checkable
class AsyncHttpClient(Protocol):
async def send(
self,
request: httpx.Request,
*,
stream: bool = False,
auth: Union[
httpx._types.AuthTypes, httpx._client.UseClientDefault, None
] = httpx.USE_CLIENT_DEFAULT,
follow_redirects: Union[
bool, httpx._client.UseClientDefault
] = httpx.USE_CLIENT_DEFAULT,
) -> httpx.Response:
pass
def build_request(
self,
method: str,
url: httpx._types.URLTypes,
*,
content: Optional[httpx._types.RequestContent] = None,
data: Optional[httpx._types.RequestData] = None,
files: Optional[httpx._types.RequestFiles] = None,
json: Optional[Any] = None,
params: Optional[httpx._types.QueryParamTypes] = None,
headers: Optional[httpx._types.HeaderTypes] = None,
cookies: Optional[httpx._types.CookieTypes] = None,
timeout: Union[
httpx._types.TimeoutTypes, httpx._client.UseClientDefault
] = httpx.USE_CLIENT_DEFAULT,
extensions: Optional[httpx._types.RequestExtensions] = None,
) -> httpx.Request:
pass
async def aclose(self) -> None:
pass
class ClientOwner(Protocol):
client: Union[HttpClient, None]
async_client: Union[AsyncHttpClient, None]
def close_clients(
owner: ClientOwner,
sync_client: Union[HttpClient, None],
sync_client_supplied: bool,
async_client: Union[AsyncHttpClient, None],
async_client_supplied: bool,
) -> None:
"""
A finalizer function that is meant to be used with weakref.finalize to close
httpx clients used by an SDK so that underlying resources can be garbage
collected.
"""
# Unset the client/async_client properties so there are no more references
# to them from the owning SDK instance and they can be reaped.
owner.client = None
owner.async_client = None
if sync_client is not None and not sync_client_supplied:
try:
sync_client.close()
except Exception:
pass
if async_client is not None and not async_client_supplied:
is_async = False
try:
asyncio.get_running_loop()
is_async = True
except RuntimeError:
pass
try:
# If this function is called in an async loop then start another
# loop in a separate thread to close the async http client.
if is_async:
with ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(asyncio.run, async_client.aclose())
future.result()
else:
asyncio.run(async_client.aclose())
except Exception:
pass