Skip to content

Commit

Permalink
Merge faa2a44 into 156b703
Browse files Browse the repository at this point in the history
  • Loading branch information
gfr10598 committed Apr 4, 2019
2 parents 156b703 + faa2a44 commit f343388
Show file tree
Hide file tree
Showing 18 changed files with 410 additions and 218 deletions.
14 changes: 13 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

[![GoDoc](https://godoc.org/github.com/m-lab/tcp-info?status.svg)](https://godoc.org/github.com/m-lab/tcp-info) [![Build Status](https://travis-ci.org/m-lab/tcp-info.svg?branch=master)](https://travis-ci.org/m-lab/tcp-info) [![Go Report Card](https://goreportcard.com/badge/github.com/m-lab/tcp-info)](https://goreportcard.com/report/github.com/m-lab/tcp-info) [![Coverage Status](https://coveralls.io/repos/m-lab/tcp-info/badge.svg?branch=master)](https://coveralls.io/github/m-lab/tcp-info?branch=master)

The `tcp-info` tool executes a polling loop that tracks the measurement statistics of every open TCP socket on a system. Data is written, in `jsonl` format, to files compressed using `zstd`. This tool forms the basis of a lot of measurements on the Kubernetes-based [Measurement Lab](https://measurementlab.net) platform.
The `tcp-info` tool executes a polling loop that tracks the measurement statistics of every open TCP socket on a system. Data is written, in `JSONL` format (referred to internally as *ArchivalRecord*), to files compressed using `zstd`. This tool forms the basis of a lot of measurements on the Kubernetes-based [Measurement Lab](https://measurementlab.net) platform.

We expect most people will run this tool using a
docker container. To invoke, with data written to ~/data, and prometheus
Expand All @@ -19,6 +19,7 @@ It then detects differences from one scan to the next, and queues connections th
It logs the intermediate representation through external zstd processes to one file per connection.

The previous version uses protobufs, but we have discontinued that largely because of the increased maintenance overhead, and risk of losing unparsed data.
Instead, we are now using *ArchivalRecord*, which consists of partially parsed netlink messages, mostly in base64 encoded blobs, marshaled to JSONL format, with one JSON object per line.

To run the tests or the collection tool, you will also require zstd, which can be installed with:

Expand All @@ -32,6 +33,17 @@ OR
sudo apt-get update && sudo apt-get install -y zstd
```

# Parse library and command line tools

## The *parse* package

This package is intended to be used as a library for command line tools and other applications that need to parse *ArchivalRecord* messages.

## CSV tool

The cmd/csvtool directory contains a tool for parsing *ArchivalRecord* messages and producing CSV files. Currently it reads netlink-JSONL from stdin and writes CSV to stdout.


# Code Layout

* inetdiag - code related to include/uapi/linux/inet_diag.h. All structs will be in structs.go
Expand Down
16 changes: 8 additions & 8 deletions cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,21 @@ var (

// Cache is a cache of all connection status.
type Cache struct {
// Map from inode to ParsedMessage
current map[uint64]*netlink.ParsedMessage // Cache of most recent messages.
previous map[uint64]*netlink.ParsedMessage // Cache of previous round of messages.
// Map from inode to ArchivalRecord
current map[uint64]*netlink.ArchivalRecord // Cache of most recent messages.
previous map[uint64]*netlink.ArchivalRecord // Cache of previous round of messages.
cycles int64
}

// NewCache creates a cache object with capacity of 1000.
// The map size is adjusted on every sampling round, but we have to start somewhere.
func NewCache() *Cache {
return &Cache{current: make(map[uint64]*netlink.ParsedMessage, 1000),
previous: make(map[uint64]*netlink.ParsedMessage, 0)}
return &Cache{current: make(map[uint64]*netlink.ArchivalRecord, 1000),
previous: make(map[uint64]*netlink.ArchivalRecord, 0)}
}

// Update swaps msg with the cache contents, and returns the evicted value.
func (c *Cache) Update(msg *netlink.ParsedMessage) (*netlink.ParsedMessage, error) {
func (c *Cache) Update(msg *netlink.ArchivalRecord) (*netlink.ArchivalRecord, error) {
idm, err := msg.RawIDM.Parse()
if err != nil {
return nil, err
Expand All @@ -49,14 +49,14 @@ func (c *Cache) Update(msg *netlink.ParsedMessage) (*netlink.ParsedMessage, erro
// EndCycle marks the completion of updates from one set of netlink messages.
// It returns all messages that did not have corresponding inodes in the most recent
// batch of messages.
func (c *Cache) EndCycle() map[uint64]*netlink.ParsedMessage {
func (c *Cache) EndCycle() map[uint64]*netlink.ArchivalRecord {
metrics.CacheSizeHistogram.Observe(float64(len(c.current)))
tmp := c.previous
c.previous = c.current
// Allocate a bit more than previous size, to accommodate new connections.
// This will grow and shrink with the number of active connections, but
// minimize reallocation.
c.current = make(map[uint64]*netlink.ParsedMessage, len(c.previous)+len(c.previous)/10+10)
c.current = make(map[uint64]*netlink.ArchivalRecord, len(c.previous)+len(c.previous)/10+10)
c.cycles++
return tmp
}
Expand Down
6 changes: 3 additions & 3 deletions cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ func testFatal(t *testing.T, err error) {
}
}

func fakeMsg(t *testing.T, cookie uint64, dport uint16) netlink.ParsedMessage {
func fakeMsg(t *testing.T, cookie uint64, dport uint16) netlink.ArchivalRecord {
var json1 = `{"Header":{"Len":356,"Type":20,"Flags":2,"Seq":1,"Pid":148940},"Data":"CgEAAOpWE6cmIAAAEAMEFbM+nWqBv4ehJgf4sEANDAoAAAAAAAAAgQAAAAAdWwAAAAAAAAAAAAAAAAAAAAAAAAAAAAC13zIBBQAIAAAAAAAFAAUAIAAAAAUABgAgAAAAFAABAAAAAAAAAAAAAAAAAAAAAAAoAAcAAAAAAICiBQAAAAAAALQAAAAAAAAAAAAAAAAAAAAAAAAAAAAArAACAAEAAAAAB3gBQIoDAECcAABEBQAAuAQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAUCEAAAAAAAAgIQAAQCEAANwFAACsywIAJW8AAIRKAAD///9/CgAAAJQFAAADAAAALMkAAIBwAAAAAAAALnUOAAAAAAD///////////ayBAAAAAAASfQPAAAAAADMEQAANRMAAAAAAABiNQAAxAsAAGMIAABX5AUAAAAAAAoABABjdWJpYwAAAA=="}`
nm := syscall.NetlinkMessage{}
err := json.Unmarshal([]byte(json1), &nm)
if err != nil {
t.Fatal(err)
}
mp, err := netlink.ParseNetlinkMessage(&nm, true)
mp, err := netlink.MakeArchivalRecord(&nm, true)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -89,7 +89,7 @@ func TestUpdate(t *testing.T) {
}

func TestUpdateWithBadData(t *testing.T) {
var m netlink.ParsedMessage
var m netlink.ArchivalRecord
c := cache.NewCache()
_, err := c.Update(&m)
if err == nil {
Expand Down
10 changes: 5 additions & 5 deletions collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ var (
localCount = 0
)

func appendAll(all []*netlink.ParsedMessage, msgs []*syscall.NetlinkMessage, skipLocal bool) []*netlink.ParsedMessage {
func appendAll(all []*netlink.ArchivalRecord, msgs []*syscall.NetlinkMessage, skipLocal bool) []*netlink.ArchivalRecord {
// We use UTC, and truncate to millisecond to improve compression.
// Since the syscall to collect the data takes multiple milliseconds, this truncation seems reasonable.
ts := time.Now().UTC().Truncate(time.Millisecond)
for i := range msgs {
pm, err := netlink.ParseNetlinkMessage(msgs[i], skipLocal)
pm, err := netlink.MakeArchivalRecord(msgs[i], skipLocal)
if err != nil {
log.Println(err)
errCount++
Expand All @@ -39,10 +39,10 @@ func appendAll(all []*netlink.ParsedMessage, msgs []*syscall.NetlinkMessage, ski

// collectDefaultNamespace collects all AF_INET6 and AF_INET connection stats, and sends them
// to svr.
func collectDefaultNamespace(svr chan<- []*netlink.ParsedMessage, skipLocal bool) (int, int) {
func collectDefaultNamespace(svr chan<- []*netlink.ArchivalRecord, skipLocal bool) (int, int) {
// Preallocate space for up to 500 connections. We may want to adjust this upwards if profiling
// indicates a lot of reallocation.
all := make([]*netlink.ParsedMessage, 0, 500)
all := make([]*netlink.ArchivalRecord, 0, 500)
remoteCount := 0
res6, err := OneType(syscall.AF_INET6)
if err != nil {
Expand All @@ -69,7 +69,7 @@ func collectDefaultNamespace(svr chan<- []*netlink.ParsedMessage, skipLocal bool

// Run the collector, either for the specified number of loops, or, if the
// number specified is infinite, run forever.
func Run(ctx context.Context, reps int, svrChan chan<- []*netlink.ParsedMessage, cl saver.CacheLogger, skipLocal bool) (localCount, errCount int) {
func Run(ctx context.Context, reps int, svrChan chan<- []*netlink.ArchivalRecord, cl saver.CacheLogger, skipLocal bool) (localCount, errCount int) {
totalCount := 0
remoteCount := 0
loops := 0
Expand Down
14 changes: 7 additions & 7 deletions collector/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ type testCacheLogger struct{}

func (t *testCacheLogger) LogCacheStats(_, _ int) {}

func runTest(ctx context.Context, port int) {
func runTest(t *testing.T, ctx context.Context, port int) {
// Open a server socket, connect to it, send data to it until the context is canceled.
address := fmt.Sprintf("localhost:%d", port)
log.Println("Listening on", address)
t.Log("Listening on", address)
localAddr, err := net.ResolveTCPAddr("tcp", address)
rtx.Must(err, "No localhost")
listener, err := net.ListenTCP("tcp", localAddr)
Expand Down Expand Up @@ -68,7 +68,7 @@ func TestRun(t *testing.T) {
port := findPort()

// A nice big buffer on the channel
msgChan := make(chan []*netlink.ParsedMessage, 10000)
msgChan := make(chan []*netlink.ArchivalRecord, 10000)
var wg sync.WaitGroup
wg.Add(2)

Expand All @@ -78,7 +78,7 @@ func TestRun(t *testing.T) {
}()

go func() {
runTest(ctx, port)
runTest(t, ctx, port)
wg.Done()
}()

Expand All @@ -96,7 +96,7 @@ func TestRun(t *testing.T) {

// Make sure we receive multiple different messages regarding the open port
count := 0
var prev *netlink.ParsedMessage
var prev *netlink.ArchivalRecord
for msgs := range msgChan {
changed := false
for _, m := range msgs {
Expand All @@ -108,7 +108,7 @@ func TestRun(t *testing.T) {
if idm != nil && idm.ID.SPort() == uint16(port) {
change, err := m.Compare(prev)
if err != nil {
log.Println(err)
t.Log(err)
} else if change > netlink.NoMajorChange {
prev = m
changed = true
Expand All @@ -124,6 +124,6 @@ func TestRun(t *testing.T) {
}
}

log.Println("Waiting for goroutines to exit")
t.Log("Waiting for goroutines to exit")
wg.Wait()
}
8 changes: 0 additions & 8 deletions collector/socket-monitor.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,5 @@
package collector

/*
#include <asm/types.h>
#include <sys/socket.h>
#include <linux/netlink.h>
#include <linux/inet_diag.h>
import "C"
*/

import (
"log"
"syscall"
Expand Down
4 changes: 2 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ flat flat% sum% cum cum%
0.06s 1.20% 86.45% 0.06s 1.20% runtime.memclrNoHeapPointers
0.06s 1.20% 87.65% 0.08s 1.59% runtime.scanobject
0.06s 1.20% 88.84% 0.06s 1.20% syscall.RawSyscall
0.04s 0.8% 89.64% 0.07s 1.39% github.com/m-lab/tcp-info/delta.(*ParsedMessage).IsSame
0.04s 0.8% 89.64% 0.07s 1.39% github.com/m-lab/tcp-info/delta.(*ArchivalRecord).IsSame
0.04s 0.8% 90.44% 0.12s 2.39% runtime.(*mcentral).cacheSpan
0.04s 0.8% 91.24% 0.04s 0.8% runtime.duffcopy
0.04s 0.8% 92.03% 0.04s 0.8% runtime.memmove
Expand Down Expand Up @@ -93,7 +93,7 @@ func main() {
// Make the saver and construct the message channel, buffering up to 2 batches
// of messages without stalling producer. We may want to increase the buffer if
// we observe main() stalling.
svrChan := make(chan []*netlink.ParsedMessage, 2)
svrChan := make(chan []*netlink.ArchivalRecord, 2)
svr := saver.NewSaver("host", "pod", 3)
go svr.MessageSaverLoop(svrChan)

Expand Down
Binary file added netlink/archiveRecords.zstd
Binary file not shown.

0 comments on commit f343388

Please sign in to comment.