forked from taboola/goreplay
-
Notifications
You must be signed in to change notification settings - Fork 0
/
tcp_message.go
102 lines (80 loc) · 2.4 KB
/
tcp_message.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
package raw_socket
import (
"log"
"sort"
"time"
)
const MSG_EXPIRE = 200 * time.Millisecond
// TCPMessage ensure that all TCP packets for given request is received, and processed in right sequence
// Its needed because all TCP message can be fragmented or re-transmitted
//
// Each TCP Packet have 2 ids: acknowledgment - message_id, and sequence - packet_id
// Message can be compiled from unique packets with same message_id which sorted by sequence
// Message is received if we didn't receive any packets for 200ms
type TCPMessage struct {
ID string // Message ID
packets []*TCPPacket
timer *time.Timer // Used for expire check
c_packets chan *TCPPacket
c_del_message chan *TCPMessage
}
// NewTCPMessage pointer created from a Acknowledgment number and a channel of messages readuy to be deleted
func NewTCPMessage(ID string, c_del chan *TCPMessage) (msg *TCPMessage) {
msg = &TCPMessage{ID: ID}
msg.c_packets = make(chan *TCPPacket)
msg.c_del_message = c_del // used for notifying that message completed or expired
// Every time we receive packet we reset this timer
msg.timer = time.AfterFunc(MSG_EXPIRE, msg.Timeout)
go msg.listen()
return
}
func (t *TCPMessage) listen() {
for {
select {
case packet, more := <-t.c_packets:
if more {
t.AddPacket(packet)
} else {
// Stop loop if channel closed
return
}
}
}
}
// Timeout notifies message to stop listening, close channel and message ready to be sent
func (t *TCPMessage) Timeout() {
close(t.c_packets) // Notify to stop listen loop and close channel
t.c_del_message <- t // Notify RAWListener that message is ready to be send to replay server
}
// Bytes sorts packets in right orders and return message content
func (t *TCPMessage) Bytes() (output []byte) {
mk := make([]int, len(t.packets))
i := 0
for k, _ := range t.packets {
mk[i] = k
i++
}
sort.Ints(mk)
for _, k := range mk {
output = append(output, t.packets[k].Data...)
}
return
}
// AddPacket to the message and ensure packet uniqueness
// TCP allows that packet can be re-send multiple times
func (t *TCPMessage) AddPacket(packet *TCPPacket) {
packetFound := false
for _, pkt := range t.packets {
if packet.Seq == pkt.Seq {
packetFound = true
break
}
}
if packetFound {
log.Println("Received packet with same sequence")
} else {
t.packets = append(t.packets, packet)
}
// Reset message timeout timer
t.timer.Reset(MSG_EXPIRE)
}