/
connection_tracker.go
384 lines (349 loc) · 12 KB
/
connection_tracker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
// +build linux
package endpoint
import (
"net"
"strconv"
"time"
log "github.com/sirupsen/logrus"
"github.com/typetypetype/conntrack"
"github.com/weaveworks/scope/probe/endpoint/procspy"
"github.com/weaveworks/scope/probe/process"
"github.com/weaveworks/scope/report"
)
// connectionTracker gathers the host's network connections into reports.
// It uses the eBPF tracker when one is available and otherwise (including
// after the eBPF tracker fails) conntrack flows plus optional /proc scanning.
type connectionTracker struct {
// conf is the reporter configuration; useProcfs may fill in conf.Scanner.
conf ReporterConfig
// flowWalker walks conntrack flows. In this file it is only assigned by
// useProcfs, so it stays nil while the eBPF tracker is in use.
flowWalker flowWalker // Interface
// ebpfTracker is non-nil while eBPF tracking is in use; useProcfs clears it.
ebpfTracker *EbpfTracker
// reverseResolver supplies reverse DNS names for addresses added to reports.
reverseResolver *reverseResolver
// time of the previous ebpf failure, or zero if it didn't fail; used by
// ReportConnections to decide between restarting eBPF and falling back.
ebpfLastFailureTime time.Time
}
// newConnectionTracker builds a connectionTracker for conf.
//
// When conf.UseEbpfConn is set and the eBPF tracker can be created, the
// tracker uses eBPF and its initial state is fed on a background goroutine.
// Otherwise (eBPF disabled, or setup failed) it falls back to conntrack and
// /proc scanning via useProcfs.
func newConnectionTracker(conf ReporterConfig) connectionTracker {
	tracker := connectionTracker{
		conf:            conf,
		reverseResolver: newReverseResolver(),
	}
	if !conf.UseEbpfConn {
		tracker.useProcfs()
		return tracker
	}

	ebpf, err := newEbpfTracker()
	if err != nil {
		log.Warnf("Error setting up the eBPF tracker, falling back to proc scanning: %v", err)
		tracker.useProcfs()
		return tracker
	}

	tracker.ebpfTracker = ebpf
	go feedEBPFInitialState(conf, ebpf)
	return tracker
}
// flowToTuple converts a conntrack flow into a fourTuple.
//
// Normally the tuple runs from the flow's original source to its original
// destination. When the original destination is not the source of the reply
// (treated here as a DNAT-ed connection in the initial state), source and
// destination are swapped.
func flowToTuple(f conntrack.Conn) fourTuple {
	orig := f.Orig
	if !orig.Dst.Equal(f.Reply.Src) {
		// Handle DNAT-ed connections in the initial state.
		return makeFourTuple(orig.Dst, orig.Src, uint16(orig.DstPort), uint16(orig.SrcPort))
	}
	return makeFourTuple(orig.Src, orig.Dst, uint16(orig.SrcPort), uint16(orig.DstPort))
}
// useProcfs switches the tracker to conntrack + /proc based tracking.
//
// It clears the eBPF tracker reference (it does not stop it), creates a /proc
// connection scanner when WalkProc is enabled and none was configured, and
// creates the conntrack flow walker if one does not exist yet.
func (t *connectionTracker) useProcfs() {
	t.ebpfTracker = nil

	needScanner := t.conf.WalkProc && t.conf.Scanner == nil
	if needScanner {
		t.conf.Scanner = procspy.NewConnectionScanner(t.conf.ProcessCache, t.conf.SpyProcs)
	}

	if t.flowWalker != nil {
		return
	}
	t.flowWalker = newConntrackFlowWalker(t.conf.UseConntrack, t.conf.ProcRoot, t.conf.BufferSize, false /* natOnly */)
}
// ReportConnections adds the host's current connections to rpt.
//
// While the eBPF tracker is alive it is the only source. If it has died, it is
// restarted (and re-fed its initial state) unless it also died within the last
// minute; if it died again that quickly, or cannot be restarted, the tracker
// falls back to conntrack flows plus, when configured, /proc scanning.
func (t *connectionTracker) ReportConnections(rpt *report.Report) {
	hostNodeID := report.MakeHostNodeID(t.conf.HostID)
	if t.ebpfTracker != nil {
		if !t.ebpfTracker.isDead() {
			t.performEbpfTrack(rpt, hostNodeID)
			return
		}
		// Only restart the eBPF tracker if failures are not too frequent, to
		// avoid repetitive restarts.
		now := time.Now()
		previousFailure := t.ebpfLastFailureTime
		t.ebpfLastFailureTime = now
		if previousFailure.After(now.Add(-1 * time.Minute)) {
			// Multiple failures in the last minute, fall back to proc parsing.
			log.Warnf("ebpf tracker died again, gently falling back to proc scanning")
			t.useProcfs()
		} else {
			// Tolerable failure rate, restart the tracker.
			log.Warnf("ebpf tracker died, restarting it")
			err := t.ebpfTracker.restart()
			if err == nil {
				feedEBPFInitialState(t.conf, t.ebpfTracker)
				t.performEbpfTrack(rpt, hostNodeID)
				return
			}
			log.Warnf("could not restart ebpf tracker, falling back to proc scanning: %v", err)
			t.useProcfs()
		}
	}

	// Consult the flowWalker for short-lived (conntracked) connections.
	seenTuples := map[string]fourTuple{}
	t.flowWalker.walkFlows(func(f conntrack.Conn, _ bool) {
		tuple := flowToTuple(f)
		seenTuples[tuple.key()] = tuple
		t.addConnection(rpt, "", tuple, 0, 0, 0, 1)
	})

	if t.conf.WalkProc && t.conf.Scanner != nil {
		// Previously this error was silently dropped; surface it so a broken
		// /proc scanner does not just look like "no connections".
		if err := t.performWalkProc(rpt, hostNodeID, seenTuples); err != nil {
			log.Warnf("Error walking /proc for connections: %v", err)
		}
	}
}
// existingFlowsFromConntrack returns the NAT-ed flows currently known to
// conntrack, keyed by fourTuple.key().
//
// It returns an empty, non-nil map when conntrack is disabled in conf, is not
// supported by the kernel, or fails to list connections; the latter two cases
// are logged.
func existingFlowsFromConntrack(conf ReporterConfig) map[string]fourTuple {
	seenTuples := map[string]fourTuple{}
	if !conf.UseConntrack {
		// Conntrack disabled by configuration: nothing to seed, and nothing
		// worth logging.
		return seenTuples
	}
	if err := IsConntrackSupported(conf.ProcRoot); err != nil {
		log.Warnf("Not using conntrack: not supported by the kernel: %s", err)
		return seenTuples
	}
	existingFlows, err := conntrack.ConnectionsSize(conf.BufferSize)
	if err != nil {
		log.Errorf("conntrack existingConnections error: %v", err)
		return seenTuples
	}
	for _, f := range existingFlows {
		// Only NAT-ed flows are of interest here.
		if (f.Status & conntrack.IPS_NAT_MASK) == 0 {
			continue
		}
		tuple := flowToTuple(f)
		seenTuples[tuple.key()] = tuple
	}
	return seenTuples
}
// performWalkProc adds every connection reported by the configured /proc
// scanner to rpt, orienting each one with connectionTuple using seenTuples.
// Incoming connections are reversed so the local process is on the "to" side.
// It returns the scanner's error, if any, without adding anything.
func (t *connectionTracker) performWalkProc(rpt *report.Report, hostNodeID string, seenTuples map[string]fourTuple) error {
	conns, err := t.conf.Scanner.Connections()
	if err != nil {
		return err
	}
	for {
		conn := conns.Next()
		if conn == nil {
			break
		}
		tuple, namespaceID, incoming := connectionTuple(conn, seenTuples)
		pid := conn.Proc.PID
		if incoming {
			t.addConnection(rpt, hostNodeID, reverse(tuple), 0, pid, namespaceID, 1)
			continue
		}
		t.addConnection(rpt, hostNodeID, tuple, pid, 0, namespaceID, 1)
	}
	return nil
}
// feedEBPFInitialState runs conntrack and proc parsing synchronously only
// once to initialize ebpfTracker
// This is run on a background goroutine during initial setup, so does
// not take *connectionTracker which could change under it
func feedEBPFInitialState(conf ReporterConfig, ebpfTracker *EbpfTracker) {
var processCache *process.CachingWalker
walker := process.NewWalker(conf.ProcRoot, true)
processCache = process.NewCachingWalker(walker)
processCache.Tick()
scanner := procspy.NewSyncConnectionScanner(processCache, conf.SpyProcs)
// Consult conntrack to get the initial state
seenTuples := existingFlowsFromConntrack(conf)
conns, err := scanner.Connections()
if err != nil {
log.Errorf("Error initializing ebpfTracker while scanning /proc, continuing without initial connections: %s", err)
}
scanner.Stop()
processesWaitingInAccept := []int{}
processCache.Walk(func(p, prev process.Process) {
if p.IsWaitingInAccept {
processesWaitingInAccept = append(processesWaitingInAccept, p.PID)
}
})
ebpfTracker.feedInitialConnections(conns, seenTuples, processesWaitingInAccept, report.MakeHostNodeID(conf.HostID))
}
// pidPair records the pids at each end of a connection seen by the eBPF
// tracker. A pid is zero when that end is not known, e.g. when only one
// direction of a local-to-local connection was observed.
type pidPair struct {
fromPid uint32 // zero if unknown
toPid uint32 // zero if unknown
}
// mapPortToPids maps a connection's source ("from") port to the pids at each
// end; performEbpfTrack builds one per (from address, to address, network
// namespace, destination port) group.
type mapPortToPids map[uint16]pidPair
// performEbpfTrack adds the connections currently known to the eBPF tracker
// to rpt. It always returns nil.
//
// Connections are collected by from/to address pair (scoped by network
// namespace) plus destination port. There are three main cases:
//   - connections from an off-box address+port to a local process: we know
//     the pid of the local process;
//   - connections from local processes to an off-box address+port: we know
//     the pids of the local processes but not the remote one;
//   - connections from a local process to a local process: ebpf reports each
//     of these twice, as incoming and as outgoing, and both land on the same
//     entry here so that it carries both pids.
//
// Within each group, makeFilter may thin out the source ports; each reported
// connection carries a count of how many connections it stands for.
func (t *connectionTracker) performEbpfTrack(rpt *report.Report, hostNodeID string) error {
	type triple struct {
		fromAddr, toAddr [net.IPv4len]byte
		networkNamespace uint32
		toPort           uint16
	}
	connectionsByTriple := make(map[triple]mapPortToPids, 1000)
	t.ebpfTracker.walkConnections(func(key ebpfKey, e ebpfDetail) {
		// Normalise so the group always reads "initiator -> acceptor",
		// regardless of which side ebpf observed. (Named `group`, not `t`,
		// so the method receiver is not shadowed.)
		var group triple
		var fromPort uint16
		if e.incoming {
			group = triple{
				fromAddr:         key.toAddr,
				toAddr:           key.fromAddr,
				toPort:           key.fromPort,
				networkNamespace: key.networkNamespace,
			}
			fromPort = key.toPort
		} else {
			group = triple{
				fromAddr:         key.fromAddr,
				toAddr:           key.toAddr,
				toPort:           key.toPort,
				networkNamespace: key.networkNamespace,
			}
			fromPort = key.fromPort
		}
		portToPids := connectionsByTriple[group]
		if portToPids == nil {
			portToPids = make(mapPortToPids)
		}
		pids := portToPids[fromPort]
		if e.incoming {
			pids.toPid = e.pid
		} else {
			pids.fromPid = e.pid
		}
		portToPids[fromPort] = pids
		connectionsByTriple[group] = portToPids
	})

	for group, portToPids := range connectionsByTriple {
		filter, count := makeFilter(portToPids)
		seen, sent, skipped := 0, 0, 0
		// Report the connections that pass the filter. Each one also stands
		// for the filtered-out connections iterated just before it.
		for fromPort, pids := range portToPids {
			seen++
			if !filter(fromPort) {
				skipped++
				continue
			}
			tuple := fourTuple{
				fromAddr: group.fromAddr,
				fromPort: fromPort,
				toAddr:   group.toAddr,
				toPort:   group.toPort,
			}
			sent++
			if sent == count {
				// Last one in a group: add in the connections that come after this one.
				skipped += len(portToPids) - seen
			}
			t.addConnection(rpt, hostNodeID, tuple, uint(pids.fromPid), uint(pids.toPid), group.networkNamespace, skipped+1)
			skipped = 0
		}
	}
	return nil
}
// makeFilter picks a subset of the source ports in ports to report, chosen so
// that if two probes on different machines see connections through the same
// process there is a good chance their subsets overlap.
//
// It returns a predicate over source ports and how many entries of ports
// satisfy it. If the entries do not all share the same from/to pid pair,
// every port is kept. Otherwise, while more than 5 ports are selected, the
// modulus is multiplied by 3 and only ports divisible by it are kept; if a
// step would leave fewer than 3 ports, the previous modulus is used instead.
func makeFilter(ports mapPortToPids) (filter func(uint16) bool, count int) {
	const (
		power      = 3 // Not a power of two, to reduce aliasing with ephemeral port number selection.
		lowerBound = 3
		upperBound = 5
	)
	total := len(ports)
	keepAll := func(uint16) bool { return true }

	// Thinning only applies when every entry has the same from/to pids.
	var wantFrom, wantTo uint32
	haveReference := false
	for _, pids := range ports {
		if !haveReference {
			wantFrom, wantTo, haveReference = pids.fromPid, pids.toPid, true
			continue
		}
		if pids.fromPid != wantFrom || pids.toPid != wantTo {
			return keepAll, total
		}
	}

	modulus := uint16(1)
	selected := total
	for selected > upperBound {
		next := modulus * power
		matches := 0
		for port := range ports {
			if port%next == 0 {
				matches++
			}
		}
		if matches < lowerBound {
			// Too few with the larger modulus: keep the previous one.
			break
		}
		modulus, selected = next, matches
	}
	return func(port uint16) bool { return port%modulus == 0 }, selected
}
// addConnection adds the connection ft to rpt as a from-node adjacent to a
// to-node, and ensures DNS records exist for both addresses.
//
// ft is canonicalised: it is always oriented from the side that opened the
// connection to the side that accepted it. A non-zero fromPid/toPid attaches
// that pid (and hostNodeID) to the corresponding endpoint node. When
// connectionCount > 1, the from-node records that it stands for that many
// elided connections to the same IP and port.
func (t *connectionTracker) addConnection(rpt *report.Report, hostNodeID string, ft fourTuple, fromPid, toPid uint, namespaceID uint32, connectionCount int) {
	// Build the extra-metadata maps only when there is something to put in
	// them; makeEndpointNode treats a nil map the same as an empty one.
	var extraFromNode, extraToNode map[string]string
	if fromPid > 0 {
		extraFromNode = map[string]string{
			process.PID:       strconv.FormatUint(uint64(fromPid), 10),
			report.HostNodeID: hostNodeID,
		}
	}
	if toPid > 0 {
		extraToNode = map[string]string{
			process.PID:       strconv.FormatUint(uint64(toPid), 10),
			report.HostNodeID: hostNodeID,
		}
	}
	if connectionCount > 1 {
		// Tell the app we have elided several connections to a common IP and port onto this one.
		if extraFromNode == nil {
			extraFromNode = make(map[string]string, 1)
		}
		extraFromNode[report.ConnectionCount] = strconv.Itoa(connectionCount)
	}
	var (
		fromAddr = net.IP(ft.fromAddr[:])
		fromNode = t.makeEndpointNode(namespaceID, fromAddr, ft.fromPort, extraFromNode)
		toAddr   = net.IP(ft.toAddr[:])
		toNode   = t.makeEndpointNode(namespaceID, toAddr, ft.toPort, extraToNode)
	)
	rpt.Endpoint.AddNode(fromNode.WithAdjacent(toNode.ID))
	rpt.Endpoint.AddNode(toNode)
	t.addDNS(rpt, fromAddr.String())
	t.addDNS(rpt, toAddr.String())
}
// makeEndpointNode builds the report node for addr:port in the given network
// namespace on this host, attaching extra as latest values when non-empty.
func (t *connectionTracker) makeEndpointNode(namespaceID uint32, addr net.IP, port uint16, extra map[string]string) report.Node {
	id := report.MakeEndpointNodeIDB(t.conf.HostID, namespaceID, addr, port)
	node := report.MakeNodeWith(id, nil)
	if len(extra) == 0 {
		return node
	}
	return node.WithLatests(extra)
}
// addDNS adds a DNS record for addr to rpt if none is there yet. Forward
// names come from the DNS snooper's cache; reverse names come from the
// reverse resolver and are set only when the lookup succeeds with at least
// one name.
func (t *connectionTracker) addDNS(rpt *report.Report, addr string) {
	if _, exists := rpt.DNS[addr]; exists {
		return
	}
	record := report.DNSRecord{
		Forward: report.MakeStringSet(t.conf.DNSSnooper.CachedNamesForIP(addr)...),
	}
	names, err := t.reverseResolver.get(addr)
	if err == nil && len(names) > 0 {
		record.Reverse = report.MakeStringSet(names...)
	}
	rpt.DNS[addr] = record
}
// Stop shuts down the eBPF tracker and the conntrack flow walker, whichever
// are present, and then the reverse resolver. It always returns nil.
func (t *connectionTracker) Stop() error {
	if tracker := t.ebpfTracker; tracker != nil {
		tracker.Stop()
	}
	if walker := t.flowWalker; walker != nil {
		walker.stop()
	}
	t.reverseResolver.stop()
	return nil
}
// connectionTuple returns the local->remote tuple for conn, its network
// namespace, and whether the connection is judged to be incoming.
//
// If the tuple is already in seenTuples, the direction is already known: the
// connection is incoming when the stored canonical tuple differs from the
// local->remote one. Otherwise a port heuristic is used: it is incoming when
// the local port is lower than the remote port.
func connectionTuple(conn *procspy.Connection, seenTuples map[string]fourTuple) (fourTuple, uint32, bool) {
	tuple := makeFourTuple(conn.LocalAddress, conn.RemoteAddress, conn.LocalPort, conn.RemotePort)
	var incoming bool
	if canonical, known := seenTuples[tuple.key()]; known {
		incoming = canonical != tuple
	} else {
		incoming = tuple.fromPort < tuple.toPort
	}
	return tuple, conn.Proc.NetNamespaceID, incoming
}