forked from mitmproxy/mitmproxy
-
Notifications
You must be signed in to change notification settings - Fork 2
/
clientplayback.py
294 lines (260 loc) · 10.1 KB
/
clientplayback.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
from __future__ import annotations
import asyncio
import logging
import time
from collections.abc import Sequence
from types import TracebackType
from typing import cast
from typing import Literal
import mitmproxy.types
from mitmproxy import command
from mitmproxy import ctx
from mitmproxy import exceptions
from mitmproxy import flow
from mitmproxy import http
from mitmproxy import io
from mitmproxy.connection import ConnectionState
from mitmproxy.connection import Server
from mitmproxy.hooks import UpdateHook
from mitmproxy.log import ALERT
from mitmproxy.options import Options
from mitmproxy.proxy import commands
from mitmproxy.proxy import events
from mitmproxy.proxy import layers
from mitmproxy.proxy import server
from mitmproxy.proxy.context import Context
from mitmproxy.proxy.layer import CommandGenerator
from mitmproxy.proxy.layers.http import HTTPMode
from mitmproxy.proxy.mode_specs import UpstreamMode
from mitmproxy.utils import asyncio_utils
logger = logging.getLogger(__name__)
class MockServer(layers.http.HttpConnection):
"""
A mock HTTP "server" that just pretends it received a full HTTP request,
which is then processed by the proxy core.
"""
flow: http.HTTPFlow
def __init__(self, flow: http.HTTPFlow, context: Context):
super().__init__(context, context.client)
self.flow = flow
def _handle_event(self, event: events.Event) -> CommandGenerator[None]:
if isinstance(event, events.Start):
content = self.flow.request.raw_content
self.flow.request.timestamp_start = (
self.flow.request.timestamp_end
) = time.time()
yield layers.http.ReceiveHttp(
layers.http.RequestHeaders(
1,
self.flow.request,
end_stream=not (content or self.flow.request.trailers),
replay_flow=self.flow,
)
)
if content:
yield layers.http.ReceiveHttp(layers.http.RequestData(1, content))
if self.flow.request.trailers: # pragma: no cover
# TODO: Cover this once we support HTTP/1 trailers.
yield layers.http.ReceiveHttp(
layers.http.RequestTrailers(1, self.flow.request.trailers)
)
yield layers.http.ReceiveHttp(layers.http.RequestEndOfMessage(1))
elif isinstance(
event,
(
layers.http.ResponseHeaders,
layers.http.ResponseData,
layers.http.ResponseTrailers,
layers.http.ResponseEndOfMessage,
layers.http.ResponseProtocolError,
),
):
pass
else: # pragma: no cover
logger.warning(f"Unexpected event during replay: {event}")
class ReplayHandler(server.ConnectionHandler):
layer: layers.HttpLayer
def __init__(self, flow: http.HTTPFlow, options: Options) -> None:
client = flow.client_conn.copy()
client.state = ConnectionState.OPEN
context = Context(client, options)
context.server = Server(address=(flow.request.host, flow.request.port))
if flow.request.scheme == "https":
context.server.tls = True
context.server.sni = flow.request.pretty_host
if options.mode and options.mode[0].startswith("upstream:"):
mode = UpstreamMode.parse(options.mode[0])
assert isinstance(mode, UpstreamMode) # remove once mypy supports Self.
context.server.via = flow.server_conn.via = (mode.scheme, mode.address)
super().__init__(context)
if options.mode and options.mode[0].startswith("upstream:"):
self.layer = layers.HttpLayer(context, HTTPMode.upstream)
else:
self.layer = layers.HttpLayer(context, HTTPMode.transparent)
self.layer.connections[client] = MockServer(flow, context.fork())
self.flow = flow
self.done = asyncio.Event()
async def replay(self) -> None:
self.server_event(events.Start())
await self.done.wait()
def log(
self,
message: str,
level: int = logging.INFO,
exc_info: Literal[True]
| tuple[type[BaseException] | None, BaseException | None, TracebackType | None]
| None = None,
) -> None:
assert isinstance(level, int)
logger.log(level=level, msg=f"[replay] {message}")
async def handle_hook(self, hook: commands.StartHook) -> None:
(data,) = hook.args()
await ctx.master.addons.handle_lifecycle(hook)
if isinstance(data, flow.Flow):
await data.wait_for_resume()
if isinstance(hook, (layers.http.HttpResponseHook, layers.http.HttpErrorHook)):
if self.transports:
# close server connections
for x in self.transports.values():
if x.handler:
x.handler.cancel()
await asyncio.wait(
[x.handler for x in self.transports.values() if x.handler]
)
# signal completion
self.done.set()
class ClientPlayback:
playback_task: asyncio.Task | None = None
inflight: http.HTTPFlow | None
queue: asyncio.Queue
options: Options
replay_tasks: set[asyncio.Task]
def __init__(self):
self.queue = asyncio.Queue()
self.inflight = None
self.task = None
self.replay_tasks = set()
def running(self):
self.playback_task = asyncio_utils.create_task(
self.playback(), name="client playback"
)
self.options = ctx.options
async def done(self):
if self.playback_task:
self.playback_task.cancel()
try:
await self.playback_task
except asyncio.CancelledError:
pass
async def playback(self):
while True:
self.inflight = await self.queue.get()
try:
assert self.inflight
h = ReplayHandler(self.inflight, self.options)
if ctx.options.client_replay_concurrency == -1:
t = asyncio_utils.create_task(
h.replay(), name="client playback awaiting response"
)
# keep a reference so this is not garbage collected
self.replay_tasks.add(t)
t.add_done_callback(self.replay_tasks.remove)
else:
await h.replay()
except Exception:
logger.exception(f"Client replay has crashed!")
self.queue.task_done()
self.inflight = None
def check(self, f: flow.Flow) -> str | None:
if f.live or f == self.inflight:
return "Can't replay live flow."
if f.intercepted:
return "Can't replay intercepted flow."
if isinstance(f, http.HTTPFlow):
if not f.request:
return "Can't replay flow with missing request."
if f.request.raw_content is None:
return "Can't replay flow with missing content."
if f.websocket is not None:
return "Can't replay WebSocket flows."
else:
return "Can only replay HTTP flows."
return None
def load(self, loader):
loader.add_option(
"client_replay",
Sequence[str],
[],
"Replay client requests from a saved file.",
)
loader.add_option(
"client_replay_concurrency",
int,
1,
"Concurrency limit on in-flight client replay requests. Currently the only valid values are 1 and -1 (no limit).",
)
def configure(self, updated):
if "client_replay" in updated and ctx.options.client_replay:
try:
flows = io.read_flows_from_paths(ctx.options.client_replay)
except exceptions.FlowReadException as e:
raise exceptions.OptionsError(str(e))
self.start_replay(flows)
if "client_replay_concurrency" in updated:
if ctx.options.client_replay_concurrency not in [-1, 1]:
raise exceptions.OptionsError(
"Currently the only valid client_replay_concurrency values are -1 and 1."
)
@command.command("replay.client.count")
def count(self) -> int:
"""
Approximate number of flows queued for replay.
"""
return self.queue.qsize() + int(bool(self.inflight))
@command.command("replay.client.stop")
def stop_replay(self) -> None:
"""
Clear the replay queue.
"""
updated = []
while True:
try:
f = self.queue.get_nowait()
except asyncio.QueueEmpty:
break
else:
self.queue.task_done()
f.revert()
updated.append(f)
ctx.master.addons.trigger(UpdateHook(updated))
logger.log(ALERT, "Client replay queue cleared.")
@command.command("replay.client")
def start_replay(self, flows: Sequence[flow.Flow]) -> None:
"""
Add flows to the replay queue, skipping flows that can't be replayed.
"""
updated: list[http.HTTPFlow] = []
for f in flows:
err = self.check(f)
if err:
logger.warning(err)
continue
http_flow = cast(http.HTTPFlow, f)
# Prepare the flow for replay
http_flow.backup()
http_flow.is_replay = "request"
http_flow.response = None
http_flow.error = None
self.queue.put_nowait(http_flow)
updated.append(http_flow)
ctx.master.addons.trigger(UpdateHook(updated))
@command.command("replay.client.file")
def load_file(self, path: mitmproxy.types.Path) -> None:
"""
Load flows from file, and add them to the replay queue.
"""
try:
flows = io.read_flows_from_paths([path])
except exceptions.FlowReadException as e:
raise exceptions.CommandError(str(e))
self.start_replay(flows)