forked from buger/goreplay
-
Notifications
You must be signed in to change notification settings - Fork 0
/
listener.go
141 lines (111 loc) · 3.82 KB
/
listener.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
package raw_socket
import (
"encoding/binary"
"log"
"net"
"strconv"
)
// Capture traffic from socket using RAW_SOCKET's
// http://en.wikipedia.org/wiki/Raw_socket
//
// RAW_SOCKET allow you listen for traffic on any port (e.g. sniffing) because they operate on IP level.
// Ports is TCP feature, same as flow control, reliable transmission and etc.
// Since we can't use default TCP libraries RAWTCPLitener implements own TCP layer
// TCP packets is parsed using tcp_packet.go, and flow control is managed by tcp_message.go
type Listener struct {
messages map[string]*TCPMessage // buffer of TCPMessages waiting to be send
c_packets chan *TCPPacket
c_messages chan *TCPMessage // Messages ready to be send to client
c_del_message chan *TCPMessage // Used for notifications about completed or expired messages
addr string // IP to listen
port int // Port to listen
}
// RAWTCPListen creates a listener to capture traffic from RAW_SOCKET
func NewListener(addr string, port string) (rawListener *Listener) {
rawListener = &Listener{}
rawListener.c_packets = make(chan *TCPPacket, 100)
rawListener.c_messages = make(chan *TCPMessage, 100)
rawListener.c_del_message = make(chan *TCPMessage, 100)
rawListener.messages = make(map[string]*TCPMessage)
rawListener.addr = addr
rawListener.port, _ = strconv.Atoi(port)
go rawListener.listen()
go rawListener.readRAWSocket()
return
}
func (t *Listener) listen() {
for {
select {
// If message ready for deletion it means that its also complete or expired by timeout
case message := <-t.c_del_message:
t.c_messages <- message
delete(t.messages, message.ID)
// We need to use channels to process each packet to avoid data races
case packet := <-t.c_packets:
t.processTCPPacket(packet)
}
}
}
func (t *Listener) readRAWSocket() {
conn, e := net.ListenPacket("ip4:tcp", t.addr)
defer conn.Close()
if e != nil {
log.Fatal(e)
}
buf := make([]byte, 4096*2)
for {
// Note: ReadFrom receive messages without IP header
n, addr, err := conn.ReadFrom(buf)
if err != nil {
log.Println("Error:", err)
continue
}
if n > 0 {
t.parsePacket(addr, buf[:n])
}
}
}
func (t *Listener) parsePacket(addr net.Addr, buf []byte) {
if t.isIncomingDataPacket(buf) {
new_buf := make([]byte, len(buf))
copy(new_buf, buf)
t.c_packets <- ParseTCPPacket(addr, new_buf)
}
}
func (t *Listener) isIncomingDataPacket(buf []byte) bool {
// To avoid full packet parsing every time, we manually parsing values needed for packet filtering
// http://en.wikipedia.org/wiki/Transmission_Control_Protocol
dest_port := binary.BigEndian.Uint16(buf[2:4])
// Because RAW_SOCKET can't be bound to port, we have to control it by ourself
if int(dest_port) == t.port {
// Get the 'data offset' (size of the TCP header in 32-bit words)
dataOffset := (buf[12] & 0xF0) >> 4
// We need only packets with data inside
// Check that the buffer is larger than the size of the TCP header
if len(buf) > int(dataOffset*4) {
// We should create new buffer because go slices is pointers. So buffer data shoud be immutable.
return true
}
}
return false
}
// Trying to add packet to existing message or creating new message
//
// For TCP message unique id is Acknowledgment number (see tcp_packet.go)
func (t *Listener) processTCPPacket(packet *TCPPacket) {
defer func() { recover() }()
var message *TCPMessage
m_id := packet.Addr.String() + strconv.Itoa(int(packet.Ack))
message, ok := t.messages[m_id]
if !ok {
// We sending c_del_message channel, so message object can communicate with Listener and notify it if message completed
message = NewTCPMessage(m_id, t.c_del_message)
t.messages[m_id] = message
}
// Adding packet to message
message.c_packets <- packet
}
// Receive TCP messages from the listener channel
func (t *Listener) Receive() *TCPMessage {
return <-t.c_messages
}