-
-
Notifications
You must be signed in to change notification settings - Fork 3.9k
/
master.py
131 lines (113 loc) · 4.62 KB
/
master.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
import asyncio
import logging
from . import ctx as mitmproxy_ctx
from .addons import termlog
from .proxy.mode_specs import ReverseMode
from .utils import asyncio_utils
from mitmproxy import addonmanager
from mitmproxy import command
from mitmproxy import eventsequence
from mitmproxy import hooks
from mitmproxy import http
from mitmproxy import log
from mitmproxy import options
# Module-level logger; Master._asyncio_exception_handler reports unhandled
# asyncio errors through it.
logger = logging.getLogger(__name__)
class Master:
    """
    The master handles mitmproxy's main event loop.

    It owns the option store, the command manager and the addon manager, and
    registers itself (plus its log and options) on the shared ``ctx`` module
    when constructed.
    """

    event_loop: asyncio.AbstractEventLoop
    _termlog_addon: termlog.TermLog | None = None

    def __init__(
        self,
        opts: options.Options,
        event_loop: asyncio.AbstractEventLoop | None = None,
        with_termlog: bool = False,
    ):
        """
        Wire up options, commands, addons and logging for a new master.

        Args:
            opts: Options to use; a falsy value is replaced by a fresh
                ``options.Options()``.
            event_loop: Loop the master is bound to. When omitted, the
                currently running loop is looked up via
                ``asyncio.get_running_loop()``.
            with_termlog: If true, a ``termlog.TermLog`` addon is created,
                remembered on the instance and added to the addon manager.
        """
        self.options: options.Options = opts or options.Options()
        self.commands = command.CommandManager(self)
        self.addons = addonmanager.AddonManager(self)

        if with_termlog:
            terminal_log = termlog.TermLog()
            self._termlog_addon = terminal_log
            self.addons.add(terminal_log)

        self.log = log.Log(self)  # deprecated, do not use.

        legacy_events = log.LegacyLogEvents(self)
        self._legacy_log_events = legacy_events
        legacy_events.install()

        # We expect an active event loop here already because some addons
        # may want to spawn tasks during the initial configuration phase,
        # which happens before run().
        self.event_loop = event_loop if event_loop else asyncio.get_running_loop()
        self.should_exit = asyncio.Event()

        # Expose this instance through the shared context module.
        mitmproxy_ctx.master = self
        mitmproxy_ctx.log = self.log  # deprecated, do not use.
        mitmproxy_ctx.options = self.options

    async def run(self) -> None:
        """
        Run the master until ``should_exit`` is set.

        For the duration of the call, the master's asyncio exception handler
        and the eager task factory from ``asyncio_utils`` are installed.
        The sequence is: clear ``should_exit``, let the "errorcheck" addon (if
        any) abort early, wait for the "proxyserver" addon (if any) to set up
        its servers or for an exit request, dispatch the running hook, consult
        "errorcheck" once more, then block until exit is requested and finally
        dispatch the done hook.
        """
        with (
            asyncio_utils.install_exception_handler(self._asyncio_exception_handler),
            asyncio_utils.set_eager_task_factory(),
        ):
            self.should_exit.clear()

            errorcheck = self.addons.get("errorcheck")
            if errorcheck:
                await errorcheck.shutdown_if_errored()

            proxyserver = self.addons.get("proxyserver")
            if proxyserver:
                # This may block for some proxy modes, so we also monitor should_exit.
                # The tasks are deliberately not bound to local names so that
                # their lifetime is governed by this list alone.
                await asyncio.wait(
                    [
                        asyncio.create_task(proxyserver.setup_servers()),
                        asyncio.create_task(self.should_exit.wait()),
                    ],
                    return_when=asyncio.FIRST_COMPLETED,
                )

            await self.running()

            # The addon is looked up again rather than reusing the earlier result.
            errorcheck = self.addons.get("errorcheck")
            if errorcheck:
                await errorcheck.shutdown_if_errored()
                errorcheck.finish()

            # NOTE(review): done() is only guaranteed once we get here; an
            # exception raised by the steps above skips it - confirm intended.
            try:
                await self.should_exit.wait()
            finally:
                # .wait might be cancelled (e.g. by sys.exit)
                await self.done()

    def shutdown(self):
        """
        Shut down the proxy. This method is thread-safe.

        The exit event is not set directly; setting it is scheduled on the
        master's event loop via ``call_soon_threadsafe``.
        """
        # We may add an exception argument here.
        request_exit = self.should_exit.set
        self.event_loop.call_soon_threadsafe(request_exit)

    async def running(self) -> None:
        """Dispatch a ``RunningHook`` event through the addon manager."""
        running_hook = hooks.RunningHook()
        await self.addons.trigger_event(running_hook)

    async def done(self) -> None:
        """
        Dispatch a ``DoneHook`` event, then uninstall the logging helpers.

        The legacy log events are always uninstalled; the termlog addon is
        uninstalled only if one was created in ``__init__``.
        """
        done_hook = hooks.DoneHook()
        await self.addons.trigger_event(done_hook)

        self._legacy_log_events.uninstall()
        terminal_log = self._termlog_addon
        if terminal_log is not None:
            terminal_log.uninstall()

    def _asyncio_exception_handler(self, loop, context) -> None:
        """
        Log errors that asyncio reports to the loop's exception handler.

        Args:
            loop: The reporting event loop (unused).
            context: The asyncio error context mapping. If it has no
                "exception" entry the whole mapping is logged instead.
        """
        if "exception" not in context:
            logger.error(f"Unhandled asyncio error: {context}")
            return

        exc: Exception = context["exception"]
        if isinstance(exc, OSError) and exc.errno == 10038:
            return  # suppress https://bugs.python.org/issue43253

        logger.error(
            "Unhandled error in task.",
            exc_info=(type(exc), exc, exc.__traceback__),
        )

    async def load_flow(self, f):
        """
        Loads a flow

        HTTP flows are retargeted to the reverse proxy destination when
        exactly one mode is configured and it is a reverse mode. Afterwards
        every event produced by ``eventsequence.iterate(f)`` is passed to the
        addon manager's ``handle_lifecycle``.
        """
        if isinstance(f, http.HTTPFlow):
            modes = self.options.mode
            if len(modes) == 1 and modes[0].startswith("reverse:"):
                # When we load flows in reverse proxy mode, we adjust the target host to
                # the reverse proxy destination for all flows we load. This makes it very
                # easy to replay saved flows against a different host.
                # We may change this in the future so that clientplayback always replays to the first mode.
                mode = ReverseMode.parse(modes[0])
                assert isinstance(mode, ReverseMode)
                host, port, *_ = mode.address
                f.request.host = host
                f.request.port = port
                f.request.scheme = mode.scheme

        for event in eventsequence.iterate(f):
            await self.addons.handle_lifecycle(event)