/
connection.py
352 lines (300 loc) · 12.4 KB
/
connection.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
import dataclasses
import time
import uuid
import warnings
from abc import ABCMeta
from collections.abc import Sequence
from dataclasses import dataclass
from dataclasses import field
from enum import Flag
from typing import Literal
from mitmproxy import certs
from mitmproxy.coretypes import serializable
from mitmproxy.net import server_spec
from mitmproxy.proxy import mode_specs
from mitmproxy.utils import human
class ConnectionState(Flag):
    """State of the underlying socket, expressed as combinable bit flags."""

    # No flag set at all.
    CLOSED = 0b00
    # Individual flags, one bit each.
    CAN_READ = 0b01
    CAN_WRITE = 0b10
    # Both flags set at once.
    OPEN = CAN_READ | CAN_WRITE
# In practice an IPv6 address may additionally carry flowinfo and scope_id,
# but type checkers do not cope well with unions of tuples.
# A plain (host, port) pair at least produces useful type checking messages.
Address = tuple[str, int]

# The transport protocols a connection can be tagged with.
TransportProtocol = Literal["tcp", "udp"]

# Keyword arguments splatted into the `@dataclass(...)` decorators below.
kw_only = {"kw_only": True}
# noinspection PyDataclass
@dataclass(**kw_only)
class Connection(serializable.SerializableDataclass, metaclass=ABCMeta):
    """
    Base class for client and server connections.

    The connection object only exposes metadata about the connection, but not the underlying socket object.
    This is intentional, all I/O should be handled by `mitmproxy.proxy.server` exclusively.

    Connections are compared and hashed by their `id` only.
    """

    peername: Address | None
    """The remote's `(ip, port)` tuple for this connection."""
    sockname: Address | None
    """Our local `(ip, port)` tuple for this connection."""

    state: ConnectionState = field(
        default=ConnectionState.CLOSED, metadata={"serialize": False}
    )
    """The current connection state."""

    # all connections have a unique id. While
    # f.client_conn == f2.client_conn already holds true for live flows (where we have object identity),
    # we also want these semantics for recorded flows.
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    """A unique UUID to identify the connection."""
    transport_protocol: TransportProtocol = field(default="tcp")
    """The connection protocol in use."""
    error: str | None = None
    """
    A string describing a general error with connections to this address.

    The purpose of this property is to signal that new connections to the particular endpoint should not be attempted,
    for example because it uses an untrusted TLS certificate. Regular (unexpected) disconnects do not set the error
    property. This property is only reused per client connection.
    """

    tls: bool = False
    """
    `True` if TLS should be established, `False` otherwise.
    Note that this property only describes if a connection should eventually be protected using TLS.
    To check if TLS has already been established, use `Connection.tls_established`.
    """
    certificate_list: Sequence[certs.Cert] = ()
    """
    The TLS certificate list as sent by the peer.
    The first certificate is the end-entity certificate.

    > [RFC 8446] Prior to TLS 1.3, "certificate_list" ordering required each
    > certificate to certify the one immediately preceding it; however,
    > some implementations allowed some flexibility. Servers sometimes
    > send both a current and deprecated intermediate for transitional
    > purposes, and others are simply configured incorrectly, but these
    > cases can nonetheless be validated properly. For maximum
    > compatibility, all implementations SHOULD be prepared to handle
    > potentially extraneous certificates and arbitrary orderings from any
    > TLS version, with the exception of the end-entity certificate which
    > MUST be first.
    """
    alpn: bytes | None = None
    """The application-layer protocol as negotiated using
    [ALPN](https://en.wikipedia.org/wiki/Application-Layer_Protocol_Negotiation)."""
    alpn_offers: Sequence[bytes] = ()
    """The ALPN offers as sent in the ClientHello."""
    # we may want to add SSL_CIPHER_description here, but that's currently not exposed by cryptography
    cipher: str | None = None
    """The active cipher name as returned by OpenSSL's `SSL_CIPHER_get_name`."""
    cipher_list: Sequence[str] = ()
    """Ciphers accepted by the proxy server on this connection."""
    tls_version: str | None = None
    """The active TLS version."""
    sni: str | None = None
    """
    The [Server Name Indication (SNI)](https://en.wikipedia.org/wiki/Server_Name_Indication) sent in the ClientHello.
    """

    timestamp_start: float | None = None
    """*Timestamp:* Start of the connection. `Client` and `Server` each document their exact meaning."""
    timestamp_end: float | None = None
    """*Timestamp:* Connection has been closed."""
    timestamp_tls_setup: float | None = None
    """*Timestamp:* TLS handshake has been completed successfully."""

    @property
    def connected(self) -> bool:
        """
        *Read-only:* `True` if Connection.state is ConnectionState.OPEN, `False` otherwise.

        A state with only one of `CAN_READ` / `CAN_WRITE` set does not count as connected.
        """
        return self.state is ConnectionState.OPEN

    @property
    def tls_established(self) -> bool:
        """*Read-only:* `True` if TLS has been established, `False` otherwise."""
        return self.timestamp_tls_setup is not None

    def __eq__(self, other):
        """
        Compare two connections by their `id`.

        Returns `NotImplemented` for objects that are not connections, so that Python
        can fall back to the other operand's comparison (and ultimately to identity,
        which yields `False` for `==`).
        """
        if isinstance(other, Connection):
            return self.id == other.id
        return NotImplemented

    def __hash__(self):
        """Hash by `id`, consistent with `__eq__`."""
        return hash(self.id)

    def __repr__(self):
        """
        Return `ClassName({...})`, listing every field whose value differs from its declared default.

        `id` is shortened to its last six characters and `cipher_list` is summarized by its length.
        """
        all_fields = dataclasses.fields(self)
        field_names = {f.name for f in all_fields}
        # Reserve leading slots so that these keys come first in the output.
        # A slot is only reserved for fields the concrete class really declares;
        # otherwise classes without an `address` field would render a misleading `'address': None`.
        attrs = {name: None for name in ("id", "address") if name in field_names}
        for f in all_fields:
            val = getattr(self, f.name)
            if val != f.default:
                if f.name == "cipher_list":
                    val = f"<{len(val)} ciphers>"
                elif f.name == "id":
                    val = f"…{val[-6:]}"
                attrs[f.name] = val
        return f"{type(self).__name__}({attrs!r})"

    @property
    def alpn_proto_negotiated(self) -> bytes | None:  # pragma: no cover
        """*Deprecated:* An outdated alias for Connection.alpn."""
        warnings.warn(
            "Connection.alpn_proto_negotiated is deprecated, use Connection.alpn instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        return self.alpn
# noinspection PyDataclass
@dataclass(eq=False, repr=False, **kw_only)
class Client(Connection):
    """The connection a client has opened to mitmproxy."""

    peername: Address
    """The address of the client."""
    sockname: Address
    """The local address on which this connection was received."""

    mitmcert: certs.Cert | None = None
    """
    The certificate mitmproxy uses to establish TLS with the client.
    """

    proxy_mode: mode_specs.ProxyMode = field(
        default=mode_specs.ProxyMode.parse("regular")
    )
    """The type of proxy server this client has connected to."""

    timestamp_start: float = field(default_factory=time.time)
    """*Timestamp:* TCP SYN received"""

    def __str__(self):
        """Return a short summary: peer address, lowercased state name and ALPN/TLS information."""
        if self.alpn:
            tls_info = f", alpn={self.alpn.decode(errors='replace')}"
        else:
            tls_info = ", tls" if self.tls_established else ""
        state_name = self.state.name
        # presumably guards against a nameless flag value before lowercasing
        assert state_name
        peer = human.format_address(self.peername)
        return f"Client({peer}, state={state_name.lower()}{tls_info})"

    @property
    def address(self):  # pragma: no cover
        """*Deprecated:* An outdated alias for Client.peername."""
        message = "Client.address is deprecated, use Client.peername instead."
        warnings.warn(message, DeprecationWarning, stacklevel=2)
        return self.peername

    @address.setter
    def address(self, x):  # pragma: no cover
        message = "Client.address is deprecated, use Client.peername instead."
        warnings.warn(message, DeprecationWarning, stacklevel=2)
        self.peername = x

    @property
    def cipher_name(self) -> str | None:  # pragma: no cover
        """*Deprecated:* An outdated alias for Connection.cipher."""
        message = "Client.cipher_name is deprecated, use Client.cipher instead."
        warnings.warn(message, DeprecationWarning, stacklevel=2)
        return self.cipher

    @property
    def clientcert(self) -> certs.Cert | None:  # pragma: no cover
        """*Deprecated:* An outdated alias for Connection.certificate_list[0]."""
        message = "Client.clientcert is deprecated, use Client.certificate_list instead."
        warnings.warn(message, DeprecationWarning, stacklevel=2)
        # An empty certificate list maps to None.
        return self.certificate_list[0] if self.certificate_list else None

    @clientcert.setter
    def clientcert(self, val):  # pragma: no cover
        message = "Client.clientcert is deprecated, use Client.certificate_list instead."
        warnings.warn(message, DeprecationWarning, stacklevel=2)
        # A falsy value clears the list, anything else becomes its only entry.
        self.certificate_list = [val] if val else []
# noinspection PyDataclass
@dataclass(eq=False, repr=False, **kw_only)
class Server(Connection):
    """The connection mitmproxy has opened to an upstream server."""

    address: Address | None  # type: ignore
    """
    The `(host, port)` address tuple of the server.

    The host is either a domain name or a plain IP address.
    Which one it is depends on the proxy mode and on the client.
    For explicit proxies, this value reflects what the client instructs mitmproxy to connect to.
    For example, if the client starts off a connection with `CONNECT example.com HTTP/1.1`, it will be `example.com`.
    For transparent proxies such as WireGuard mode, this value will be an IP address.
    """

    peername: Address | None = None
    """
    The resolved `(ip, port)` tuple of the server. Will be set during connection establishment.
    May be `None` in upstream proxy mode when the address is resolved by the upstream proxy only.
    """
    sockname: Address | None = None

    timestamp_start: float | None = None
    """
    *Timestamp:* Connection establishment started.

    For IP addresses, this corresponds to sending a TCP SYN; for domains, this corresponds to starting a DNS lookup.
    """
    timestamp_tcp_setup: float | None = None
    """*Timestamp:* TCP ACK received."""

    via: server_spec.ServerSpec | None = None
    """An optional proxy server specification via which the connection should be established."""

    def __str__(self):
        """Return a short summary: address, lowercased state name, ALPN/TLS information and local port."""
        if self.alpn:
            tls_info = f", alpn={self.alpn.decode(errors='replace')}"
        else:
            tls_info = ", tls" if self.tls_established else ""
        src_port = f", src_port={self.sockname[1]}" if self.sockname else ""
        state_name = self.state.name
        # presumably guards against a nameless flag value before lowercasing
        assert state_name
        target = human.format_address(self.address)
        return f"Server({target}, state={state_name.lower()}{tls_info}{src_port})"

    def __setattr__(self, name, value):
        """
        Set an attribute, refusing to change `address` or `via` while the connection is open.

        Raises `RuntimeError` if the connection state is `ConnectionState.OPEN` and the new
        value differs from the stored one.
        """
        guarded = name in ("address", "via")
        if guarded:
            # Read from __dict__ directly: attributes may not have been set yet.
            current_state = self.__dict__.get("state", ConnectionState.CLOSED)
            is_open = current_state is ConnectionState.OPEN
            # assigning the current value is okay, that may be an artifact of calling .set_state().
            differs = self.__dict__.get(name) != value
            if is_open and differs:
                raise RuntimeError(f"Cannot change server.{name} on open connection.")
        return super().__setattr__(name, value)

    @property
    def ip_address(self) -> Address | None:  # pragma: no cover
        """*Deprecated:* An outdated alias for `Server.peername`."""
        message = "Server.ip_address is deprecated, use Server.peername instead."
        warnings.warn(message, DeprecationWarning, stacklevel=2)
        return self.peername

    @property
    def cert(self) -> certs.Cert | None:  # pragma: no cover
        """*Deprecated:* An outdated alias for `Connection.certificate_list[0]`."""
        message = "Server.cert is deprecated, use Server.certificate_list instead."
        warnings.warn(message, DeprecationWarning, stacklevel=2)
        # An empty certificate list maps to None.
        return self.certificate_list[0] if self.certificate_list else None

    @cert.setter
    def cert(self, val):  # pragma: no cover
        message = "Server.cert is deprecated, use Server.certificate_list instead."
        warnings.warn(message, DeprecationWarning, stacklevel=2)
        # A falsy value clears the list, anything else becomes its only entry.
        self.certificate_list = [val] if val else []
# Names exported by a star-import of this module.
__all__ = [
    "Connection",
    "Client",
    "Server",
    "ConnectionState",
]