tcp.go
// Package demuxer contains the tools for sending packets to the right goroutine to save them to disk.
package demuxer

import (
    "context"
    "runtime"
    "runtime/debug"
    "time"

    "github.com/google/gopacket"
    "github.com/m-lab/go/anonymize"
    "github.com/m-lab/go/bytecount"
    "github.com/m-lab/packet-headers/metrics"
    "github.com/m-lab/packet-headers/saver"
    "github.com/prometheus/client_golang/prometheus"
)

// UUIDEvent is the datatype sent to a demuxer's UUIDChan to notify it about
// the UUID of new flows.
type UUIDEvent struct {
    saver.UUIDEvent
    Flow FlowKey
}
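
// For illustration only (an assumption, not part of the original file): a
// caller that learns a new flow's UUID would send an event like the one
// below on UUIDChan. Here `d` is a *TCP, `flow` is a FlowKey built elsewhere
// in this package, and saver.UUIDEvent is assumed to carry the UUID string
// and its timestamp:
//
//	d.UUIDChan <- UUIDEvent{
//		UUIDEvent: saver.UUIDEvent{UUID: "example-uuid", Timestamp: time.Now()},
//		Flow:      flow,
//	}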

// TCP sends each received TCP/IP packet to the proper saver. If the packet
// is not a TCP/IP packet, then the demuxer will drop it.
//
// Note for those editing this code: demuxer.TCP methods are NOT threadsafe to
// avoid needing a lock in the main packet processing loop.
type TCP struct {
    UUIDChan     chan<- UUIDEvent
    uuidReadChan <-chan UUIDEvent

    // We use a generational GC. Every time the GC timer advances, we garbage
    // collect all savers in oldFlows and make all the currentFlows into
    // oldFlows. It is only through this garbage collection process that
    // saver.TCP objects are finalized.
    currentFlows map[FlowKey]*saver.TCP
    oldFlows     map[FlowKey]*saver.TCP
    maxIdleRAM   bytecount.ByteCount

    status status

    // Variables required for the construction of new Savers.
    maxDuration      time.Duration
    uuidWaitDuration time.Duration
    anon             anonymize.IPAnonymizer
    dataDir          string
    stream           bool
    maxHeap          uint64
}

type status interface {
    GC(stillPresent, discarded int)
}

type promStatus struct{}

func (promStatus) GC(stillPresent, discarded int) {
    metrics.DemuxerGarbageCollected.Add(float64(discarded))
    metrics.DemuxerSaverCount.Set(float64(stillPresent))
}
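
// fakeStatus is an illustrative sketch (not in the original file) of an
// alternative status implementation that records the last GC call instead of
// updating Prometheus metrics. Because TCP.status is unexported, a swap like
// this is only possible from inside this package, e.g. in a unit test.
type fakeStatus struct {
    stillPresent, discarded int
}

// GC records the most recent garbage collection counts.
func (f *fakeStatus) GC(stillPresent, discarded int) {
    f.stillPresent = stillPresent
    f.discarded = discarded
}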

func currentHeapAboveThreshold(maxHeap uint64) bool {
    ms := runtime.MemStats{}
    runtime.ReadMemStats(&ms)
    return ms.HeapAlloc > maxHeap
}

// getSaver returns a saver with channels for packets and a UUID.
func (d *TCP) getSaver(ctx context.Context, flow FlowKey) *saver.TCP {
    // Read the flow from the currentFlows map, the oldFlows map, or create it.
    t, ok := d.currentFlows[flow]
    if !ok {
        // Either move the saver from oldFlows to currentFlows, or create a new
        // one and put it in currentFlows.
        t, ok = d.oldFlows[flow]
        if ok {
            delete(d.oldFlows, flow)
        } else {
            // Only create a new saver if we are below the configured maxHeap
            // threshold. This condition is required because a SYN flood can
            // cause packet-headers to allocate memory faster than partial
            // connection timeouts allow the garbage collector to recover used
            // memory. The result in that case would be system RAM exhaustion.
            //
            // NOTE: When we are above the maxHeap threshold, we may lose some
            // legitimate measurements. This check is a load-shedding strategy
            // to keep the process running and prevent resource usage from
            // packet-headers spilling over into other parts of the system.
            if currentHeapAboveThreshold(d.maxHeap) {
                metrics.SaversSkipped.Inc()
                return nil
            }
            t = saver.StartNew(ctx, d.anon, d.dataDir, d.uuidWaitDuration, d.maxDuration, flow.Format(d.anon), d.stream)
        }
        d.currentFlows[flow] = t
    }
    // Whether it was retrieved or created, return the saver.TCP.
    return t
}

// savePacket saves a packet to the appropriate saver.TCP.
func (d *TCP) savePacket(ctx context.Context, packet gopacket.Packet) {
    if packet == nil || packet.NetworkLayer() == nil || packet.TransportLayer() == nil {
        metrics.DemuxerBadPacket.Inc()
        return
    }
    // Send the packet to the saver.
    s := d.getSaver(ctx, fromPacket(packet))
    if s == nil {
        metrics.MissedPackets.WithLabelValues("skipped").Inc()
        return
    }
    // Don't block on the channel write to the saver, but do note when it fails.
    select {
    case s.Pchan <- packet:
    default:
        metrics.MissedPackets.WithLabelValues(s.State()).Inc()
    }
}

// assignUUID sends the UUID event to the saver for the given flow, creating
// the saver first if necessary.
func (d *TCP) assignUUID(ctx context.Context, ev UUIDEvent) {
    metrics.DemuxerUUIDCount.Inc()
    s := d.getSaver(ctx, ev.Flow)
    if s == nil {
        metrics.MissedUUIDs.WithLabelValues("skipped").Inc()
        return
    }
    // Don't block on the channel write, but do note when it fails.
    select {
    case s.UUIDchan <- ev.UUIDEvent:
    default:
        metrics.MissedUUIDs.WithLabelValues(s.State()).Inc()
    }
}

// collectGarbage closes the channels of all savers in oldFlows (which
// finalizes them), advances the generation, and records the GC stats.
func (d *TCP) collectGarbage() {
    timer := prometheus.NewTimer(metrics.DemuxerGCLatency)
    defer timer.ObserveDuration()
    // Close the old savers' channels in a separate goroutine.
    go func(toBeDeleted map[FlowKey]*saver.TCP) {
        for _, s := range toBeDeleted {
            close(s.UUIDchan)
            close(s.Pchan)
        }
        // Tell the VM to try and return RAM to the OS.
        ms := runtime.MemStats{}
        runtime.ReadMemStats(&ms)
        if ms.HeapIdle > uint64(d.maxIdleRAM) {
            debug.FreeOSMemory()
        }
    }(d.oldFlows)
    // Record GC data.
    d.status.GC(len(d.currentFlows), len(d.oldFlows))
    // Advance the generation.
    d.oldFlows = d.currentFlows
    d.currentFlows = make(map[FlowKey]*saver.TCP)
}

// CapturePackets captures the packets from the channel `packets` and hands
// them off to the appropriate saver.TCP object. We can never be entirely sure
// that a flow will receive no more packets - even the "socket closed" signal
// from the kernel doesn't mean there will be no more packets. Therefore, we
// pass in a ticker for garbage collection (`gcTicker`), and when that ticker
// has fired twice without a flow receiving a packet, that flow is assumed to
// have stopped.
//
// This function can be stopped by canceling the passed-in context.
func (d *TCP) CapturePackets(ctx context.Context, packets <-chan gopacket.Packet, gcTicker <-chan time.Time) {
    // This is the loop that has to run at high speed. All processing that can
    // happen outside this loop should happen outside this loop. No function
    // called from this loop should ever block.
    var ev UUIDEvent
    for {
        select {
        case packet := <-packets:
            // Get a packet and save it.
            d.savePacket(ctx, packet)
        case ev = <-d.uuidReadChan:
            // We are being told about a new UUID.
            d.assignUUID(ctx, ev)
        case <-gcTicker:
            // Time to advance the generational garbage collector.
            d.collectGarbage()
        case <-ctx.Done():
            // The context was canceled; stop capturing.
            return
        }
    }
}

// NewTCP creates a demuxer.TCP, the system that routes each incoming TCP/IP
// packet to the channel of the saver responsible for writing it to a file.
func NewTCP(anon anonymize.IPAnonymizer, dataDir string, uuidWaitDuration, maxFlowDuration time.Duration, maxIdleRAM bytecount.ByteCount, stream bool, maxHeap uint64) *TCP {
    uuidc := make(chan UUIDEvent, 100)
    return &TCP{
        UUIDChan:         uuidc,
        uuidReadChan:     uuidc,
        currentFlows:     make(map[FlowKey]*saver.TCP),
        oldFlows:         make(map[FlowKey]*saver.TCP),
        maxIdleRAM:       maxIdleRAM,
        status:           promStatus{},
        anon:             anon,
        dataDir:          dataDir,
        maxDuration:      maxFlowDuration,
        uuidWaitDuration: uuidWaitDuration,
        stream:           stream,
        maxHeap:          maxHeap,
    }
}
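
// exampleUsage is a hedged, illustrative sketch (not part of the original
// file) of how the pieces above fit together. It assumes a packet source
// elsewhere that fills `packets` (e.g. a pcap capture loop), that
// anonymize.New and anonymize.None behave as their names suggest, and that
// bytecount.Megabyte is a ByteCount constant; "/tmp/pcaps" is a hypothetical
// output directory.
func exampleUsage() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // A capture loop elsewhere writes decoded packets into this channel.
    packets := make(chan gopacket.Packet, 1000)

    d := NewTCP(
        anonymize.New(anonymize.None), // no IP anonymization
        "/tmp/pcaps",                  // dataDir: where savers write files
        5*time.Second,                 // uuidWaitDuration
        30*time.Second,                // maxFlowDuration
        100*bytecount.Megabyte,        // maxIdleRAM before returning memory to the OS
        false,                         // stream: buffer rather than stream to disk
        8<<30,                         // maxHeap: stop creating savers above 8 GiB
    )

    // Flows idle for two full ticks of this ticker are garbage collected.
    gc := time.NewTicker(30 * time.Second)
    defer gc.Stop()
    d.CapturePackets(ctx, packets, gc.C)
}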