-
Notifications
You must be signed in to change notification settings - Fork 1
/
tcp.go
464 lines (409 loc) · 13.7 KB
/
tcp.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
// Package saver provides the tools for saving a single flow's packets to disk.
package saver
import (
"bytes"
"compress/gzip"
"context"
"fmt"
"io"
"log"
"os"
"path"
"sync"
"time"
"github.com/m-lab/go/warnonerror"
"github.com/spf13/afero"
"github.com/google/gopacket"
"github.com/google/gopacket/layers"
"github.com/google/gopacket/pcapgo"
"github.com/m-lab/go/anonymize"
"github.com/m-lab/packet-headers/metrics"
)
// minInt returns the smaller of x and y, or x when the two are equal.
// Go 1.21 added a built-in min that can replace this helper once the module
// targets that version.
func minInt(x, y int) int {
	if y < x {
		return y
	}
	return x
}
// anonymizePacket scrubs p in place. For an IPv4 or IPv6 network layer, it
// hands the source and destination addresses to a.IP, which is expected to
// anonymize them in place. It then overwrites every byte of the
// application-layer contents, if any, with zero.
//
// A nil packet or anonymizer, or a packet with no network layer, is logged
// and left untouched.
func anonymizePacket(a anonymize.IPAnonymizer, p gopacket.Packet) {
	if a == nil || p == nil {
		log.Println("Null packet or anonymizer. Can not anonymizePacket")
		return
	}
	netLayer := p.NetworkLayer()
	if netLayer == nil {
		log.Println("Packets with no network layers should never make it here")
		return
	}

	switch netLayer.LayerType() {
	case layers.LayerTypeIPv4:
		v4 := netLayer.(*layers.IPv4)
		a.IP(v4.SrcIP)
		a.IP(v4.DstIP)
	case layers.LayerTypeIPv6:
		v6 := netLayer.(*layers.IPv6)
		a.IP(v6.SrcIP)
		a.IP(v6.DstIP)
	}

	// Blank out any application payload so none of it can reach a pcap file.
	if app := p.ApplicationLayer(); app != nil {
		payload := app.LayerContents()
		for i := range payload {
			payload[i] = 0
		}
	}
}
// prebufferedWriter is an io.Writer that collects everything written to it in
// an in-memory bytes.Buffer until Redirect is called. Redirect flushes the
// collected bytes to the supplied io.Writer, and every later Write goes
// straight to that writer. The zero value is usable: it buffers until
// redirected.
//
// NOT THREAD SAFE: callers must serialize Write and Redirect.
type prebufferedWriter struct {
	// buf holds bytes written before Redirect; nil once they have been flushed.
	buf *bytes.Buffer
	// writer is the destination after a successful Redirect; nil before.
	writer io.Writer
}

// newPrebufferedWriter returns a prebufferedWriter whose buffer starts with
// 8 KiB of capacity and grows as needed.
func newPrebufferedWriter() prebufferedWriter {
	// Start with modest buffer that might be adequate.
	return prebufferedWriter{buf: bytes.NewBuffer(make([]byte, 0, 8192))}
}

// Redirect flushes any buffered bytes to w and sends all subsequent writes to
// w.
//
// It returns os.ErrInvalid if w is nil. If flushing fails, it returns that
// error; the bytes that were written are consumed from the buffer, the rest
// stay buffered, and writes keep going to the buffer. Calling Redirect again
// after a successful call only switches the destination, because nothing
// remains to flush.
func (pw *prebufferedWriter) Redirect(w io.Writer) error {
	if w == nil {
		return os.ErrInvalid
	}
	// buf is nil after an earlier successful Redirect (or in the zero
	// value). Calling WriteTo on it would dereference nil.
	if pw.buf != nil {
		if _, err := pw.buf.WriteTo(w); err != nil {
			return err
		}
		pw.buf = nil
	}
	pw.writer = w
	return nil
}

// Write sends p to the redirect target if Redirect has succeeded, and
// otherwise appends it to the in-memory buffer.
func (pw *prebufferedWriter) Write(p []byte) (int, error) {
	if pw.writer != nil {
		return pw.writer.Write(p)
	}
	if pw.buf == nil {
		// Zero-value writer: allocate the buffer lazily.
		pw.buf = new(bytes.Buffer)
	}
	return pw.buf.Write(p)
}
// UUIDEvent is passed to the saver along with an event arrival timestamp so
// that the saver can produce an appropriately-named pcap file.
type UUIDEvent struct {
	UUID      string
	Timestamp time.Time
}

// filename splits the output location for e into a directory and a base name.
// The directory is dir joined with e.Timestamp's date as YYYY/MM/DD; path.Join
// also cleans the result. The base name is "<UUID>.pcap.gz".
func filename(dir string, e UUIDEvent) (string, string) {
	datePath := e.Timestamp.Format("2006/01/02")
	base := e.UUID + ".pcap.gz"
	return path.Join(dir, datePath), base
}
// statusSetter is the interface TCP uses to record its current state;
// *status is the implementation newTCP installs.
type statusSetter interface {
	Get() string
	Set(status string)
	Done()
}

// status remembers a saver's current state string and keeps
// metrics.SaverCount in step with it. The label for the current state is
// incremented while the saver is in that state and decremented when it leaves.
type status struct {
	status string
	mu     sync.Mutex
}

// newStatus returns a status that starts in beginstate, counted in
// metrics.SaverCount.
func newStatus(beginstate string) *status {
	s := &status{status: beginstate}
	metrics.SaverCount.WithLabelValues(beginstate).Inc()
	return s
}

// Set moves to newstatus, shifting one count from the previous state's label
// to the new one.
func (s *status) Set(newstatus string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	previous := s.status
	s.status = newstatus
	metrics.SaverCount.WithLabelValues(previous).Dec()
	metrics.SaverCount.WithLabelValues(newstatus).Inc()
}

// Done removes the saver's count from its current state's label and records
// the final "stopped" state, which is not counted in metrics.SaverCount.
func (s *status) Done() {
	s.mu.Lock()
	defer s.mu.Unlock()
	metrics.SaverCount.WithLabelValues(s.status).Dec()
	s.status = "stopped"
}

// Get returns the current state string.
func (s *status) Get() string {
	s.mu.Lock()
	defer s.mu.Unlock()
	current := s.status
	return current
}
// TCP saves the packets of a single flow. Callers feed it through two
// channels: Pchan, a well-buffered channel for the flow's packets, and
// UUIDchan, which delivers the flow's UUID together with a timestamp.
type TCP struct {
	// Pchan is the channel down which the flow's packets will be sent.
	Pchan chan<- gopacket.Packet
	// UUIDchan is the channel that receives UUIDs with timestamps.
	UUIDchan chan<- UUIDEvent

	// The internal-only readable channels.
	pchanRead    <-chan gopacket.Packet
	uuidchanRead <-chan UUIDEvent

	// fs is the filesystem the pcap file is written to.
	fs afero.Fs
	// dir is the root directory under which dated output directories are created.
	dir string
	// NOTE(review): cancel is not referenced elsewhere in this file — confirm whether it is still needed.
	cancel func()
	// state tracks the saver's progress; it is reported by State().
	state statusSetter
	// anon is applied to every packet before it is saved.
	anon anonymize.IPAnonymizer
	// stream selects writing to the file as soon as the UUID is known, instead
	// of holding the whole capture in memory until the end.
	stream bool
	// id identifies the flow in log messages.
	id string
	// NOTE(review): stopOnce is not referenced elsewhere in this file — confirm whether it is still needed.
	stopOnce sync.Once
}
// error records a failure. It moves the saver into the "<cause>error" state
// and increments the metrics.SaverErrors counter labelled with cause.
func (t *TCP) error(cause string) {
	failedState := cause + "error"
	t.state.Set(failedState)
	metrics.SaverErrors.WithLabelValues(cause).Inc()
}
// start is the body of the saver goroutine. It captures and saves the flow
// with savePackets, then drains Pchan with discardPackets until the channel is
// closed or ctx is done. The saver is counted in metrics.SaversStarted on entry
// and in metrics.SaversStopped on exit.
func (t *TCP) start(ctx context.Context, uuidDelay, duration time.Duration) {
	metrics.SaversStarted.Inc()
	// Deferred calls run last-in-first-out: the state is marked done first,
	// then the stopped counter is incremented.
	defer metrics.SaversStopped.Inc()
	defer t.state.Done()

	// Phase 1: capture the flow and persist it.
	t.savePackets(ctx, uuidDelay, duration)
	// Phase 2: keep consuming packets so the channel stays empty.
	t.discardPackets(ctx)
}
// savePackets takes packets from the pchan, anonymizes them, and writes them
// into a gzip-compressed pcap stream.
//
// The first packet read sets headerLen, the number of bytes kept from every
// packet of the flow. Packets keep being saved while the saver waits up to
// uuidDelay for the flow's UUID. Once the UUID arrives, the output directory
// <t.dir>/YYYY/MM/DD is created. Saving then continues until duration
// (counted from the start of this call) elapses, ctx is cancelled, or the
// pchan is closed.
//
// In stream mode (t.stream), the buffered bytes are flushed to the output file
// as soon as the UUID is known, and later packets are written straight through.
// Otherwise the whole compressed capture is held in RAM and written to disk
// only at the end. Every failure is reported through t.error and ends the
// capture early.
func (t *TCP) savePackets(ctx context.Context, uuidDelay, duration time.Duration) {
	pw := newPrebufferedWriter()
	zip := gzip.NewWriter(&pw)
	// Write PCAP data to the buffer.
	w := pcapgo.NewWriterNanos(zip)

	// Now save packets until the stream is done or the context is canceled.
	t.state.Set("readingcandidatepackets")

	// Read the first packet to determine the TCP+IP header size (as IPv6 is
	// variable in size). packetCtx bounds the whole capture, not just this read.
	packetCtx, packetCancel := context.WithTimeout(ctx, duration)
	defer packetCancel()
	p, ok := t.readPacket(packetCtx)
	if !ok {
		// This error should never occur in production. It indicates a
		// configuration error or a bug in packet-headers.
		log.Println("PCAP capture cancelled with no packets for flow", t.id)
		t.error("nopackets")
		return
	}

	headerLen := len(p.Data())
	// Now we try to discover the correct header length for the flow by
	// discovering the size of everything before the transport layer, then
	// adding that size and 60 bytes for the TCP header
	// (https://en.wikipedia.org/wiki/Transmission_Control_Protocol#TCP_segment_structure).
	// IPv6 supports variable-length headers (unlike IPv4, where the length of
	// the IPv4 header is well-defined), so this is actually required as opposed
	// to just choosing the right value as a commandline parameter.
	//
	// This algorithm assumes that IPv6 header lengths are stable for a given
	// flow.
	if tl := p.TransportLayer(); tl != nil {
		// "LayerContents" == the TCP header
		// (I don't know why it's not "LayerHeader")
		// "LayerPayload" == everything contained within the transport (TCP)
		// layer that is not the header (including all bytes for all subsequent
		// layers)
		//
		// So, the data we want to save is: the complete packet before the TCP
		// layer, plus the maximum size of a TCP header. We calculate this size
		// by subtracting the actual TCP header and payload lengths from the
		// overall packet size and then adding 60.
		headerLen = len(p.Data()) - len(tl.LayerContents()) - len(tl.LayerPayload()) + 60
	}

	// Write out the header and the first packet. At this point the writer
	// still targets the in-memory buffer, so a failure means the capture is
	// unusable.
	if err := w.WriteFileHeader(uint32(headerLen), layers.LinkTypeEthernet); err != nil {
		log.Println("Could not write pcap file header for flow", t.id, err)
		t.error("pcapheader")
		return
	}
	t.savePacket(w, p, headerLen)

	t.state.Set("uuidwait")
	// Read packets while waiting for uuid event, or until uuidCtx expires. The
	// "uuidtimeout" error conditions below are expected to occur in production
	// in at least 3 normal conditions:
	//
	// (1) each flow that existed prior to the start of the packet-headers
	// binary will cause this error at least once,
	//
	// (2) long-lived flows that send packets so infrequently that the flow gets
	// garbage-collected between packet arrivals will cause this error at least
	// once per garbage-collection round,
	//
	// (3) flows that send a SYN but never complete a 3-way handshake will wake
	// up the TCP saver infrastructure because it looks like a flow was about to
	// be created, but then the kernel never actually creates the flow.
	//
	// Condition 3 is a little bit worrying, as we want an infrastructure that
	// is not too vulnerable to a SYN flood.
	// TODO(https://github.com/m-lab/packet-headers/issues/42)
	uuidCtx, uuidCancel := context.WithTimeout(packetCtx, uuidDelay)
	defer uuidCancel()

	// pchan is a local alias for t.pchanRead that is set to nil once the
	// channel is closed. A nil channel is never ready in a select, so this
	// stops the loop from spinning on the closed channel while the UUID can
	// still arrive.
	pchan := t.pchanRead
	var uuidEvent UUIDEvent
uuidloop:
	for {
		select {
		// If the context expires, return. No progress is possible.
		case <-uuidCtx.Done():
			log.Println("Context expired waiting for UUID for flow", t.id)
			t.error("uuidtimedout")
			return
		// Any packets received while waiting for a UUID should be written to
		// the buffer.
		case p, ok := <-pchan:
			if !ok {
				pchan = nil
				continue
			}
			t.savePacket(w, p, headerLen)
		// Once the UUID arrives, exit the loop.
		case uuidEvent, ok = <-t.uuidchanRead:
			if !ok {
				// If the channel is closed, then we can never get the uuid, so stop capturing.
				log.Println("UUID channel closed, PCAP capture cancelled with no UUID for flow", t.id)
				t.error("uuidchanclosed")
				return
			}
			// This is the expected common case.
			t.state.Set("uuidfound")
			// break from the select statement and the surrounding loop.
			break uuidloop
		}
	}

	// uuidEvent is now set to a good value.
	// Create a file and directory based on the UUID and the time.
	t.state.Set("dircreation")
	dir, fname := filename(t.dir, uuidEvent)
	log.Println("Create", dir)
	if err := t.fs.MkdirAll(dir, 0777); err != nil {
		log.Println("Could not create directory", dir, err)
		// t.error("mkdir") also moves the state to "mkdirerror".
		t.error("mkdir")
		return
	}
	fullFilename := path.Join(dir, fname)

	if t.stream {
		// Switch to file output mode, and write first part of file.
		t.state.Set("writepartial")
		file, err := t.createOutputFile(fullFilename)
		if err != nil {
			t.error("fileopen")
			return
		}
		defer warnonerror.Close(file, fmt.Sprint("Could not close ", file.Name()))
		if err := pw.Redirect(file); err != nil {
			t.error("writepartial")
			return
		}
		t.state.Set("streaming")
	}

	// Continue reading packets until duration has elapsed.
	for {
		p, ok := t.readPacket(packetCtx)
		if !ok {
			break
		}
		t.savePacket(w, p, headerLen)
	}

	// Closing the gzip writer flushes the remaining compressed data and the
	// gzip trailer through pw. It also surfaces any earlier write error.
	if err := zip.Close(); err != nil {
		t.error("streaming")
		return
	}

	if !t.stream {
		// Write the complete, fully buffered capture to the file in one go.
		t.state.Set("writefinal")
		file, err := t.createOutputFile(fullFilename)
		if err != nil {
			t.error("fileopen")
			return
		}
		defer warnonerror.Close(file, fmt.Sprint("Could not close ", file.Name()))
		if err := pw.Redirect(file); err != nil {
			t.error("writefinal")
			return
		}
	}
	// File will be closed at end of function.
	log.Println("Successfully wrote", fullFilename, "for flow", t.id)
}

// createOutputFile opens name for writing through t.fs. It creates the file
// with mode 0664 if needed and truncates any existing contents, so stale bytes
// from an earlier file with the same name cannot trail the new gzip stream.
func (t *TCP) createOutputFile(name string) (afero.File, error) {
	return t.fs.OpenFile(name, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0664)
}
// discardPackets sets the state to "discardingpackets", then reads and drops
// packets until the packet channel is closed or ctx is cancelled. This keeps
// the channel empty after the capture has finished.
func (t *TCP) discardPackets(ctx context.Context) {
	t.state.Set("discardingpackets")
	for {
		if _, open := t.readPacket(ctx); !open {
			return
		}
	}
}
func (t *TCP) readPacket(ctx context.Context) (gopacket.Packet, bool) {
select {
case p, ok := <-t.pchanRead:
return p, ok
case <-ctx.Done():
return nil, false
}
}
// savePacket anonymizes p in place and appends it to w. It keeps at most
// headerLen bytes of the packet data, so that only headers (sized from the
// flow's first packet) reach disk. A nil packet is logged by anonymizePacket
// and then skipped. A failed write is counted in metrics.SaverErrors under
// "pcapwrite" instead of being dropped silently; the capture continues.
func (t *TCP) savePacket(w *pcapgo.Writer, p gopacket.Packet, headerLen int) {
	// First we make sure not to save things we should not.
	anonymizePacket(t.anon, p)
	if p == nil {
		// p.Metadata() below would panic on a nil packet.
		return
	}
	// By design, pcaps capture packets by saving the first N bytes of each
	// packet. Because we can't be sure how big a header will be before we have
	// observed the flow, we have set N to be large and we trim packets down
	// here to not waste space when saving them to disk and to prevent any
	// privacy leaks from application layer data getting saved to .pcap files.
	//
	// CaptureInfo.CaptureLength specifies the saved length of the captured
	// packet. It is distinct from the packet length, because it is how many
	// bytes are actually returned from the pcap system, rather than how many
	// bytes the packet claims to be. The pcap system does not generate captured
	// packets with a CaptureLen larger than the packet size.
	info := p.Metadata().CaptureInfo
	data := p.Data()
	data = data[:minInt(len(data), headerLen)]
	// pcapgo rejects records whose CaptureLength differs from len(data), so
	// derive it from the (possibly truncated) data. For normally captured
	// packets, len(p.Data()) already equals CaptureLength, so this matches
	// minInt(CaptureLength, headerLen).
	info.CaptureLength = len(data)
	if err := w.WritePacket(info, data); err != nil {
		metrics.SaverErrors.WithLabelValues("pcapwrite").Inc()
	}
}
// State reports the saver's current state as a string. The value is suitable
// for use as a label value in a prometheus vector.
func (t *TCP) State() string {
	current := t.state.Get()
	return current
}
// newTCP builds a saver.TCP in the "notstarted" state without starting its
// goroutine. It is a separate function from StartNew to enable whitebox
// testing and instrumentation. fs MUST be non-nil.
func newTCP(dir string, anon anonymize.IPAnonymizer, id string, fs afero.Fs, stream bool) *TCP {
	// With a 1500 byte MTU, this is a ~10 millisecond buffer at a line rate of
	// 10Gbps:
	//
	//   10e9 bits/second * .01 second * 1/8 bytes/bit * 1/1500 packets/byte
	//   = 8333.3 (rounded to 8192) packets
	//
	// In the worst case, where full packets are captured, this corresponds to
	// roughly 125KB for channel capacity and 12.5MB of actual packet data.
	//
	// If synchronization between UUID creation and packet collection is off by
	// more than 10 ms, packets may be missed. However, under load testing we
	// never observed capacity greater than 8K. Conditions that are worse than
	// load testing will have bigger problems.
	const packetChanCapacity = 8192
	packets := make(chan gopacket.Packet, packetChanCapacity)

	// At most one UUID is ever written to UUIDchan, so a capacity of 1 keeps
	// that write from blocking.
	uuids := make(chan UUIDEvent, 1)

	return &TCP{
		Pchan:        packets,
		pchanRead:    packets,
		UUIDchan:     uuids,
		uuidchanRead: uuids,
		state:        newStatus("notstarted"),
		fs:           fs,
		dir:          dir,
		id:           id,
		anon:         anon,
		stream:       stream,
	}
}
// StartNew creates a saver.TCP that writes a single TCP flow under dir on the
// OS filesystem, and starts its goroutine.
//
// The goroutine stops when the passed-in context is cancelled or when Pchan
// is closed. Closing Pchan is preferred, because it is an unambiguous signal
// that no more packets will arrive for the flow. The caller is responsible
// for doing one or the other.
//
// uuidDelay must be smaller than maxDuration.
func StartNew(ctx context.Context, anon anonymize.IPAnonymizer, dir string, uuidDelay, maxDuration time.Duration, id string, stream bool) *TCP {
	osFs := afero.NewOsFs()
	saver := newTCP(dir, anon, id, osFs, stream)
	go saver.start(ctx, uuidDelay, maxDuration)
	return saver
}