-
-
Notifications
You must be signed in to change notification settings - Fork 3.9k
/
har.py
154 lines (126 loc) · 5.03 KB
/
har.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
"""Reads HAR files into flow objects"""
import base64
import logging
import time
from datetime import datetime
from mitmproxy import connection
from mitmproxy import exceptions
from mitmproxy import http
from mitmproxy.net.http.headers import infer_content_encoding
logger = logging.getLogger(__name__)
def fix_headers(
    request_headers: list[dict[str, str]] | list[tuple[str, str]],
) -> http.Headers:
    """
    Build an ``http.Headers`` object from the header list of a HAR entry.

    Every entry is turned into a ``(b"header-name", b"header-value")`` tuple.
    Two notations are accepted:

    * ``{"name": ..., "value": ...}`` mappings, as written by Brave, Chrome,
      Edge, Firefox, Charles, Fiddler, Insomnia and Safari;
    * ``[name, value]`` sequences, as written by Slack.

    :raises exceptions.OptionsError: if a sequence-style entry has fewer than
        two items.
    """

    def as_pair(entry):
        # Mapping notation: read the two well-known keys directly.
        if isinstance(entry, dict):
            return entry["name"], entry["value"]
        # Sequence notation: positional access, a short entry is reported
        # as an OptionsError instead of a bare IndexError.
        try:
            return entry[0], entry[1]
        except IndexError as err:
            raise exceptions.OptionsError(str(err)) from err

    encoded: list[tuple[bytes, bytes]] = []
    for entry in request_headers:
        name, content = as_pair(entry)
        encoded.append((name.encode(), content.encode()))
    return http.Headers(encoded)
def request_to_flow(request_json: dict) -> http.HTTPFlow:
    """
    Creates a HTTPFlow object from a given entry in HAR file

    :param request_json: one item of the HAR ``log.entries`` list. The keys
        ``startedDateTime``, ``time``, ``request`` (``method``, ``url``,
        ``headers``, ``httpVersion``) and ``response`` (``status``,
        ``headers``, ``content``, ``httpVersion``) are required;
        ``serverIPAddress`` and ``request.postData`` are optional.
    :return: a flow whose request, response and client connection carry the
        timing information recorded in the HAR entry.
    :raises KeyError: if one of the required keys is missing.
    :raises exceptions.OptionsError: propagated from ``fix_headers`` for a
        malformed ``[name, value]`` header entry.
    """
    har_request = request_json["request"]
    har_response = request_json["response"]

    timestamp_start = datetime.fromisoformat(
        request_json["startedDateTime"].replace("Z", "+00:00")
    ).timestamp()
    # HAR stores the total elapsed `time` in milliseconds, while flow
    # timestamps are epoch seconds.
    timestamp_end = timestamp_start + request_json["time"] / 1000

    # `postData` may be absent, and when present it may carry only `params`
    # without a `text` member, so every level is optional here.
    request_content = (har_request.get("postData") or {}).get("text") or ""
    request = http.Request.make(
        har_request["method"],
        har_request["url"],
        request_content,
        fix_headers(har_request["headers"]),
    )

    client_conn = connection.Client(
        peername=("127.0.0.1", 0),
        sockname=("127.0.0.1", 0),
        # Placeholder only: replaced by the HAR start time further down.
        timestamp_start=time.time(),
    )
    server_address = request_json.get("serverIPAddress", None)
    if server_address:
        # Use the port parsed from the URL so that an explicit port
        # (e.g. https://example.com:8443/) is honoured instead of assuming
        # the scheme default.
        server_conn = connection.Server(address=(server_address, request.port))
    else:
        server_conn = connection.Server(address=None)

    new_flow = http.HTTPFlow(client_conn, server_conn)
    new_flow.request = request

    response_code = har_response["status"]
    # In Firefox HAR files images don't include response bodies
    response_content = har_response["content"].get("text", "")
    content_encoding = har_response["content"].get("encoding", None)
    response_headers = fix_headers(har_response["headers"])

    if content_encoding == "base64":
        response_content = base64.b64decode(response_content)
    elif isinstance(response_content, str):
        # Convert text to bytes, as in `Response.set_text`
        try:
            response_content = http.encoding.encode(
                response_content,
                (
                    content_encoding
                    or infer_content_encoding(response_headers.get("content-type", ""))
                ),
            )
        except ValueError:
            # Fallback to UTF-8
            response_content = response_content.encode(
                "utf-8", errors="surrogateescape"
            )

    # Then encode the content, as in `Response.set_content`
    response_content = http.encoding.encode(
        response_content, response_headers.get("content-encoding") or "identity"
    )

    new_flow.response = http.Response(
        b"HTTP/1.1",
        response_code,
        http.status_codes.RESPONSES.get(response_code, "").encode(),
        response_headers,
        response_content,
        None,
        timestamp_start,
        timestamp_end,
    )

    # Update timestamps
    new_flow.request.timestamp_start = timestamp_start
    new_flow.request.timestamp_end = timestamp_end
    new_flow.client_conn.timestamp_start = timestamp_start
    new_flow.client_conn.timestamp_end = timestamp_end

    # Update HTTP version
    new_flow.request.http_version = _normalize_http_version(
        har_request["httpVersion"]
    )
    new_flow.response.http_version = _normalize_http_version(
        har_response["httpVersion"]
    )

    return new_flow


# Lower-cased `httpVersion` spellings mapped to the version string stored on
# the flow. Anything not listed here is treated as HTTP/1.1.
_HAR_HTTP_VERSIONS = {
    "h2": "HTTP/2",
    "http/2": "HTTP/2",
    "http/2.0": "HTTP/2",
    "h3": "HTTP/3",
    "http/3": "HTTP/3",
    "http/3.0": "HTTP/3",
}


def _normalize_http_version(har_version: object) -> str:
    """Map a HAR ``httpVersion`` value to "HTTP/1.1", "HTTP/2" or "HTTP/3"."""
    # Exporters disagree on capitalisation ("http/2.0" vs "HTTP/2"), so the
    # lookup is case-insensitive; short ALPN-style names are accepted too.
    return _HAR_HTTP_VERSIONS.get(str(har_version).strip().lower(), "HTTP/1.1")