forked from encode/httpcore
-
Notifications
You must be signed in to change notification settings - Fork 0
/
connection.py
171 lines (146 loc) · 5.99 KB
/
connection.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
from ssl import SSLContext
from typing import List, Optional, Tuple, Union
from socksio import socks4
from .._backends.auto import SyncLock, SyncBackend
from .._types import URL, Headers, Origin, TimeoutDict
from .base import (
SyncByteStream,
SyncHTTPTransport,
ConnectionState,
NewConnectionRequired,
)
from .http2 import SyncHTTP2Connection
from .http11 import SyncHTTP11Connection
class SyncHTTPConnection(SyncHTTPTransport):
    """A single connection to one origin, speaking HTTP/1.1 or HTTP/2.

    No socket is opened at construction time. The first call to
    ``request()`` opens it and selects the protocol implementation from
    the HTTP version reported by the opened socket.
    """

    def __init__(
        self,
        origin: Origin,
        http2: bool = False,
        ssl_context: Optional[SSLContext] = None,
    ) -> None:
        """Store the connection settings; no network I/O happens here.

        Args:
            origin: The ``(scheme, hostname, port)`` triple to connect to.
            http2: When true, ALPN protocols ``http/1.1`` and ``h2`` are set
                on the SSL context.
            ssl_context: SSL context used for ``https`` origins. A fresh
                ``SSLContext()`` is created when omitted. Note that a
                caller-supplied context is modified in place when
                ``http2`` is true.
        """
        self.origin = origin
        self.http2 = http2
        self.ssl_context = SSLContext() if ssl_context is None else ssl_context

        if self.http2:
            self.ssl_context.set_alpn_protocols(["http/1.1", "h2"])

        # Populated by `_connect()` once the socket has been opened.
        self.connection: Union[None, SyncHTTP11Connection, SyncHTTP2Connection] = None
        self.is_http11 = False
        self.is_http2 = False
        # Set when `_connect()` raises; `state` then reports CLOSED.
        self.connect_failed = False
        self.expires_at: Optional[float] = None
        self.backend = SyncBackend()

    @property
    def request_lock(self) -> SyncLock:
        """The lock guarding connection setup, created on first access."""
        # Created lazily rather than in `__init__`. The original note here
        # says this ensures backend autodetection runs within an async
        # context (presumably inherited from the async variant of this
        # module - confirm).
        if not hasattr(self, "_request_lock"):
            self._request_lock = self.backend.create_lock()
        return self._request_lock

    def request(
        self,
        method: bytes,
        url: URL,
        headers: Optional[Headers] = None,
        stream: Optional[SyncByteStream] = None,
        timeout: Optional[TimeoutDict] = None,
    ) -> Tuple[bytes, int, bytes, List[Tuple[bytes, bytes]], SyncByteStream]:
        """Send a request over this connection, connecting first if needed.

        Args:
            method: The HTTP method.
            url: The target URL; its first three items must equal
                ``self.origin``.
            headers: Optional request headers.
            stream: Optional request body stream.
            timeout: Optional timeout configuration.

        Returns:
            Whatever the underlying HTTP/1.1 or HTTP/2 connection returns
            from its own ``request()``.

        Raises:
            NewConnectionRequired: If the connection is in a state that
                cannot accept this request (for example closed, or active
                on a connection that is not HTTP/2).
            Exception: Any error raised while connecting is re-raised after
                the connection has been flagged as failed.
        """
        assert url[:3] == self.origin

        with self.request_lock:
            if self.state == ConnectionState.PENDING:
                try:
                    self._connect(timeout)
                except Exception:
                    # Remember the failure so `state` reports CLOSED.
                    self.connect_failed = True
                    raise
            elif self.state in (ConnectionState.READY, ConnectionState.IDLE):
                pass
            elif self.state == ConnectionState.ACTIVE and self.is_http2:
                # An active HTTP/2 connection may take further requests.
                pass
            else:
                raise NewConnectionRequired()

        assert self.connection is not None
        return self.connection.request(method, url, headers, stream, timeout)

    def _connect(self, timeout: Optional[TimeoutDict] = None) -> None:
        """Open the TCP stream and create the matching protocol connection."""
        scheme, hostname, port = self.origin
        timeout = {} if timeout is None else timeout
        # Only `https` origins get an SSL context passed to the backend.
        ssl_context = self.ssl_context if scheme == b"https" else None
        socket = self.backend.open_tcp_stream(
            hostname, port, ssl_context, timeout
        )
        http_version = socket.get_http_version()
        if http_version == "HTTP/2":
            self.is_http2 = True
            self.connection = SyncHTTP2Connection(socket=socket, backend=self.backend)
        else:
            self.is_http11 = True
            self.connection = SyncHTTP11Connection(socket=socket)

    @property
    def state(self) -> ConnectionState:
        """CLOSED after a failed connect, PENDING before any connect,
        otherwise the state of the underlying connection."""
        if self.connect_failed:
            return ConnectionState.CLOSED
        elif self.connection is None:
            return ConnectionState.PENDING
        return self.connection.state

    def is_connection_dropped(self) -> bool:
        """Return True only if a connection exists and reports itself dropped."""
        return self.connection is not None and self.connection.is_connection_dropped()

    def mark_as_ready(self) -> None:
        """Forward to the underlying connection; a no-op before connecting."""
        if self.connection is not None:
            self.connection.mark_as_ready()

    def start_tls(
        self, hostname: bytes, timeout: Optional[TimeoutDict] = None
    ) -> None:
        """Forward to the underlying connection; a no-op before connecting."""
        if self.connection is not None:
            self.connection.start_tls(hostname, timeout)
class SyncSOCKSConnection(SyncHTTPConnection):
    """An HTTP/1.1 connection with SOCKS proxy negotiation."""

    def __init__(
        self,
        origin: Origin,
        proxy_origin: Origin,
        socks_version: str,
        user_id: bytes = b"httpcore",
        ssl_context: Optional[SSLContext] = None,
    ) -> None:
        """Store the connection settings; no network I/O happens here.

        Args:
            origin: The ``(scheme, hostname, port)`` of the remote host the
                proxy is asked to connect to.
            proxy_origin: The ``(scheme, hostname, port)`` of the SOCKS proxy.
            socks_version: The SOCKS protocol version; only ``"SOCKS4"`` is
                supported.
            user_id: The user id passed to the SOCKS4 connection object.
            ssl_context: SSL context handed to the HTTP/1.1 connection. A
                fresh ``SSLContext()`` is created when omitted.

        Raises:
            NotImplementedError: If ``socks_version`` is not ``"SOCKS4"``.
        """
        # Let the parent set the shared attributes (including `http2`) so
        # both classes always expose the same set. HTTP/2 is never enabled.
        super().__init__(origin, http2=False, ssl_context=ssl_context)
        self.proxy_origin = proxy_origin
        # This connection type only ever speaks HTTP/1.1.
        self.is_http11 = True
        # `user_id` must be set before building the SOCKS connection object.
        self.user_id = user_id
        self.socks_connection = self._get_socks_connection(socks_version)

    def _get_socks_connection(self, socks_version: str) -> socks4.SOCKS4Connection:
        """Build the SOCKS state machine for the requested protocol version."""
        if socks_version == "SOCKS4":
            return socks4.SOCKS4Connection(user_id=self.user_id)
        raise NotImplementedError(
            "Unsupported SOCKS version: {!r}".format(socks_version)
        )

    def _connect(self, timeout: Optional[TimeoutDict] = None) -> None:
        """SOCKS4 negotiation prior to creating an HTTP/1.1 connection.

        Raises:
            Exception: If the proxy replies with anything other than
                ``REQUEST_GRANTED``. Errors from the negotiation steps
                propagate unchanged. In every failure case the socket to
                the proxy is closed first.
        """
        # First set up the socket to talk to the proxy server. No SSL
        # context is passed for this connection.
        _, proxy_hostname, proxy_port = self.proxy_origin
        timeout = {} if timeout is None else timeout
        socket = self.backend.open_tcp_stream(
            proxy_hostname, proxy_port, None, timeout
        )

        try:
            # Use socksio to negotiate the connection with the remote host.
            request = socks4.SOCKS4Request.from_address(
                socks4.SOCKS4Command.CONNECT,
                (self.origin[1].decode(), self.origin[2]),
            )
            self.socks_connection.send(request)
            socket.write(self.socks_connection.data_to_send(), timeout)

            # Read the response from the proxy.
            data = socket.read(1024, timeout)
            event = self.socks_connection.receive_data(data)

            # Bail if rejected.
            if event.reply_code != socks4.SOCKS4ReplyCode.REQUEST_GRANTED:
                raise Exception(
                    "Proxy server could not connect to remote host: {}".format(
                        event.reply_code
                    )
                )
        except BaseException:
            # Don't leak the proxy socket when negotiation fails.
            # NOTE(review): assumes the backend stream exposes `close()` -
            # confirm against the backend implementation.
            socket.close()
            raise

        # Otherwise use the socket as usual.
        # NOTE(review): no TLS handshake with the origin is performed in this
        # method; `ssl_context` is only handed to the HTTP/1.1 connection.
        # Confirm that `https` origins are handled downstream.
        self.connection = SyncHTTP11Connection(
            socket=socket, ssl_context=self.ssl_context
        )