-
Notifications
You must be signed in to change notification settings - Fork 3
/
ssl_hook_split.py
executable file
·133 lines (123 loc) · 4.55 KB
/
ssl_hook_split.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
#!/bin/env python
import re
import os
import zlib
import sys
# Endpoint record: "<tag>: 0x<HEX>, address: <digits/dots>, port: <n>, familiy: <n>".
# Groups: (1) hex id, (2) address, (3) port, (4) family.
# NOTE(review): "familiy" is spelled this way in the pattern; presumably it
# mirrors the spelling in the log being parsed - confirm before changing it.
RE_IP = re.compile("^\\w+: (0x[\\dA-F]+), address: ([\\d\\.]+), port: (\\d+), familiy: (\\d+)$")
# Data record header: "<tag>: 0x<HEX>, time: <n>, req: 0x<HEX>, got: 0x<HEX>".
# Groups: (1) tag, (2) hex id, (3) time, (4) req, (5) got.
RE_DATA = re.compile("^(\\w+): (0x[\\dA-F]+), time: (\\d+), req: (0x[\\dA-F]+), got: (0x[\\dA-F]+)$")
# Bytes pattern for a CRLF-terminated Content-Length header; captures the digits.
# Matched case-insensitively.
RE_LEN = re.compile(b'Content-Length: (\\d+)\r\n', re.IGNORECASE)
# Bytes pattern that detects a "Content-Encoding: gzip" header, case-insensitively.
RE_GZIP = re.compile(b'Content-Encoding: gzip', re.IGNORECASE)
class Connection:
    """Everything collected about one connection id.

    Attributes are plain public fields:

    * ``family``  - address family text, ``"UNKNOWN"`` until assigned.
    * ``sock_ip`` - local endpoint text, ``"UNKNOWN"`` until assigned.
    * ``peer_ip`` - remote endpoint text, ``"UNKNOWN"`` until assigned.
    * ``packets`` - dict of captured payloads, empty on creation.
    """

    def __init__(self):
        # All three descriptive fields share the same placeholder value.
        self.family = self.sock_ip = self.peer_ip = "UNKNOWN"
        # A fresh dict per instance, never shared between connections.
        self.packets = dict()
def auto_ungzip(data):
    """Return *data* with its gzip-compressed HTTP body inflated when possible.

    *data* is a bytes object holding an HTTP message (headers, a blank
    line, then the body). If no ``Content-Encoding: gzip`` header is found,
    or the header terminator is missing, or the body cannot be
    decompressed, *data* is returned unchanged. Otherwise the result is the
    original header section (including the blank line) followed by the
    decompressed body.
    """
    if not RE_GZIP.search(data):
        return data
    try:
        # The body starts right after the CRLF CRLF that ends the headers.
        body_start = data.index(b"\r\n\r\n") + 4
        # wbits = 32 + MAX_WBITS lets zlib auto-detect a gzip or zlib header.
        body = zlib.decompress(data[body_start:], 32 + zlib.MAX_WBITS)
    except (ValueError, zlib.error):
        # ValueError: no header terminator; zlib.error: body is not a
        # complete compressed stream. Best effort: hand the input back as is.
        return data
    return data[:body_start] + body
def _read_text_line(stream):
    """Read one LF-terminated line from binary *stream*, without the LF."""
    raw = stream.readline()
    # An empty read or a line without its terminator means the file ended
    # before an "end" record was seen.
    assert raw.endswith(b"\n"), "unexpected end of capture file"
    # latin-1 maps each byte to the code point with the same value.
    return raw[:-1].decode("latin-1")


def _merge_same_direction(packets):
    """Fold consecutive same-direction packets into the first one of each run."""
    merged = {}
    run_direction = ""
    run_payload = []
    for key in sorted(packets):
        direction = key[2]
        payload = packets[key]
        if direction == run_direction:
            # Same direction as the run in progress: append to its payload
            # (the list object already stored in `merged`).
            run_payload.extend(payload)
        else:
            run_payload = payload
            run_direction = direction
            merged[key] = payload
    return merged


def read_connections(fname):
    """Parse the capture file *fname* into a dict of ``Connection`` objects.

    The file is a sequence of LF-terminated text records, some of which are
    followed by raw payload bytes:

    * ``end...``               - stops parsing.
    * ``fail...``              - raises ``ValueError`` with the line.
    * ``module...``            - ignored.
    * ``fd_peer...``/``fd_sock...`` - endpoint record (see ``RE_IP``); sets
      ``family`` and either ``peer_ip`` or ``sock_ip`` as
      ``"<address>_<port>"``.
    * ``read...``/``write...`` - data record (see ``RE_DATA``); followed by
      ``got`` payload bytes and a single LF.

    Each payload is stored in ``Connection.packets`` as a list of ints
    under the key ``(time, index, direction)``. After parsing, consecutive
    packets with the same direction are merged into the first packet of
    the run.

    Returns:
        dict mapping the connection id text (as it appears in the file) to
        its ``Connection``.

    Raises:
        ValueError: on a ``fail`` record or an unrecognised record type.
        AssertionError: on a malformed record or a truncated file.
    """
    results = {}
    with open(fname, "rb") as stream:
        while True:
            line = _read_text_line(stream)
            if line.startswith("end"):
                break
            if line.startswith("fail"):
                raise ValueError(line)
            if line.startswith("module"):
                continue
            if line.startswith(("fd_peer", "fd_sock")):
                m = RE_IP.match(line)
                assert m, f"malformed endpoint record: {line}"
                cid, address, port, family = m.groups()
                if cid not in results:
                    results[cid] = Connection()
                conn = results[cid]
                conn.family = family
                if line.startswith("fd_peer"):
                    conn.peer_ip = f"{address}_{port}"
                else:
                    conn.sock_ip = f"{address}_{port}"
                continue
            if line.startswith(("read", "write")):
                m = RE_DATA.match(line)
                assert m, f"malformed data record: {line}"
                direction = m.group(1)
                cid = m.group(2)
                time = int(m.group(3))
                # "got" is a hex count of the payload bytes that follow.
                received = int(m.group(5), 16)
                data = stream.read(received)
                assert len(data) == received, "truncated payload"
                newline = stream.read(1)
                assert newline == b"\n", "payload not followed by a newline"
                if cid not in results:
                    results[cid] = Connection()
                conn = results[cid]
                # The running index keeps keys unique and preserves file
                # order among packets that share a timestamp.
                key = (time, len(conn.packets), direction)
                conn.packets[key] = list(data)
                continue
            raise ValueError(f"Unknown type: {line}")
    for conn in results.values():
        conn.packets = _merge_same_direction(conn.packets)
    return results
def _write_run(out, direction, payload):
    """Write one labelled run of same-direction data to the binary file *out*."""
    body = auto_ungzip(bytes(payload))
    out.write(f"{direction} {len(body)} ".encode('ascii') + b"=" * 100 + b'\n')
    out.write(body)
    out.write(b"\n")


def write_connections(connections, dname):
    """Write each connection's traffic to its own file under directory *dname*.

    *connections* maps a connection id to an object with ``family``,
    ``sock_ip``, ``peer_ip`` and ``packets`` attributes, where ``packets``
    maps ``(time, index, direction)`` keys to iterables of byte values.
    The directory is created if needed. One file named
    ``<cid>-<family>-<sock>-<peer>.txt`` is written per connection.

    Within a file, consecutive packets with the same direction are joined
    into one run. Each run is passed through ``auto_ungzip`` and written as
    a header line ``"<direction> <length> "`` followed by 100 ``=``
    characters, then the data, then a newline. The header carries the
    direction of the run being written.
    """
    os.makedirs(dname, exist_ok=True)
    for cid, conn in connections.items():
        family, sock, peer = conn.family, conn.sock_ip, conn.peer_ip
        fname = f"{dname}/{cid}-{family}-{sock}-{peer}.txt"
        run_direction = None
        run_data = bytearray()
        with open(fname, "wb") as out:
            for (_time, _index, direction), payload in conn.packets.items():
                if run_direction and run_direction != direction:
                    # Direction changed: flush the finished run under its
                    # own direction, not the direction of the new packet.
                    _write_run(out, run_direction, run_data)
                    run_data.clear()
                run_direction = direction
                run_data.extend(payload)
            if run_direction:
                # Flush whatever is left after the last packet.
                _write_run(out, run_direction, run_data)
# Usage: ssl_hook_split.py <capture file> [output directory]
src = sys.argv[1]
# Use the second argument as the output directory when it is present.
# The presence check must look at the argument count: sys.argv holds
# strings, so a membership test against the integer 2 can never succeed.
# Without a second argument, the directory name is the capture path with
# ".txt" removed.
dst = sys.argv[2] if len(sys.argv) > 2 else src.replace(".txt", "")
cons = read_connections(src)
write_connections(cons, dst)