/
archival-record.go
453 lines (408 loc) · 14.3 KB
/
archival-record.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
// Package netlink contains the bare minimum needed to partially parse netlink messages.
package netlink
import (
"bufio"
"bytes"
"encoding/binary"
"encoding/json"
"errors"
"flag"
"io"
"log"
"net"
"strconv"
"time"
"unsafe"
"github.com/m-lab/go/logx"
"github.com/m-lab/tcp-info/inetdiag"
"github.com/m-lab/tcp-info/tcp"
)
/*********************************************************************************************
* Internal representation of NetlinkJSONL messages
*********************************************************************************************/
// Metadata contains the metadata for a particular TCP stream.
type Metadata struct {
	// UUID is the identifier string for the stream.
	// NOTE(review): presumably globally unique per connection - confirm with the producer.
	UUID string
	// Sequence is an integer sequence number.
	// NOTE(review): what is being counted is not visible in this file - confirm.
	Sequence int
	// StartTime is a timestamp associated with the start of the stream.
	// NOTE(review): exact reference point (first observation vs. connection open) - confirm.
	StartTime time.Time
}
// ArchivalRecord is a container for parsed InetDiag messages and attributes.
// All fields are omitted from the JSON encoding when empty.
type ArchivalRecord struct {
	// Timestamp should be truncated to 1 millisecond for best compression.
	// Using int64 milliseconds instead reduces compressed size by 0.5 bytes/record, or about 1.5%
	// MakeArchivalRecord does not populate this field; the caller is expected to set it.
	Timestamp time.Time `json:",omitempty"`

	// Storing the RawIDM instead of the parsed InetDiagMsg reduces Marshalling by 2.6 usec, and
	// typical compressed size by 3-4 bytes/record
	RawIDM inetdiag.RawInetDiagMsg `json:",omitempty"` // RawInetDiagMsg within NLMsg
	// Saving just the .Value fields reduces Marshalling by 1.9 usec.
	// The slice is indexed by attribute type (see MakeArchivalRecord), so the entry for
	// an attribute type that was not present in the message is nil.
	Attributes [][]byte `json:",omitempty"` // byte slices from RouteAttr.Value, backed by NLMsg

	// Metadata contains connection level metadata.  It is typically included in the very first record
	// in a file.
	Metadata *Metadata `json:",omitempty"`
}
// ExcludeConfig provides options for excluding some measurements from archival messages.
// The maps are created lazily by AddSrcPort and AddDstIP, so the zero value is ready to use.
type ExcludeConfig struct {
	// Local excludes connections from loopback, local unicast, multicast, or unspecified connections.
	Local bool
	// SrcPorts excludes connections from specific source ports.
	SrcPorts map[uint16]bool
	// DstIPs excludes connections to specific destination addresses. Keys are 16-byte
	// arrays: an IPv6 address occupies all 16 bytes, while an IPv4 address occupies the
	// first four bytes with the remainder zero.
	DstIPs map[[16]byte]bool
}

// AddSrcPort adds the given port to the set of source ports to exclude.
//
// port must be a base-10 integer in the range 0-65535. Anything else, including
// negative numbers, is rejected with the error from strconv.ParseUint and leaves
// the configuration unchanged.
func (ex *ExcludeConfig) AddSrcPort(port string) error {
	// Ports are unsigned 16-bit values. A signed 16-bit parse would reject the upper
	// half of the port range and let negative input wrap around to a valid port.
	p, err := strconv.ParseUint(port, 10, 16)
	if err != nil {
		return err
	}
	if ex.SrcPorts == nil {
		ex.SrcPorts = map[uint16]bool{}
	}
	ex.SrcPorts[uint16(p)] = true
	return nil
}

// AddDstIP adds the given dst IP address to the set of destination IPs to exclude.
//
// dst must be a textual IPv4 or IPv6 address accepted by net.ParseIP; otherwise an
// "invalid ip" error is returned and the configuration is left unchanged.
func (ex *ExcludeConfig) AddDstIP(dst string) error {
	ip := net.ParseIP(dst)
	if ip == nil {
		return errors.New("invalid ip: " + dst)
	}
	var key [16]byte
	if ip4 := ip.To4(); ip4 != nil {
		// NOTE: The Linux-native byte position for IPv4 addresses is the first four bytes.
		// The net.IP package format uses the last four bytes. Copy the four IPv4 bytes to
		// the front of a new array to generate a key for DstIPs.
		copy(key[:], ip4)
	} else {
		copy(key[:], ip)
	}
	if ex.DstIPs == nil {
		ex.DstIPs = map[[16]byte]bool{}
	}
	ex.DstIPs[key] = true
	return nil
}
// ParseRouteAttr parses a byte array into slice of NetlinkRouteAttr struct.
// Derived from "github.com/vishvananda/netlink/nl/nl_linux.go"
//
// Attributes are consumed from the front of b until fewer than SizeofRtAttr bytes
// remain. Each returned Value is a sub-slice of b (no copy is made). Any error from
// decoding an attribute header aborts the parse and returns a nil slice, as does an
// attribute whose declared length is inconsistent with the bytes available
// (ErrParseFailed).
func ParseRouteAttr(b []byte) ([]NetlinkRouteAttr, error) {
	var attrs []NetlinkRouteAttr
	for len(b) >= SizeofRtAttr {
		a, vbuf, alen, err := netlinkRouteAttrAndValue(b)
		if err != nil {
			return nil, err
		}
		// Defensive validation: a declared length smaller than the header or larger
		// than the available bytes would otherwise panic or over-read when slicing,
		// and a non-positive advance would loop forever.
		valueLen := int(a.Len) - SizeofRtAttr
		if valueLen < 0 || valueLen > len(vbuf) || int(alen) <= 0 {
			return nil, ErrParseFailed
		}
		attrs = append(attrs, NetlinkRouteAttr{Attr: RtAttr(*a), Value: vbuf[:valueLen]})
		if int(alen) > len(b) {
			// The advance (presumably the attribute length rounded up for alignment)
			// runs past the end of the buffer, i.e. the final attribute has no
			// trailing padding. Keep the attribute and stop instead of panicking.
			break
		}
		b = b[alen:]
	}
	return attrs, nil
}
// MakeArchivalRecord converts a NetlinkMessage into an ArchivalRecord.
//
// Only messages whose header type is 20 are accepted; anything else yields
// ErrNotType20. If the payload cannot be split into an inet_diag message plus
// attribute bytes, ErrParseFailed is returned.
//
// When exclude is non-nil, a message matching any of its criteria (source port,
// local address, destination IP) produces a nil record together with a nil error.
//
// The Timestamp field is not populated here, so the caller should set it.
func MakeArchivalRecord(msg *NetlinkMessage, exclude *ExcludeConfig) (*ArchivalRecord, error) {
	if msg.Header.Type != 20 {
		return nil, ErrNotType20
	}
	raw, attrBytes := inetdiag.SplitInetDiagMsg(msg.Data)
	if raw == nil {
		return nil, ErrParseFailed
	}

	if exclude != nil {
		idm, err := raw.Parse()
		if err != nil {
			return nil, err
		}
		switch {
		case exclude.SrcPorts != nil && exclude.SrcPorts[idm.ID.SPort()]:
			return nil, nil
		case exclude.Local && (isLocal(idm.ID.SrcIP()) || isLocal(idm.ID.DstIP())):
			return nil, nil
		case exclude.DstIPs != nil && exclude.DstIPs[idm.ID.IDiagDst]:
			// Note: byte-key lookup is preferable for performance than
			// net.IP-to-String formatting. And, a byte array can be a map key,
			// while a net.IP byte slice cannot.
			return nil, nil
		}
	}

	attrs, err := ParseRouteAttr(attrBytes)
	if err != nil {
		return nil, err
	}

	// Size the attribute table by the largest type seen, capped so that a bogus
	// attribute type cannot force a large allocation.
	var highest uint16
	for i := range attrs {
		if t := attrs[i].Attr.Type; t > highest {
			highest = t
		}
	}
	if highest > 2*inetdiag.INET_DIAG_MAX {
		highest = 2 * inetdiag.INET_DIAG_MAX
	}

	record := &ArchivalRecord{
		RawIDM:     raw,
		Attributes: make([][]byte, highest+1),
	}
	for i := range attrs {
		t := attrs[i].Attr.Type
		if t > highest {
			log.Println("Error!! Received RouteAttr with very large Type:", t)
			continue
		}
		if record.Attributes[t] != nil {
			// TODO - add metric so we can alert on these.
			log.Println("Parse error - Attribute appears more than once:", t)
		}
		// The last occurrence of a repeated attribute type wins.
		record.Attributes[t] = attrs[i].Value
	}
	return record, nil
}
// ChangeType indicates why a new record is worthwhile saving.
// It is the classification returned by ArchivalRecord.Compare.
type ChangeType int

// Constants to describe the degree of change between two different ParsedMessages.
// The numeric values come from iota, so the order of this list must not change.
const (
	NoMajorChange        ChangeType = iota // Nothing significant differs between the two records
	IDiagStateChange                       // The IDiagState changed
	NoTCPInfo                              // There is no TCPInfo attribute
	NewAttribute                           // There is a new attribute
	LostAttribute                          // There is a dropped attribute
	AttributeLength                        // The length of an attribute changed
	StateOrCounterChange                   // One of the early fields in DIAG_INFO changed.
	PacketCountChange                      // One of the packet/byte/segment counts (or other late field) changed
	PreviousWasNil                         // The previous message was nil
	Other                                  // Some other attribute changed
)
// Useful offsets for Compare
// Each constant is the byte offset of the named field within tcp.LinuxTCPInfo.
// They are used to index directly into the raw INET_DIAG_INFO attribute bytes,
// both when comparing records and when reading or overwriting the byte counters.
const (
	lastDataSentOffset  = unsafe.Offsetof(tcp.LinuxTCPInfo{}.LastDataSent)
	pmtuOffset          = unsafe.Offsetof(tcp.LinuxTCPInfo{}.PMTU)
	busytimeOffset      = unsafe.Offsetof(tcp.LinuxTCPInfo{}.BusyTime)
	bytesReceivedOffset = unsafe.Offsetof(tcp.LinuxTCPInfo{}.BytesReceived) // 128
	bytesSentOffset     = unsafe.Offsetof(tcp.LinuxTCPInfo{}.BytesSent)     // 200
)
// isLocal reports whether addr is a loopback, link-local unicast, multicast,
// or unspecified address.
func isLocal(addr net.IP) bool {
	switch {
	case addr.IsLoopback(),
		addr.IsLinkLocalUnicast(),
		addr.IsMulticast(),
		addr.IsUnspecified():
		return true
	}
	return false
}
// Compare compares important fields to determine whether significant updates have occurred.
// We ignore a bunch of fields:
//   - The TCPInfo fields matching last_* are rapidly changing, but don't have much significance.
//     Are they elapsed time fields?
//   - The InetDiagMsg.Expires is also rapidly changing in many connections, but also seems
//     unimportant.
//
// Significant updates are reflected in the packet, segment and byte count updates, so we
// generally want to record a snapshot when any of those change.  They are in the latter
// part of the linux struct, following the pmtu field.
//
// The simplest test that seems to tell us what we care about is to look at all the fields
// in the TCPInfo struct related to packets, bytes, and segments.  In addition to the TCPState
// and CAState fields, these are probably adequate, but we also check for new or missing attributes
// and any attribute difference outside of the TCPInfo (INET_DIAG_INFO) attribute.
//
// The checks run in this order, and the first one that fires determines the result:
// previous is nil, IDiagState differs, TCPInfo missing from either record, TCPInfo
// counter/state bytes differ, attribute table length differs, any other attribute differs.
// ErrParseFailed is returned if either record's RawIDM cannot be parsed.
//
// A TCPInfo attribute shorter than the compared byte ranges is compared over the
// bytes it actually contains, rather than slicing past its end.
func (pm *ArchivalRecord) Compare(previous *ArchivalRecord) (ChangeType, error) {
	if previous == nil {
		return PreviousWasNil, nil
	}
	// If the TCP state has changed, that is important!
	prevIDM, err := previous.RawIDM.Parse()
	if err != nil {
		return NoMajorChange, ErrParseFailed
	}
	pmIDM, err := pm.RawIDM.Parse()
	if err != nil {
		return NoMajorChange, ErrParseFailed
	}
	if prevIDM.IDiagState != pmIDM.IDiagState {
		return IDiagStateChange, nil
	}

	// TODO - should we validate that ID matches?  Otherwise, we shouldn't even be comparing the rest.

	// The attribute table is only as long as the highest attribute type seen, so it
	// may be too short to contain an INET_DIAG_INFO slot at all.
	if len(previous.Attributes) <= inetdiag.INET_DIAG_INFO || len(pm.Attributes) <= inetdiag.INET_DIAG_INFO {
		return NoTCPInfo, nil
	}
	prevInfo := previous.Attributes[inetdiag.INET_DIAG_INFO]
	curInfo := pm.Attributes[inetdiag.INET_DIAG_INFO]
	if prevInfo == nil || curInfo == nil {
		return NoTCPInfo, nil
	}

	// If any of the byte/segment/package counters have changed, that is what we are most
	// interested in.
	// NOTE: There are more fields beyond BusyTime, but for now we are ignoring them for diffing purposes.
	if !bytes.Equal(tcpInfoSpan(prevInfo, pmtuOffset, busytimeOffset), tcpInfoSpan(curInfo, pmtuOffset, busytimeOffset)) {
		return StateOrCounterChange, nil
	}
	// Check all the earlier fields, too.  Usually these won't change unless the counters above
	// change, but this way we won't miss something subtle.
	if !bytes.Equal(tcpInfoSpan(prevInfo, 0, lastDataSentOffset), tcpInfoSpan(curInfo, 0, lastDataSentOffset)) {
		return StateOrCounterChange, nil
	}

	// If any attributes have been added or removed, that is likely significant.
	if len(previous.Attributes) < len(pm.Attributes) {
		return NewAttribute, nil
	}
	if len(previous.Attributes) > len(pm.Attributes) {
		return LostAttribute, nil
	}

	// Both tables are the same length, check for other differences...
	for tp, prevAttr := range previous.Attributes {
		if tp == inetdiag.INET_DIAG_INFO {
			// Handled explicitly above.
			continue
		}
		// Detect any change in anything other than INET_DIAG_INFO
		curAttr := pm.Attributes[tp]
		switch {
		case prevAttr == nil && curAttr == nil:
			continue
		case prevAttr == nil:
			return NewAttribute, nil
		case curAttr == nil:
			return LostAttribute, nil
		case len(prevAttr) != len(curAttr):
			return AttributeLength, nil
		case !bytes.Equal(prevAttr, curAttr):
			// All others we want to be identical
			return Other, nil
		}
	}

	return NoMajorChange, nil
}

// tcpInfoSpan returns buf[lo:hi] with both bounds clamped to len(buf), so that a
// truncated attribute yields a shorter (possibly empty) slice instead of a panic or
// a read beyond the attribute into its backing array.
func tcpInfoSpan(buf []byte, lo, hi uintptr) []byte {
	end := int(hi)
	if end > len(buf) {
		end = len(buf)
	}
	start := int(lo)
	if start > end {
		start = end
	}
	return buf[start:end]
}
/*********************************************************************************************/
/* Utilities for loading data */
/*********************************************************************************************/
// LoadRawNetlinkMessage is a simple utility to read the next NetlinkMessage from a source reader,
// e.g. from a file of naked binary netlink messages.
// NOTE: This is a bit fragile if there are any bit errors in the message headers.
//
// The header is decoded as little-endian. Errors from the reader are returned
// unchanged, so io.EOF at a message boundary signals a clean end of input. A header
// whose Len field is smaller than the header itself yields ErrParseFailed.
func LoadRawNetlinkMessage(rdr io.Reader) (*NetlinkMessage, error) {
	var header NlMsghdr
	// TODO - should we pass in LittleEndian as a parameter?
	if err := binary.Read(rdr, binary.LittleEndian, &header); err != nil {
		// Note that this may be EOF
		return nil, err
	}
	// Len covers the header as well as the payload. Without this check a corrupt,
	// too-small Len would wrap around in uint32 arithmetic and request a payload
	// buffer of nearly 4 GiB.
	headerSize := uint32(binary.Size(header))
	if header.Len < headerSize {
		return nil, ErrParseFailed
	}
	data := make([]byte, header.Len-headerSize)
	if _, err := io.ReadFull(rdr, data); err != nil {
		return nil, err
	}

	return &NetlinkMessage{Header: header, Data: data}, nil
}
// ArchiveReader produces ArchivalRecord structs from some source.
// Implementations in this file read either raw binary netlink messages
// (NewRawReader) or JSONL-encoded records (NewArchiveReader).
type ArchiveReader interface {
	// Next returns the next ArchivalRecord.  Returns nil, EOF if no more records, or other error if there is a problem.
	Next() (*ArchivalRecord, error)
}
// rawReader is the ArchiveReader implementation that decodes binary netlink
// messages read from rdr.
type rawReader struct {
	rdr io.Reader // source of raw netlink messages
}
// NewRawReader returns an ArchiveReader that decodes raw binary netlink
// messages from rdr.
func NewRawReader(rdr io.Reader) ArchiveReader {
	reader := new(rawReader)
	reader.rdr = rdr
	return reader
}
// Next reads one raw netlink message from the underlying reader and converts it
// to an ArchivalRecord. Any error from reading (including io.EOF) or from the
// conversion is returned to the caller.
func (r *rawReader) Next() (*ArchivalRecord, error) {
	nlMsg, err := LoadRawNetlinkMessage(r.rdr)
	if err == nil {
		// No exclusion config is applied when replaying raw messages.
		return MakeArchivalRecord(nlMsg, nil)
	}
	return nil, err
}
// archiveReader is the ArchiveReader implementation that decodes one
// JSON-encoded ArchivalRecord per token produced by scanner.
type archiveReader struct {
	scanner *bufio.Scanner // tokenizes the JSONL input
}
// NewArchiveReader returns an ArchiveReader that decodes JSONL-encoded
// ArchivalRecords from rdr, using a bufio.Scanner with its default settings.
func NewArchiveReader(rdr io.Reader) ArchiveReader {
	return &archiveReader{
		scanner: bufio.NewScanner(rdr),
	}
}
// Next decodes and returns the next ArchivalRecord.
//
// It returns nil, io.EOF once the input is exhausted. If the scanner stopped
// because of a failure (for example a read error or an over-long line), that error
// is returned instead of io.EOF so that truncated input is not mistaken for a clean
// end of stream. A line that is not valid JSON for an ArchivalRecord yields the
// json.Unmarshal error.
func (ar *archiveReader) Next() (*ArchivalRecord, error) {
	if !ar.scanner.Scan() {
		// Scan returns false both at end of input and on failure; Err tells them apart.
		if err := ar.scanner.Err(); err != nil {
			return nil, err
		}
		return nil, io.EOF
	}
	record := &ArchivalRecord{}
	if err := json.Unmarshal(ar.scanner.Bytes(), record); err != nil {
		return nil, err
	}
	return record, nil
}
// LoadAllArchivalRecords reads every ArchivalRecord from a JSONL stream.
//
// On a clean end of input it returns all decoded records and a nil error. If
// decoding fails part way through, the records decoded so far are returned
// together with the error.
func LoadAllArchivalRecords(rdr io.Reader) ([]*ArchivalRecord, error) {
	reader := NewArchiveReader(rdr)
	// We typically read a large number of records, so start with a generous capacity.
	records := make([]*ArchivalRecord, 0, 2000)
	for {
		rec, err := reader.Next()
		switch {
		case err == nil:
			records = append(records, rec)
		case err == io.EOF:
			return records, nil
		default:
			return records, err
		}
	}
}
// HasDiagInfo returns true if there is a DIAG_INFO message.
//
// Attributes is indexed by attribute type, so the INET_DIAG_INFO slot can exist
// but be nil when only higher-numbered attributes were present. Such a record
// is reported as having no DIAG_INFO, consistent with Compare.
func (pm *ArchivalRecord) HasDiagInfo() bool {
	return len(pm.Attributes) > inetdiag.INET_DIAG_INFO &&
		pm.Attributes[inetdiag.INET_DIAG_INFO] != nil
}
// sendLogger and rcvLogger are created by logx.NewLogEvery with a one second interval.
// NOTE(review): neither is referenced in the visible part of this file; presumably
// they are meant for rate-limited logging of send/receive counter anomalies -
// confirm usage elsewhere in the package before removing.
var sendLogger = logx.NewLogEvery(nil, time.Second)
var rcvLogger = logx.NewLogEvery(nil, time.Second)
// GetStats returns basic stats from the TCPInfo snapshot.
//
// The first result is the BytesSent counter and the second is the BytesReceived
// counter, both read from the raw INET_DIAG_INFO attribute in native byte order.
// If the record has no INET_DIAG_INFO attribute, or the attribute is too short to
// contain both counters, it returns 0, 0.
func (pm *ArchivalRecord) GetStats() (uint64, uint64) {
	if len(pm.Attributes) <= inetdiag.INET_DIAG_INFO {
		return 0, 0
	}
	raw := pm.Attributes[inetdiag.INET_DIAG_INFO]
	// Ensure the array contains both uint64 fields.
	if len(raw) < int(bytesSentOffset+8) || len(raw) < int(bytesReceivedOffset+8) {
		return 0, 0
	}

	// The linux fields are actually uint64, though the LinuxTCPInfo struct uses int64 for bigquery compatibility.
	// The attribute bytes carry no alignment guarantee, and viewing a *byte as a
	// *uint64 is not a permitted unsafe.Pointer conversion. Copy the eight bytes into
	// properly aligned locals instead; this keeps the native byte order.
	var sent, received uint64
	copy((*[8]byte)(unsafe.Pointer(&sent))[:], raw[bytesSentOffset:])
	copy((*[8]byte)(unsafe.Pointer(&received))[:], raw[bytesReceivedOffset:])
	return sent, received
}
// SetBytesReceived sets the field for hacking unit tests.
//
// It overwrites the BytesReceived counter inside the raw INET_DIAG_INFO attribute
// and returns the previous value. It returns 0 without modifying anything if the
// attribute is missing or too short. It panics when called outside of a test binary.
func (pm *ArchivalRecord) SetBytesReceived(value uint64) uint64 {
	return pm.setTCPInfoUint64(bytesReceivedOffset, value)
}

// SetBytesSent sets the field for hacking unit tests.
//
// It overwrites the BytesSent counter inside the raw INET_DIAG_INFO attribute
// and returns the previous value. It returns 0 without modifying anything if the
// attribute is missing or too short. It panics when called outside of a test binary.
func (pm *ArchivalRecord) SetBytesSent(value uint64) uint64 {
	return pm.setTCPInfoUint64(bytesSentOffset, value)
}

// setTCPInfoUint64 replaces the native-endian uint64 stored at offset within the
// raw INET_DIAG_INFO attribute and returns the value it held before.
func (pm *ArchivalRecord) setTCPInfoUint64(offset uintptr, value uint64) uint64 {
	// The "test.v" flag is only registered by the testing package.
	if flag.Lookup("test.v") == nil {
		panic("Allowed only in tests.")
	}
	if len(pm.Attributes) <= inetdiag.INET_DIAG_INFO {
		return 0
	}
	raw := pm.Attributes[inetdiag.INET_DIAG_INFO]
	// Ensure that the full field exists.
	if len(raw) < int(offset+8) {
		return 0
	}
	field := raw[offset : offset+8]
	// The attribute bytes carry no alignment guarantee, so move the value through
	// aligned locals by byte copy rather than dereferencing a *uint64 into raw.
	var prev uint64
	copy((*[8]byte)(unsafe.Pointer(&prev))[:], field)
	copy(field, (*[8]byte)(unsafe.Pointer(&value))[:])
	return prev
}