From 321acbf9bf104c1b0c9516c7cc439ad91018db18 Mon Sep 17 00:00:00 2001 From: Greg Russell Date: Fri, 11 May 2018 11:39:33 -0400 Subject: [PATCH 1/8] move some stuff to inetdiag and cache --- cache/cache.go | 191 +++++++++++++++++++++++++++++++++ inetdiag/inetdiag.go | 46 ++++++++ nl-proto/tools/convert_test.go | 30 +----- 3 files changed, 239 insertions(+), 28 deletions(-) create mode 100644 cache/cache.go diff --git a/cache/cache.go b/cache/cache.go new file mode 100644 index 0000000..b617288 --- /dev/null +++ b/cache/cache.go @@ -0,0 +1,191 @@ +// Package delta keeps a cache of connection info records, and supports updates, +// and delta generation. +package delta + +import ( + "bytes" + "errors" + "fmt" + "io" + "syscall" + + "github.com/m-lab/tcp-info/inetdiag" + "github.com/m-lab/tcp-info/nl-proto/tools" +) + +// Package error messages +var ( + ErrInetDiagParseFailed = errors.New("Error parsing inetdiag message") + ErrLocal = errors.New("Connection is loopback") + ErrUnknownMessageType = errors.New("Unknown netlink message type") +) + +// Cache is a cache of all connection status. +// TODO - need to remove entries from cache when they are not present in netlink messages. +type Cache struct { + // Map from inode to ParsedMessage + Records map[uint32]*inetdiag.ParsedMessage +} + +func NewCache(raw io.WriteCloser, filter bool) *Cache { + return &Cache{Records: make(map[uint32]*inetdiag.ParsedMessage, 1000)} +} + +const countDiffTypes = false + +var DiffCounts = make(map[string]int32, 20) + +func count(name string) { + if countDiffTypes { + DiffCounts[name]++ + } +} + +func IsSame(pm *inetdiag.ParsedMessage, other *inetdiag.ParsedMessage) bool { + diff := len(pm.Attributes) != len(other.Attributes) + + // TODO - this is expensive. Probably shouldn't use it. + if *pm.InetDiagMsg != *other.InetDiagMsg { + //if len(idmDiff) > 0 { + count("idm") + + if pm.InetDiagMsg.IDiagExpires != other.InetDiagMsg.IDiagExpires { + count("expires") + } + if pm.InetDiagMsg.IDiagWqueue != other.InetDiagMsg.IDiagWqueue { + count("wqueue") + } + + tmp := *other.InetDiagMsg + tmp.IDiagExpires = pm.InetDiagMsg.IDiagExpires + + if *pm.InetDiagMsg != tmp { + //log.Println(otherDiff) + count("idm ignoring expires") + diff = true + } + } + + // log.Println(len(pm.Attributes), len(other.Attributes)) + for tp := range pm.Attributes { + now := pm.Attributes[tp] + past := other.Attributes[tp] + if now == nil && past == nil { + continue + } + if now == nil || past == nil { + count("mismatchedAttr") + diff = true + continue + } + if len(now.Value) != len(past.Value) { + count(fmt.Sprintf("AttrLength %d", now.Attr.Type)) + diff = true + } else { + // go run play.go -v -reps=300 2>&1 | grep cookie | sed -e 's/.*cookie/cookie/;' | sed -e 's/last_data_sent.*pmtu/pmtu/;' | sed -e 's/expires:.* inode:/inode:/;' | sort | uniq | wc + switch tp { + case inetdiag.INET_DIAG_MEMINFO: + if 0 != bytes.Compare(now.Value, past.Value) { + count("MemInfo") + diff = true + } + case inetdiag.INET_DIAG_INFO: + if 0 != bytes.Compare(now.Value[:tools.LastDataSentOffset], past.Value[:tools.LastDataSentOffset]) { + count("Early") + diff = true + } + if 0 != bytes.Compare(now.Value[tools.LastDataSentOffset:tools.PmtuOffset], past.Value[tools.LastDataSentOffset:tools.PmtuOffset]) { + count("Last...") + // TODO - if FineGrained + // diff = true + } + if 0 != bytes.Compare(now.Value[tools.PmtuOffset:], past.Value[tools.PmtuOffset:]) { + count("Late") + diff = true + } + case inetdiag.INET_DIAG_VEGASINFO: + if 0 != bytes.Compare(now.Value, past.Value) { + count("Vegas") + diff = true + } + case inetdiag.INET_DIAG_CONG: + if 0 != bytes.Compare(now.Value, past.Value) { + count("Cong") + diff = true + } + case inetdiag.INET_DIAG_TOS: + if 0 != bytes.Compare(now.Value, past.Value) { + count("TOS") + diff = true + } + case inetdiag.INET_DIAG_TCLASS: + if 0 != bytes.Compare(now.Value, past.Value) { + count("TCLASS") + diff = true + } + case inetdiag.INET_DIAG_SKMEMINFO: + if 0 != bytes.Compare(now.Value, past.Value) { + count("SocketMemInfo") + diff = true + } + case inetdiag.INET_DIAG_SHUTDOWN: + if 0 != bytes.Compare(now.Value, past.Value) { + count("SHUTDOWN") + diff = true + } + case inetdiag.INET_DIAG_DCTCPINFO: + if 0 != bytes.Compare(now.Value, past.Value) { + count("DCTPC") + diff = true + } + case inetdiag.INET_DIAG_PROTOCOL: + if 0 != bytes.Compare(now.Value, past.Value) { + count("Protocol") + diff = true + } + case inetdiag.INET_DIAG_SKV6ONLY: + if 0 != bytes.Compare(now.Value, past.Value) { + count("SK6") + diff = true + } + default: + if 0 != bytes.Compare(now.Value, past.Value) { + count("Other") + diff = true + } + } + } + } + return !diff +} + +// Update swaps msg with the cache contents, and returns true if there is +// any meaningful change in the content, aside from trivial count updates. +// TODO - also return the local network address? +func (c *Cache) Update(msg *syscall.NetlinkMessage) (*inetdiag.ParsedMessage, error) { + pm, err := inetdiag.Parse(msg, true) + + if err != nil { + return nil, err + } + + if pm == nil { + return nil, nil + } + + current, ok := c.Records[pm.InetDiagMsg.IDiagInode] + c.Records[pm.InetDiagMsg.IDiagInode] = pm + + if !ok { + // TODO log an error and inc monitoring. + return pm, nil + } + + same := IsSame(pm, current) + + if same { + return nil, nil + } else { + return pm, nil + } +} diff --git a/inetdiag/inetdiag.go b/inetdiag/inetdiag.go index 3f8e838..58c23eb 100644 --- a/inetdiag/inetdiag.go +++ b/inetdiag/inetdiag.go @@ -29,6 +29,7 @@ expressed in host-byte order" */ import ( + "errors" "fmt" "log" "net" @@ -36,6 +37,13 @@ import ( "unsafe" tcpinfo "github.com/m-lab/tcp-info/nl-proto" + "github.com/vishvananda/netlink/nl" +) + +// Error types. +var ( + ErrParseFailed = errors.New("Unable to parse InetDiagMsg") + ErrNotType20 = errors.New("NetlinkMessage wrong type") ) // Constants from linux. @@ -195,3 +203,41 @@ func ParseInetDiagMsg(data []byte) (*InetDiagMsg, []byte) { } return (*InetDiagMsg)(unsafe.Pointer(&data[0])), data[rtaAlignOf(int(unsafe.Sizeof(InetDiagMsg{}))):] } + +// ParsedMessage is a container for parsed InetDiag messages and attributes. +type ParsedMessage struct { + Header syscall.NlMsghdr + InetDiagMsg *InetDiagMsg + Attributes [INET_DIAG_MAX]*syscall.NetlinkRouteAttr +} + +// Parse parsed the NetlinkMessage into a ParsedMessage. If skipLocal is true, it will return nil for +// loopback, local unicast, multicast, and unspecified connections. +func Parse(msg *syscall.NetlinkMessage, skipLocal bool) (*ParsedMessage, error) { + if msg.Header.Type != 20 { + return nil, ErrNotType20 + } + idm, attrBytes := ParseInetDiagMsg(msg.Data) + if idm == nil { + return nil, ErrParseFailed + } + if skipLocal { + srcIP := idm.ID.SrcIP() + if srcIP.IsLoopback() || srcIP.IsLinkLocalUnicast() || srcIP.IsMulticast() || srcIP.IsUnspecified() { + return nil, nil + } + dstIP := idm.ID.DstIP() + if dstIP.IsLoopback() || dstIP.IsLinkLocalUnicast() || dstIP.IsMulticast() || dstIP.IsUnspecified() { + return nil, nil + } + } + parsedMsg := ParsedMessage{Header: msg.Header, InetDiagMsg: idm} + attrs, err := nl.ParseRouteAttr(attrBytes) + if err != nil { + return nil, err + } + for i := range attrs { + parsedMsg.Attributes[attrs[i].Attr.Type] = &attrs[i] + } + return &parsedMsg, nil +} diff --git a/nl-proto/tools/convert_test.go b/nl-proto/tools/convert_test.go index 811f03b..8ba36ba 100644 --- a/nl-proto/tools/convert_test.go +++ b/nl-proto/tools/convert_test.go @@ -12,7 +12,6 @@ import ( tcpinfo "github.com/m-lab/tcp-info/nl-proto" "github.com/m-lab/tcp-info/nl-proto/tools" "github.com/m-lab/tcp-info/zstd" - "github.com/vishvananda/netlink/nl" ) func init() { @@ -44,34 +43,9 @@ var ( ) func convertToProto(msg *syscall.NetlinkMessage, t *testing.T) *tcpinfo.TCPDiagnosticsProto { - if msg.Header.Type != 20 { - t.Error("Skipping unknown message type:", msg.Header) - } - idm, attrBytes := inetdiag.ParseInetDiagMsg(msg.Data) - if idm == nil { - t.Error("Couldn't parse InetDiagMsg") - } - srcIP := idm.ID.SrcIP() - if srcIP.IsLoopback() || srcIP.IsLinkLocalUnicast() || srcIP.IsMulticast() || srcIP.IsUnspecified() { - return nil - } - dstIP := idm.ID.DstIP() - if dstIP.IsLoopback() || dstIP.IsLinkLocalUnicast() || dstIP.IsMulticast() || dstIP.IsUnspecified() { - return nil - } - type ParsedMessage struct { - Header syscall.NlMsghdr - InetDiagMsg *inetdiag.InetDiagMsg - Attributes [inetdiag.INET_DIAG_MAX]*syscall.NetlinkRouteAttr - } - - parsedMsg := ParsedMessage{Header: msg.Header, InetDiagMsg: idm} - attrs, err := nl.ParseRouteAttr(attrBytes) + parsedMsg, err := inetdiag.Parse(msg, true) if err != nil { - t.Error(err) - } - for i := range attrs { - parsedMsg.Attributes[attrs[i].Attr.Type] = &attrs[i] + t.Fatal(err) } return tools.CreateProto(msg.Header, parsedMsg.InetDiagMsg, parsedMsg.Attributes[:]) } From 8d7a3c5a84d1ce4797eec9ae52d306a2fbd8530b Mon Sep 17 00:00:00 2001 From: Greg Russell Date: Fri, 11 May 2018 11:45:55 -0400 Subject: [PATCH 2/8] trim to bare minimum --- cache/cache.go | 175 +++---------------------------------------------- 1 file changed, 9 insertions(+), 166 deletions(-) diff --git a/cache/cache.go b/cache/cache.go index b617288..00900cd 100644 --- a/cache/cache.go +++ b/cache/cache.go @@ -1,16 +1,10 @@ -// Package delta keeps a cache of connection info records, and supports updates, -// and delta generation. -package delta +// Package cache keeps a cache of connection info records. +package cache import ( - "bytes" "errors" - "fmt" - "io" - "syscall" "github.com/m-lab/tcp-info/inetdiag" - "github.com/m-lab/tcp-info/nl-proto/tools" ) // Package error messages @@ -27,165 +21,14 @@ type Cache struct { Records map[uint32]*inetdiag.ParsedMessage } -func NewCache(raw io.WriteCloser, filter bool) *Cache { +// NewCache creates a cache object with capacity of 1000. +func NewCache() *Cache { return &Cache{Records: make(map[uint32]*inetdiag.ParsedMessage, 1000)} } -const countDiffTypes = false - -var DiffCounts = make(map[string]int32, 20) - -func count(name string) { - if countDiffTypes { - DiffCounts[name]++ - } -} - -func IsSame(pm *inetdiag.ParsedMessage, other *inetdiag.ParsedMessage) bool { - diff := len(pm.Attributes) != len(other.Attributes) - - // TODO - this is expensive. Probably shouldn't use it. - if *pm.InetDiagMsg != *other.InetDiagMsg { - //if len(idmDiff) > 0 { - count("idm") - - if pm.InetDiagMsg.IDiagExpires != other.InetDiagMsg.IDiagExpires { - count("expires") - } - if pm.InetDiagMsg.IDiagWqueue != other.InetDiagMsg.IDiagWqueue { - count("wqueue") - } - - tmp := *other.InetDiagMsg - tmp.IDiagExpires = pm.InetDiagMsg.IDiagExpires - - if *pm.InetDiagMsg != tmp { - //log.Println(otherDiff) - count("idm ignoring expires") - diff = true - } - } - - // log.Println(len(pm.Attributes), len(other.Attributes)) - for tp := range pm.Attributes { - now := pm.Attributes[tp] - past := other.Attributes[tp] - if now == nil && past == nil { - continue - } - if now == nil || past == nil { - count("mismatchedAttr") - diff = true - continue - } - if len(now.Value) != len(past.Value) { - count(fmt.Sprintf("AttrLength %d", now.Attr.Type)) - diff = true - } else { - // go run play.go -v -reps=300 2>&1 | grep cookie | sed -e 's/.*cookie/cookie/;' | sed -e 's/last_data_sent.*pmtu/pmtu/;' | sed -e 's/expires:.* inode:/inode:/;' | sort | uniq | wc - switch tp { - case inetdiag.INET_DIAG_MEMINFO: - if 0 != bytes.Compare(now.Value, past.Value) { - count("MemInfo") - diff = true - } - case inetdiag.INET_DIAG_INFO: - if 0 != bytes.Compare(now.Value[:tools.LastDataSentOffset], past.Value[:tools.LastDataSentOffset]) { - count("Early") - diff = true - } - if 0 != bytes.Compare(now.Value[tools.LastDataSentOffset:tools.PmtuOffset], past.Value[tools.LastDataSentOffset:tools.PmtuOffset]) { - count("Last...") - // TODO - if FineGrained - // diff = true - } - if 0 != bytes.Compare(now.Value[tools.PmtuOffset:], past.Value[tools.PmtuOffset:]) { - count("Late") - diff = true - } - case inetdiag.INET_DIAG_VEGASINFO: - if 0 != bytes.Compare(now.Value, past.Value) { - count("Vegas") - diff = true - } - case inetdiag.INET_DIAG_CONG: - if 0 != bytes.Compare(now.Value, past.Value) { - count("Cong") - diff = true - } - case inetdiag.INET_DIAG_TOS: - if 0 != bytes.Compare(now.Value, past.Value) { - count("TOS") - diff = true - } - case inetdiag.INET_DIAG_TCLASS: - if 0 != bytes.Compare(now.Value, past.Value) { - count("TCLASS") - diff = true - } - case inetdiag.INET_DIAG_SKMEMINFO: - if 0 != bytes.Compare(now.Value, past.Value) { - count("SocketMemInfo") - diff = true - } - case inetdiag.INET_DIAG_SHUTDOWN: - if 0 != bytes.Compare(now.Value, past.Value) { - count("SHUTDOWN") - diff = true - } - case inetdiag.INET_DIAG_DCTCPINFO: - if 0 != bytes.Compare(now.Value, past.Value) { - count("DCTPC") - diff = true - } - case inetdiag.INET_DIAG_PROTOCOL: - if 0 != bytes.Compare(now.Value, past.Value) { - count("Protocol") - diff = true - } - case inetdiag.INET_DIAG_SKV6ONLY: - if 0 != bytes.Compare(now.Value, past.Value) { - count("SK6") - diff = true - } - default: - if 0 != bytes.Compare(now.Value, past.Value) { - count("Other") - diff = true - } - } - } - } - return !diff -} - -// Update swaps msg with the cache contents, and returns true if there is -// any meaningful change in the content, aside from trivial count updates. -// TODO - also return the local network address? -func (c *Cache) Update(msg *syscall.NetlinkMessage) (*inetdiag.ParsedMessage, error) { - pm, err := inetdiag.Parse(msg, true) - - if err != nil { - return nil, err - } - - if pm == nil { - return nil, nil - } - - current, ok := c.Records[pm.InetDiagMsg.IDiagInode] - c.Records[pm.InetDiagMsg.IDiagInode] = pm - - if !ok { - // TODO log an error and inc monitoring. - return pm, nil - } - - same := IsSame(pm, current) - - if same { - return nil, nil - } else { - return pm, nil - } +// Swap swaps msg with the cache contents, and returns the cached value. +func (c *Cache) Swap(msg *inetdiag.ParsedMessage) (*inetdiag.ParsedMessage, error) { + current := c.Records[msg.InetDiagMsg.IDiagInode] + c.Records[msg.InetDiagMsg.IDiagInode] = msg + return current, nil } From 1da83246e0a6682a6979219b8c1a5dc1018082ab Mon Sep 17 00:00:00 2001 From: Greg Russell Date: Fri, 11 May 2018 18:13:32 -0400 Subject: [PATCH 3/8] EndCycle, unit test --- README.md | 8 ++++---- cache/cache.go | 30 +++++++++++++++++++++++------- cache/cache_test.go | 43 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 70 insertions(+), 11 deletions(-) create mode 100644 cache/cache_test.go diff --git a/README.md b/README.md index b0b16ee..fa5ffbf 100644 --- a/README.md +++ b/README.md @@ -11,13 +11,13 @@ This repository uses protobuffers and zstd. To build it locally you will need t compiler ```bash -`wget https://github.com/google/protobuf/releases/download/v3.5.1/protoc-3.5.1-linux-x86_64.zip` -`unzip protoc-3.5.1-linux-x86_64.zip` -`cd nl-proto && ../bin/protoc --go_out=. *.proto` +wget https://github.com/google/protobuf/releases/download/v3.5.1/protoc-3.5.1-linux-x86_64.zip +unzip protoc-3.5.1-linux-x86_64.zip +cd nl-proto && ../bin/protoc --go_out=. *.proto ``` To run the tools, you will also require zstd, which can be installed with: ```bash -`bash <(curl -fsSL https://raw.githubusercontent.com/horta/zstd.install/master/install)` +bash <(curl -fsSL https://raw.githubusercontent.com/horta/zstd.install/master/install) ``` \ No newline at end of file diff --git a/cache/cache.go b/cache/cache.go index 00900cd..f1c1d31 100644 --- a/cache/cache.go +++ b/cache/cache.go @@ -18,17 +18,33 @@ var ( // TODO - need to remove entries from cache when they are not present in netlink messages. type Cache struct { // Map from inode to ParsedMessage - Records map[uint32]*inetdiag.ParsedMessage + current map[uint32]*inetdiag.ParsedMessage + past map[uint32]*inetdiag.ParsedMessage } // NewCache creates a cache object with capacity of 1000. func NewCache() *Cache { - return &Cache{Records: make(map[uint32]*inetdiag.ParsedMessage, 1000)} + return &Cache{current: make(map[uint32]*inetdiag.ParsedMessage, 1000), + past: make(map[uint32]*inetdiag.ParsedMessage, 1000)} } -// Swap swaps msg with the cache contents, and returns the cached value. -func (c *Cache) Swap(msg *inetdiag.ParsedMessage) (*inetdiag.ParsedMessage, error) { - current := c.Records[msg.InetDiagMsg.IDiagInode] - c.Records[msg.InetDiagMsg.IDiagInode] = msg - return current, nil +// Update swaps msg with the cache contents, and returns the cached value. +func (c *Cache) Update(msg *inetdiag.ParsedMessage) *inetdiag.ParsedMessage { + inode := msg.InetDiagMsg.IDiagInode + c.current[inode] = msg + tmp, ok := c.past[inode] + if ok { + delete(c.past, inode) + } + return tmp +} + +// EndCycle marks the completion of updates from the kernel messages. +// It returns all messages that did not have corresponding inodes in the most recent +// batch of records from the kernel. +func (c *Cache) EndCycle() map[uint32]*inetdiag.ParsedMessage { + tmp := c.past + c.past = c.current + c.current = make(map[uint32]*inetdiag.ParsedMessage, len(c.past)+10) + return tmp } diff --git a/cache/cache_test.go b/cache/cache_test.go new file mode 100644 index 0000000..9199567 --- /dev/null +++ b/cache/cache_test.go @@ -0,0 +1,43 @@ +package cache_test + +import ( + "testing" + + "github.com/m-lab/tcp-info/cache" + "github.com/m-lab/tcp-info/inetdiag" +) + +func TestUpdate(t *testing.T) { + c := cache.NewCache() + pm1 := inetdiag.ParsedMessage{InetDiagMsg: &inetdiag.InetDiagMsg{IDiagInode: 1234}} + old := c.Update(&pm1) + if old != nil { + t.Error("old should be nil") + } + pm2 := inetdiag.ParsedMessage{InetDiagMsg: &inetdiag.InetDiagMsg{IDiagInode: 4321}} + old = c.Update(&pm2) + if old != nil { + t.Error("old should be nil") + } + + leftover := c.EndCycle() + if len(leftover) > 0 { + t.Error("Should be empty") + } + + pm3 := inetdiag.ParsedMessage{InetDiagMsg: &inetdiag.InetDiagMsg{IDiagInode: 4321}} + old = c.Update(&pm3) + if old == nil { + t.Error("old should NOT be nil") + } + + leftover = c.EndCycle() + if len(leftover) != 1 { + t.Error("Should not be empty") + } + for k := range leftover { + if *leftover[k] != pm1 { + t.Error("Should have found pm1") + } + } +} From ef8940fb26fb3df3f6a930e5d19963a36a580633 Mon Sep 17 00:00:00 2001 From: Greg Russell Date: Fri, 11 May 2018 18:18:07 -0400 Subject: [PATCH 4/8] Add cache to travis tests --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 0d8f3d7..295b2db 100644 --- a/.travis.yml +++ b/.travis.yml @@ -56,7 +56,7 @@ script: - cd $TRAVIS_BUILD_DIR # To start, run all the non-integration tests. -- MODULES="inetdiag zstd nl-proto/tools" +- MODULES="inetdiag zstd nl-proto/tools cache" - for module in $MODULES; do COVER_PKGS=${COVER_PKGS}./$module/..., ; done From 398401e242232a833f62f668d10f2a480876f729 Mon Sep 17 00:00:00 2001 From: Greg Russell Date: Mon, 14 May 2018 14:50:19 -0400 Subject: [PATCH 5/8] use sys/unix instead of vishvananda --- inetdiag/inetdiag.go | 43 +++++++++++++++++++++++++++++++++++-------- 1 file changed, 35 insertions(+), 8 deletions(-) diff --git a/inetdiag/inetdiag.go b/inetdiag/inetdiag.go index 58c23eb..b0f7739 100644 --- a/inetdiag/inetdiag.go +++ b/inetdiag/inetdiag.go @@ -37,7 +37,7 @@ import ( "unsafe" tcpinfo "github.com/m-lab/tcp-info/nl-proto" - "github.com/vishvananda/netlink/nl" + "golang.org/x/sys/unix" ) // Error types. @@ -186,12 +186,6 @@ func (msg *InetDiagMsg) String() string { return fmt.Sprintf("%s, %s, %s", diagFamilyMap[msg.IDiagFamily], tcpinfo.TCPState(msg.IDiagState), msg.ID.String()) } -// rtaAlignOf round the length of a netlink route attribute up to align it -// properly. -func rtaAlignOf(attrlen int) int { - return (attrlen + syscall.RTA_ALIGNTO - 1) & ^(syscall.RTA_ALIGNTO - 1) -} - // ParseInetDiagMsg returns the InetDiagMsg itself, and the aligned byte array containing the message content. // Modified from original to also return attribute data array. func ParseInetDiagMsg(data []byte) (*InetDiagMsg, []byte) { @@ -232,7 +226,7 @@ func Parse(msg *syscall.NetlinkMessage, skipLocal bool) (*ParsedMessage, error) } } parsedMsg := ParsedMessage{Header: msg.Header, InetDiagMsg: idm} - attrs, err := nl.ParseRouteAttr(attrBytes) + attrs, err := ParseRouteAttr(attrBytes) if err != nil { return nil, err } @@ -241,3 +235,36 @@ func Parse(msg *syscall.NetlinkMessage, skipLocal bool) (*ParsedMessage, error) } return &parsedMsg, nil } + +/*********************************************************************************************/ +/* Copied from "github.com/vishvananda/netlink/nl/nl_linux.go" */ +/*********************************************************************************************/ + +// ParseRouteAttr parses a byte array into a NetlinkRouteAttr struct. +func ParseRouteAttr(b []byte) ([]syscall.NetlinkRouteAttr, error) { + var attrs []syscall.NetlinkRouteAttr + for len(b) >= unix.SizeofRtAttr { + a, vbuf, alen, err := netlinkRouteAttrAndValue(b) + if err != nil { + return nil, err + } + // TODO - Resolve this ugly cross-over between sys/unix and syscall. + ra := syscall.NetlinkRouteAttr{Attr: syscall.RtAttr(*a), Value: vbuf[:int(a.Len)-unix.SizeofRtAttr]} + attrs = append(attrs, ra) + b = b[alen:] + } + return attrs, nil +} + +// rtaAlignOf rounds the length of a netlink route attribute up to align it properly. +func rtaAlignOf(attrlen int) int { + return (attrlen + unix.RTA_ALIGNTO - 1) & ^(unix.RTA_ALIGNTO - 1) +} + +func netlinkRouteAttrAndValue(b []byte) (*unix.RtAttr, []byte, int, error) { + a := (*unix.RtAttr)(unsafe.Pointer(&b[0])) + if int(a.Len) < unix.SizeofRtAttr || int(a.Len) > len(b) { + return nil, nil, 0, unix.EINVAL + } + return a, b[unix.SizeofRtAttr:], rtaAlignOf(int(a.Len)), nil +} From e6e27292a5d6673ebe1244e78448db14c8fb849c Mon Sep 17 00:00:00 2001 From: Greg Russell Date: Mon, 14 May 2018 15:31:09 -0400 Subject: [PATCH 6/8] Add TestParse --- inetdiag/inetdiag.go | 3 ++- inetdiag/inetdiag_test.go | 38 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 40 insertions(+), 1 deletion(-) diff --git a/inetdiag/inetdiag.go b/inetdiag/inetdiag.go index b0f7739..886bf25 100644 --- a/inetdiag/inetdiag.go +++ b/inetdiag/inetdiag.go @@ -36,8 +36,9 @@ import ( "syscall" "unsafe" - tcpinfo "github.com/m-lab/tcp-info/nl-proto" "golang.org/x/sys/unix" + + tcpinfo "github.com/m-lab/tcp-info/nl-proto" ) // Error types. diff --git a/inetdiag/inetdiag_test.go b/inetdiag/inetdiag_test.go index 50ce730..f891f94 100644 --- a/inetdiag/inetdiag_test.go +++ b/inetdiag/inetdiag_test.go @@ -1,12 +1,14 @@ package inetdiag_test import ( + "encoding/json" "log" "syscall" "testing" "unsafe" "github.com/m-lab/tcp-info/inetdiag" + "golang.org/x/sys/unix" tcpinfo "github.com/m-lab/tcp-info/nl-proto" ) @@ -107,3 +109,39 @@ func TestID6(t *testing.T) { t.Errorf("Should not be identified as loopback") } } + +func TestParse(t *testing.T) { + var json1 = `{"Header":{"Len":356,"Type":20,"Flags":2,"Seq":1,"Pid":148940},"Data":"CgEAAOpWE6cmIAAAEAMEFbM+nWqBv4ehJgf4sEANDAoAAAAAAAAAgQAAAAAdWwAAAAAAAAAAAAAAAAAAAAAAAAAAAAC13zIBBQAIAAAAAAAFAAUAIAAAAAUABgAgAAAAFAABAAAAAAAAAAAAAAAAAAAAAAAoAAcAAAAAAICiBQAAAAAAALQAAAAAAAAAAAAAAAAAAAAAAAAAAAAArAACAAEAAAAAB3gBQIoDAECcAABEBQAAuAQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAUCEAAAAAAAAgIQAAQCEAANwFAACsywIAJW8AAIRKAAD///9/CgAAAJQFAAADAAAALMkAAIBwAAAAAAAALnUOAAAAAAD///////////ayBAAAAAAASfQPAAAAAADMEQAANRMAAAAAAABiNQAAxAsAAGMIAABX5AUAAAAAAAoABABjdWJpYwAAAA=="}` + nm := syscall.NetlinkMessage{} + err := json.Unmarshal([]byte(json1), &nm) + if err != nil { + log.Fatal(err) + } + mp, err := inetdiag.Parse(&nm, true) + if err != nil { + log.Fatal(err) + } + if mp.Header.Len != 356 { + t.Error("wrong length") + } + if mp.InetDiagMsg.IDiagFamily != unix.AF_INET6 { + t.Error("Should not be IPv6") + } + if len(mp.Attributes) != inetdiag.INET_DIAG_MAX { + t.Error("Should be", inetdiag.INET_DIAG_MAX, "attribute entries") + } + + nonNil := 0 + for i := range mp.Attributes { + if mp.Attributes[i] != nil { + nonNil++ + } + } + if nonNil != 7 { + t.Error("Incorrect number of attribs") + } + + if mp.Attributes[inetdiag.INET_DIAG_INFO] == nil { + t.Error("Should not be nil") + } +} From 9af48b07148a3d55fe05b49a0df1f4264cece240 Mon Sep 17 00:00:00 2001 From: Greg Russell Date: Mon, 14 May 2018 15:43:29 -0400 Subject: [PATCH 7/8] Improve test coverage --- inetdiag/inetdiag.go | 1 - inetdiag/inetdiag_test.go | 31 +++++++++++++++++++++++++++++++ 2 files changed, 31 insertions(+), 1 deletion(-) diff --git a/inetdiag/inetdiag.go b/inetdiag/inetdiag.go index 886bf25..f464547 100644 --- a/inetdiag/inetdiag.go +++ b/inetdiag/inetdiag.go @@ -249,7 +249,6 @@ func ParseRouteAttr(b []byte) ([]syscall.NetlinkRouteAttr, error) { if err != nil { return nil, err } - // TODO - Resolve this ugly cross-over between sys/unix and syscall. ra := syscall.NetlinkRouteAttr{Attr: syscall.RtAttr(*a), Value: vbuf[:int(a.Len)-unix.SizeofRtAttr]} attrs = append(attrs, ra) b = b[alen:] diff --git a/inetdiag/inetdiag_test.go b/inetdiag/inetdiag_test.go index f891f94..1883de9 100644 --- a/inetdiag/inetdiag_test.go +++ b/inetdiag/inetdiag_test.go @@ -16,6 +16,11 @@ import ( // This is not exhaustive, but covers the basics. Integration tests will expose any more subtle // problems. +func init() { + // Always prepend the filename and line number. + log.SetFlags(log.LstdFlags | log.Lshortfile) +} + func TestSizes(t *testing.T) { if unsafe.Sizeof(inetdiag.InetDiagSockID{}) != 48 { t.Error("SockID wrong size", unsafe.Sizeof(inetdiag.InetDiagSockID{})) @@ -145,3 +150,29 @@ func TestParse(t *testing.T) { t.Error("Should not be nil") } } + +func TestParseGarbage(t *testing.T) { + var json1 = `{"Header":{"Len":356,"Type":20,"Flags":2,"Seq":1,"Pid":148940},"Data":"CgEAAOpWE6cmIAAAEAMEFbM+nWqBv4ehJgf4sEANDAoAAAAAAAAAgQAAAAAdWwAAAAAAAAAAAAAAAAAAAAAAAAAAAAC13zIBBQAIAAAAAAAFAAUAIAAAAAUABgAgAAAAFAABAAAAAAAAAAAAAAAAAAAAAAAoAAcAAAAAAICiBQAAAAAAALQAAAAAAAAAAAAAAAAAAAAAAAAAAAAArAACAAEAAAAAB3gBQIoDAECcAABEBQAAuAQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAUCEAAAAAAAAgIQAAQCEAANwFAACsywIAJW8AAIRKAAD///9/CgAAAJQFAAADAAAALMkAAIBwAAAAAAAALnUOAAAAAAD///////////ayBAAAAAAASfQPAAAAAADMEQAANRMAAAAAAABiNQAAxAsAAGMIAABX5AUAAAAAAAoABABjdWJpYwAAAA=="}` + nm := syscall.NetlinkMessage{} + err := json.Unmarshal([]byte(json1), &nm) + if err != nil { + log.Fatal(err) + } + nm.Header.Type = 10 + _, err = inetdiag.Parse(&nm, false) + if err == nil { + t.Error("Should detect wrong type") + } + + nm.Header.Type = 20 + for i := range nm.Data { + // Replace the attribute records with garbage + nm.Data[i] = byte(i) + } + + _, err = inetdiag.Parse(&nm, false) + if err == nil || err.Error() != "invalid argument" { + t.Error(err) + } + +} From f5c1c211fca7991e6d6ed1daf378681f348bd2b6 Mon Sep 17 00:00:00 2001 From: Greg Russell Date: Mon, 14 May 2018 17:29:25 -0400 Subject: [PATCH 8/8] improve comments, code tweak --- cache/cache.go | 26 +++++++++++++------------- inetdiag/inetdiag_test.go | 14 ++++++++++++-- 2 files changed, 25 insertions(+), 15 deletions(-) diff --git a/cache/cache.go b/cache/cache.go index f1c1d31..8b936f4 100644 --- a/cache/cache.go +++ b/cache/cache.go @@ -15,36 +15,36 @@ var ( ) // Cache is a cache of all connection status. -// TODO - need to remove entries from cache when they are not present in netlink messages. type Cache struct { // Map from inode to ParsedMessage - current map[uint32]*inetdiag.ParsedMessage - past map[uint32]*inetdiag.ParsedMessage + current map[uint32]*inetdiag.ParsedMessage // Cache of most recent messages. + previous map[uint32]*inetdiag.ParsedMessage // Cache of previous round of messages. } // NewCache creates a cache object with capacity of 1000. func NewCache() *Cache { return &Cache{current: make(map[uint32]*inetdiag.ParsedMessage, 1000), - past: make(map[uint32]*inetdiag.ParsedMessage, 1000)} + previous: make(map[uint32]*inetdiag.ParsedMessage, 0)} } -// Update swaps msg with the cache contents, and returns the cached value. +// Update swaps msg with the cache contents, and returns the evicted value. func (c *Cache) Update(msg *inetdiag.ParsedMessage) *inetdiag.ParsedMessage { inode := msg.InetDiagMsg.IDiagInode c.current[inode] = msg - tmp, ok := c.past[inode] + evicted, ok := c.previous[inode] if ok { - delete(c.past, inode) + delete(c.previous, inode) } - return tmp + return evicted } -// EndCycle marks the completion of updates from the kernel messages. +// EndCycle marks the completion of updates from one set of netlink messages. // It returns all messages that did not have corresponding inodes in the most recent -// batch of records from the kernel. +// batch of messages. func (c *Cache) EndCycle() map[uint32]*inetdiag.ParsedMessage { - tmp := c.past - c.past = c.current - c.current = make(map[uint32]*inetdiag.ParsedMessage, len(c.past)+10) + tmp := c.previous + c.previous = c.current + // Allocate a bit more than last time, to accommodate new connections. + c.current = make(map[uint32]*inetdiag.ParsedMessage, len(c.previous)+len(c.previous)/10+10) return tmp } diff --git a/inetdiag/inetdiag_test.go b/inetdiag/inetdiag_test.go index 1883de9..9ce0b5a 100644 --- a/inetdiag/inetdiag_test.go +++ b/inetdiag/inetdiag_test.go @@ -152,19 +152,23 @@ func TestParse(t *testing.T) { } func TestParseGarbage(t *testing.T) { - var json1 = `{"Header":{"Len":356,"Type":20,"Flags":2,"Seq":1,"Pid":148940},"Data":"CgEAAOpWE6cmIAAAEAMEFbM+nWqBv4ehJgf4sEANDAoAAAAAAAAAgQAAAAAdWwAAAAAAAAAAAAAAAAAAAAAAAAAAAAC13zIBBQAIAAAAAAAFAAUAIAAAAAUABgAgAAAAFAABAAAAAAAAAAAAAAAAAAAAAAAoAAcAAAAAAICiBQAAAAAAALQAAAAAAAAAAAAAAAAAAAAAAAAAAAAArAACAAEAAAAAB3gBQIoDAECcAABEBQAAuAQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAUCEAAAAAAAAgIQAAQCEAANwFAACsywIAJW8AAIRKAAD///9/CgAAAJQFAAADAAAALMkAAIBwAAAAAAAALnUOAAAAAAD///////////ayBAAAAAAASfQPAAAAAADMEQAANRMAAAAAAABiNQAAxAsAAGMIAABX5AUAAAAAAAoABABjdWJpYwAAAA=="}` + // Json encoding of a good netlink message containing inet diag info. + var good = `{"Header":{"Len":356,"Type":20,"Flags":2,"Seq":1,"Pid":148940},"Data":"CgEAAOpWE6cmIAAAEAMEFbM+nWqBv4ehJgf4sEANDAoAAAAAAAAAgQAAAAAdWwAAAAAAAAAAAAAAAAAAAAAAAAAAAAC13zIBBQAIAAAAAAAFAAUAIAAAAAUABgAgAAAAFAABAAAAAAAAAAAAAAAAAAAAAAAoAAcAAAAAAICiBQAAAAAAALQAAAAAAAAAAAAAAAAAAAAAAAAAAAAArAACAAEAAAAAB3gBQIoDAECcAABEBQAAuAQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAUCEAAAAAAAAgIQAAQCEAANwFAACsywIAJW8AAIRKAAD///9/CgAAAJQFAAADAAAALMkAAIBwAAAAAAAALnUOAAAAAAD///////////ayBAAAAAAASfQPAAAAAADMEQAANRMAAAAAAABiNQAAxAsAAGMIAABX5AUAAAAAAAoABABjdWJpYwAAAA=="}` nm := syscall.NetlinkMessage{} - err := json.Unmarshal([]byte(json1), &nm) + err := json.Unmarshal([]byte(good), &nm) if err != nil { log.Fatal(err) } + // Replace the header type with one that we don't support. nm.Header.Type = 10 _, err = inetdiag.Parse(&nm, false) if err == nil { t.Error("Should detect wrong type") } + // Restore the header type. nm.Header.Type = 20 + // Replace the payload with garbage. for i := range nm.Data { // Replace the attribute records with garbage nm.Data[i] = byte(i) @@ -175,4 +179,10 @@ func TestParseGarbage(t *testing.T) { t.Error(err) } + // Replace length with garbage so that data is incomplete. + nm.Header.Len = 400 + _, err = inetdiag.Parse(&nm, false) + if err == nil || err.Error() != "invalid argument" { + t.Error(err) + } }