forked from mushorg/go-dpi
/
flow.go
152 lines (134 loc) · 4.58 KB
/
flow.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
// Package types contains the basic types used by the library.
package types
import (
"fmt"
"github.com/google/gopacket"
"github.com/patrickmn/go-cache"
"sync"
"time"
)
// Package-level state used for flow tracking.
var (
	// flowTracker stores the tracked flows, keyed by the string produced by
	// endpointStrFromFlows. It is nil until InitCache is called and again
	// after DestroyCache.
	flowTracker *cache.Cache
	// flowTrackerMtx serialises the lookup-and-store sequence performed by
	// GetFlowForPacket.
	flowTrackerMtx sync.Mutex
)
// ClassificationSource is the module of the library that is responsible for
// the classification of a flow. It is a plain string type; the empty string
// (see NoSource) means that no classification was made.
type ClassificationSource string
// ClassificationResult contains the detected protocol and the source of
// the classification from a classification attempt.
type ClassificationResult struct {
// Protocol is the protocol detected by the classification attempt.
Protocol Protocol
// Source is the module that produced the classification.
Source ClassificationSource
}
// String renders the result as a human-readable sentence naming the detected
// protocol and the source of the detection.
func (result ClassificationResult) String() string {
	const format = "Detected protocol %v from source %v"
	return fmt.Sprintf(format, result.Protocol, result.Source)
}
// NoSource is returned if no classification was made. It is the empty
// string, declared as an untyped constant so that it can be compared with or
// assigned to a ClassificationSource directly.
const NoSource = ""
// Flow contains sufficient information to classify a flow: the packets that
// were added to it and its current classification result. All fields are
// accessed through the methods of Flow, which take mtx. Because it holds a
// sync.RWMutex, a Flow must not be copied after first use; pass *Flow.
type Flow struct {
// packets holds the packets added through AddPacket, in insertion order.
packets []gopacket.Packet
// classification is the result last stored by SetClassificationResult.
classification ClassificationResult
// mtx guards packets and classification.
mtx sync.RWMutex
}
// NewFlow allocates a flow that holds no packets and a zero-valued
// classification result.
func NewFlow() (flow *Flow) {
	flow = &Flow{packets: []gopacket.Packet{}}
	return flow
}
// CreateFlowFromPacket builds a fresh flow whose only packet is the given
// one.
func CreateFlowFromPacket(packet gopacket.Packet) (flow *Flow) {
	created := NewFlow()
	created.AddPacket(packet)
	return created
}
// AddPacket appends the packet to the end of the flow's packet list while
// holding the flow's write lock.
func (flow *Flow) AddPacket(packet gopacket.Packet) {
	flow.mtx.Lock()
	defer flow.mtx.Unlock()

	flow.packets = append(flow.packets, packet)
}
// GetPackets returns a copy of the flow's packet list, taken under the read
// lock. The returned slice is independent of the flow's own slice, so the
// caller may keep or modify it while other goroutines add packets.
func (flow *Flow) GetPackets() (packets []gopacket.Packet) {
	flow.mtx.RLock()
	defer flow.mtx.RUnlock()

	snapshot := make([]gopacket.Packet, len(flow.packets))
	copy(snapshot, flow.packets)
	return snapshot
}
// SetClassificationResult records the detected protocol and the source of
// the detection for this flow, replacing any previous result.
func (flow *Flow) SetClassificationResult(protocol Protocol, source ClassificationSource) {
	updated := ClassificationResult{Source: source, Protocol: protocol}

	flow.mtx.Lock()
	defer flow.mtx.Unlock()

	flow.classification = updated
}
// GetClassificationResult returns a copy of the classification result that
// is currently stored for this flow, read under the read lock.
func (flow *Flow) GetClassificationResult() (result ClassificationResult) {
	flow.mtx.RLock()
	defer flow.mtx.RUnlock()

	return flow.classification
}
// endpointStrFromFlows builds the string key that identifies a flow from the
// network and transport flows of one of its packets.
//
// The key has the form "ip1:port1,ip2:port2". Packets travelling in either
// direction of a conversation must map to the same key, so the flows are
// first put into a canonical orientation: the side with the smaller
// transport endpoint comes first and, when the transport endpoints are equal
// (the same port on both sides), the side with the smaller network endpoint
// comes first.
func endpointStrFromFlows(networkFlow, transportFlow gopacket.Flow) string {
	netSrc, netDst := networkFlow.Endpoints()
	srcEp, dstEp := transportFlow.Endpoints()

	reverse := dstEp.LessThan(srcEp)
	if !reverse && !srcEp.LessThan(dstEp) {
		// Neither transport endpoint is smaller, so they are equal and
		// cannot order the two sides. Without this tie-break the two
		// directions of such a conversation would produce different keys.
		reverse = netDst.LessThan(netSrc)
	}
	if reverse {
		networkFlow = networkFlow.Reverse()
		transportFlow = transportFlow.Reverse()
	}

	ip1, ip2 := networkFlow.Endpoints()
	port1, port2 := transportFlow.Endpoints()
	return fmt.Sprintf("%s:%s,%s:%s", ip1, port1.String(), ip2, port2.String())
}
// GetFlowForPacket finds any previous flow that the packet belongs to, adds
// the packet to that flow and returns it with isNew set to false.
// If no such flow is found, a new one is created, tracked and returned with
// isNew set to true.
//
// A packet that lacks a network or a transport layer cannot be keyed, so it
// always gets a new, untracked flow. The same happens when the flow cache has
// not been initialised with InitCache (or was destroyed): the packet is
// still returned in a flow of its own instead of dereferencing a nil cache
// while the tracker mutex is held.
func GetFlowForPacket(packet gopacket.Packet) (flow *Flow, isNew bool) {
	network := packet.NetworkLayer()
	transport := packet.TransportLayer()
	if network == nil || transport == nil {
		return CreateFlowFromPacket(packet), true
	}

	flowStr := endpointStrFromFlows(network.NetworkFlow(), transport.TransportFlow())

	// make sure two simultaneous calls with the same flow string do not
	// create a race condition
	flowTrackerMtx.Lock()
	if flowTracker == nil {
		flowTrackerMtx.Unlock()
		return CreateFlowFromPacket(packet), true
	}
	if trackedFlow, ok := flowTracker.Get(flowStr); ok {
		flow = trackedFlow.(*Flow)
	} else {
		flow = NewFlow()
		isNew = true
	}
	// The flow is stored again even when it was already tracked, so that its
	// expiration is counted from the most recent packet.
	flowTracker.Set(flowStr, flow, cache.DefaultExpiration)
	flowTrackerMtx.Unlock()

	flow.AddPacket(packet)
	return flow, isNew
}
// FlushTrackedFlows flushes the map used for tracking flows. Any new packets
// that arrive after this operation will be considered new flows.
// It does nothing if the flow cache has not been initialised with InitCache
// or has already been destroyed, matching the nil check in DestroyCache.
func FlushTrackedFlows() {
	if flowTracker == nil {
		return
	}
	flowTracker.Flush()
}
// InitCache creates the flow cache and must be called before the cache is
// used. A flow is discarded once it has been inactive for expirationTime; if
// that value is negative, flows never expire. Expired entries are purged
// from the cache every five minutes.
func InitCache(expirationTime time.Duration) {
	const cleanupInterval = 5 * time.Minute
	flowTracker = cache.New(expirationTime, cleanupInterval)
}
// DestroyCache releases the flow cache: it empties the cache and drops the
// package's reference to it. Calling it when no cache exists is a no-op.
func DestroyCache() {
	if flowTracker == nil {
		return
	}
	flowTracker.Flush()
	flowTracker = nil
}