diff --git a/.travis.yml b/.travis.yml index 097d650..0d8f3d7 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,4 +1,4 @@ -# Travis configuration for gardener. +# Travis configuration for tcp-info fast sidestream tool. language: go services: - docker @@ -37,7 +37,8 @@ cache: install: # Install dependencies -- GO_IMPORTS=$(go list -f '{{join .Imports "\n"}}{{"\n"}}{{join .TestImports "\n"}}' ./... | sort | uniq | grep -v etl-gardener) +# This filters out all imports from the local project, and all "base" imports that don't contain slash. +- GO_IMPORTS=$(go list -f '{{join .Imports "\n"}}{{"\n"}}{{join .TestImports "\n"}}' ./... | sort | uniq | grep -e / | grep -v m-lab/tcp-info) - go get -u -v -d $GO_IMPORTS - go get github.com/golang/protobuf/protoc-gen-go/ @@ -55,7 +56,7 @@ script: - cd $TRAVIS_BUILD_DIR # To start, run all the non-integration tests. -- MODULES="inetdiag zstd" +- MODULES="inetdiag zstd nl-proto/tools" - for module in $MODULES; do COVER_PKGS=${COVER_PKGS}./$module/..., ; done diff --git a/inetdiag/inetdiag.go b/inetdiag/inetdiag.go index 8c4b5ee..3f8e838 100644 --- a/inetdiag/inetdiag.go +++ b/inetdiag/inetdiag.go @@ -34,6 +34,8 @@ import ( "net" "syscall" "unsafe" + + tcpinfo "github.com/m-lab/tcp-info/nl-proto" ) // Constants from linux. 
@@ -42,40 +44,30 @@ const ( SOCK_DIAG_BY_FAMILY = 20 // uapi/linux/sock_diag.h ) -// netinet/tcp.h -const ( - _ = iota - TCP_ESTABLISHED = iota - TCP_SYN_SENT - TCP_SYN_RECV - TCP_FIN_WAIT1 - TCP_FIN_WAIT2 - TCP_TIME_WAIT - TCP_CLOSE - TCP_CLOSE_WAIT - TCP_LAST_ACK - TCP_LISTEN - TCP_CLOSING -) - +// inet_diag.h const ( - TCP_ALL_STATES = 0xFFF + INET_DIAG_NONE = iota + INET_DIAG_MEMINFO + INET_DIAG_INFO + INET_DIAG_VEGASINFO + INET_DIAG_CONG + INET_DIAG_TOS + INET_DIAG_TCLASS + INET_DIAG_SKMEMINFO + INET_DIAG_SHUTDOWN + INET_DIAG_DCTCPINFO + INET_DIAG_PROTOCOL + INET_DIAG_SKV6ONLY + INET_DIAG_LOCALS + INET_DIAG_PEERS + INET_DIAG_PAD + INET_DIAG_MARK + INET_DIAG_BBRINFO + INET_DIAG_CLASS_ID + INET_DIAG_MD5SIG + INET_DIAG_MAX ) -var tcpStatesMap = map[uint8]string{ - TCP_ESTABLISHED: "established", - TCP_SYN_SENT: "syn_sent", - TCP_SYN_RECV: "syn_recv", - TCP_FIN_WAIT1: "fin_wait1", - TCP_FIN_WAIT2: "fin_wait2", - TCP_TIME_WAIT: "time_wait", - TCP_CLOSE: "close", - TCP_CLOSE_WAIT: "close_wait", - TCP_LAST_ACK: "last_ack", - TCP_LISTEN: "listen", - TCP_CLOSING: "closing", -} - var diagFamilyMap = map[uint8]string{ syscall.AF_INET: "tcp", syscall.AF_INET6: "tcp6", @@ -121,7 +113,7 @@ func isIpv6(original [16]byte) bool { } func ipv4(original [16]byte) net.IP { - return net.IPv4(original[0], original[1], original[2], original[3]) + return net.IPv4(original[0], original[1], original[2], original[3]).To4() } func ipv6(original [16]byte) net.IP { @@ -183,7 +175,7 @@ type InetDiagMsg struct { } func (msg *InetDiagMsg) String() string { - return fmt.Sprintf("%s, %s, %s", diagFamilyMap[msg.IDiagFamily], tcpStatesMap[msg.IDiagState], msg.ID.String()) + return fmt.Sprintf("%s, %s, %s", diagFamilyMap[msg.IDiagFamily], tcpinfo.TCPState(msg.IDiagState), msg.ID.String()) } // rtaAlignOf round the length of a netlink route attribute up to align it diff --git a/inetdiag/inetdiag_test.go b/inetdiag/inetdiag_test.go index 61a1998..50ce730 100644 --- a/inetdiag/inetdiag_test.go +++ 
// ParseCong returns the congestion algorithm name carried in an
// INET_DIAG_CONG route attribute.
//
// The value is treated as a C string: everything up to (not including) the
// first NUL byte is returned. A value without a NUL terminator is returned
// whole, and a nil attribute or empty value yields "" rather than panicking
// on an out-of-range slice.
func ParseCong(rta *syscall.NetlinkRouteAttr) string {
	if rta == nil {
		return ""
	}
	for i, b := range rta.Value {
		if b == 0 {
			return string(rta.Value[:i])
		}
	}
	return string(rta.Value)
}
+func HeaderToProto(hdr *inetdiag.InetDiagMsg) *tcpinfo.InetDiagMsgProto { + p := tcpinfo.InetDiagMsgProto{} + p.Family = tcpinfo.InetDiagMsgProto_AddressFamily(hdr.IDiagFamily) + p.State = tcpinfo.TCPState(hdr.IDiagState) + p.Timer = uint32(hdr.IDiagTimer) + p.Retrans = uint32(hdr.IDiagRetrans) + p.SockId = &tcpinfo.InetSocketIDProto{} + src := tcpinfo.EndPoint{} + p.SockId.Source = &src + src.Port = uint32(hdr.ID.IDiagSPort) + src.Ip = append(src.Ip, hdr.ID.SrcIP()...) + dst := tcpinfo.EndPoint{} + p.SockId.Destination = &dst + dst.Port = uint32(hdr.ID.IDiagDPort) + dst.Ip = append(dst.Ip, hdr.ID.DstIP()...) + p.SockId.Interface = hdr.ID.IDiagIf + p.SockId.Cookie = uint64(hdr.ID.IDiagCookie[0])<<32 + uint64(hdr.ID.IDiagCookie[1]) + p.Expires = hdr.IDiagExpires + p.Rqueue = hdr.IDiagRqueue + p.Wqueue = hdr.IDiagWqueue + p.Uid = hdr.IDiagUID + p.Inode = hdr.IDiagInode + + return &p +} + +// AttrToField fills the appropriate proto subfield from a route attribute. +func AttrToField(all *tcpinfo.TCPDiagnosticsProto, rta *syscall.NetlinkRouteAttr) { + switch rta.Attr.Type { + case inetdiag.INET_DIAG_INFO: + ldiwr := ParseLinuxTCPInfo(rta) + all.TcpInfo = ldiwr.ToProto() + case inetdiag.INET_DIAG_CONG: + all.CongestionAlgorithm = ParseCong(rta) + case inetdiag.INET_DIAG_SHUTDOWN: + all.Shutdown = &tcpinfo.TCPDiagnosticsProto_ShutdownMask{ShutdownMask: uint32(rta.Value[0])} + case inetdiag.INET_DIAG_MEMINFO: + memInfo := ParseMemInfo(rta) + if memInfo != nil { + all.MemInfo = &tcpinfo.MemInfoProto{} + *all.MemInfo = *memInfo // Copy, to avoid references the attribute + } + case inetdiag.INET_DIAG_SKMEMINFO: + memInfo := ParseSockMemInfo(rta) + if memInfo != nil { + all.SocketMem = &tcpinfo.SocketMemInfoProto{} + *all.SocketMem = *memInfo // Copy, to avoid references the attribute + } + case inetdiag.INET_DIAG_TOS: + // TODO - already seeing these. Issue #10 + case inetdiag.INET_DIAG_TCLASS: + // TODO - already seeing these. 
Issue #10 + + // We are not seeing these so far. Should implement BBRINFO soon though. + // TODO case inetdiag.INET_DIAG_BBRINFO: + // TODO case inetdiag.INET_DIAG_VEGASINFO: + // TODO case inetdiag.INET_DIAG_SKV6ONLY: + // TODO case inetdiag.INET_DIAG_MARK: + // TODO case inetdiag.INET_DIAG_PROTOCOL: + // Used only for multicast messages. Not expected for our use cases. + default: + log.Printf("Not processing %+v\n", rta) + // TODO(gfr) - should LOG(WARNING) on missing cases. + } +} + +// CreateProto creates a fully populated TCPDiagnosticsProto from the parsed elements of a type 20 netlink message. +// This assumes the netlink message is type 20, and behavior is undefined if it is not. +func CreateProto(header syscall.NlMsghdr, idm *inetdiag.InetDiagMsg, attrs []*syscall.NetlinkRouteAttr) *tcpinfo.TCPDiagnosticsProto { + all := tcpinfo.TCPDiagnosticsProto{} + all.InetDiagMsg = HeaderToProto(idm) + for i := range attrs { + if attrs[i] != nil { + AttrToField(&all, attrs[i]) + } + } + + return &all +} + +// LinuxTCPInfo is the linux defined structure returned in RouteAttr DIAG_INFO messages. +// It corresponds to the struct tcp_info in include/uapi/linux/tcp.h +type LinuxTCPInfo struct { + state uint8 + caState uint8 + retransmits uint8 + probes uint8 + backoff uint8 + options uint8 + wscale uint8 //snd_wscale : 4, tcpi_rcv_wscale : 4; + appLimited uint8 //delivery_rate_app_limited:1; + + rto uint32 // offset 8 + ato uint32 + sndMss uint32 + rcvMss uint32 + + unacked uint32 // offset 24 + sacked uint32 + lost uint32 + retrans uint32 + fackets uint32 + + /* Times. */ + lastDataSent uint32 // offset 44 + lastAckSent uint32 /* Not remembered, sorry. */ // offset 48 + lastDataRecv uint32 // offset 52 + lastAckRecv uint32 // offset 56 + + /* Metrics. 
*/ + pmtu uint32 + rcvSsThresh uint32 + rtt uint32 + rttvar uint32 + sndSsThresh uint32 + sndCwnd uint32 + advmss uint32 + reordering uint32 + + rcvRtt uint32 + rcvSpace uint32 + + totalRetrans uint32 + + pacingRate int64 // This is often -1, so better for it to be signed + maxPacingRate int64 // This is often -1, so better to be signed. + bytesAcked uint64 /* RFC4898 tcpEStatsAppHCThruOctetsAcked */ + bytesReceived uint64 /* RFC4898 tcpEStatsAppHCThruOctetsReceived */ + segsOut uint32 /* RFC4898 tcpEStatsPerfSegsOut */ + segsIn uint32 /* RFC4898 tcpEStatsPerfSegsIn */ + + notsentBytes uint32 + minRtt uint32 + dataSegsIn uint32 /* RFC4898 tcpEStatsDataSegsIn */ + dataSegsOut uint32 /* RFC4898 tcpEStatsDataSegsOut */ + + deliveryRate uint64 + + busyTime uint64 /* Time (usec) busy sending data */ + rwndLimited uint64 /* Time (usec) limited by receive window */ + sndbufLimited uint64 /* Time (usec) limited by send buffer */ +} + +// ToProto converts a LinuxTCPInfo struct to a TCPInfoProto +func (tcp *LinuxTCPInfo) ToProto() *tcpinfo.TCPInfoProto { + var p tcpinfo.TCPInfoProto + p.State = tcpinfo.TCPState(tcp.state) + + p.CaState = uint32(tcp.caState) + p.Retransmits = uint32(tcp.retransmits) + p.Probes = uint32(tcp.probes) + p.Backoff = uint32(tcp.backoff) + opts := tcp.options + p.Options = uint32(opts) + p.TsOpt = opts&0x01 > 0 + p.SackOpt = opts&0x02 > 0 + p.WscaleOpt = opts&0x04 > 0 + p.EcnOpt = opts&0x08 > 0 + p.EcnseenOpt = opts&0x10 > 0 + p.FastopenOpt = opts&0x20 > 0 + + p.RcvWscale = uint32(tcp.wscale & 0x0F) + p.SndWscale = uint32(tcp.wscale >> 4) + p.DeliveryRateAppLimited = tcp.appLimited > 0 + + p.Rto = tcp.rto + p.Ato = tcp.ato + p.SndMss = tcp.sndMss + p.RcvMss = tcp.rcvMss + + p.Unacked = tcp.unacked + p.Sacked = tcp.sacked + p.Lost = tcp.lost + p.Retrans = tcp.retrans + p.Fackets = tcp.fackets + p.LastDataSent = tcp.lastDataSent + p.LastAckSent = tcp.lastAckSent + p.LastDataRecv = tcp.lastDataRecv + p.LastAckRecv = tcp.lastAckRecv + + p.Pmtu = 
tcp.pmtu + if tcp.rcvSsThresh < 0xFFFF { + p.RcvSsthresh = tcp.rcvSsThresh + } + p.Rtt = tcp.rtt + p.Rttvar = tcp.rttvar + p.SndSsthresh = tcp.sndSsThresh + p.SndCwnd = tcp.sndCwnd + p.Advmss = tcp.advmss + p.Reordering = tcp.reordering + + p.RcvRtt = tcp.rcvRtt + p.RcvSpace = tcp.rcvSpace + p.TotalRetrans = tcp.totalRetrans + + p.PacingRate = tcp.pacingRate + p.MaxPacingRate = tcp.maxPacingRate + p.BytesAcked = tcp.bytesAcked + p.BytesReceived = tcp.bytesReceived + + p.SegsOut = tcp.segsOut + p.SegsIn = tcp.segsIn + + p.NotsentBytes = tcp.notsentBytes + p.MinRtt = tcp.minRtt + p.DataSegsIn = tcp.dataSegsIn + p.DataSegsOut = tcp.dataSegsOut + + p.DeliveryRate = tcp.deliveryRate + + return &p +} + +// Useful offsets +const ( + LastDataSentOffset = unsafe.Offsetof(LinuxTCPInfo{}.lastDataSent) + PmtuOffset = unsafe.Offsetof(LinuxTCPInfo{}.pmtu) +) + +// ParseLinuxTCPInfo maps the rta Value onto a TCPInfo struct. It may have to copy the +// bytes. +func ParseLinuxTCPInfo(rta *syscall.NetlinkRouteAttr) *LinuxTCPInfo { + structSize := (int)(unsafe.Sizeof(LinuxTCPInfo{})) + data := rta.Value + //log.Println(len(rta.Value), "vs", structSize) + if len(rta.Value) < structSize { + // log.Println(len(rta.Value), "vs", structSize) + data = make([]byte, structSize) + copy(data, rta.Value) + } + return (*LinuxTCPInfo)(unsafe.Pointer(&data[0])) +} + +// ParseSockMemInfo maps the rta Value onto a SockMemInfoProto. +// Since this struct is very simple, it can be mapped directly, instead of using an +// intermediate struct. +func ParseSockMemInfo(rta *syscall.NetlinkRouteAttr) *tcpinfo.SocketMemInfoProto { + if len(rta.Value) != 36 { + log.Println(len(rta.Value)) + return nil + } + return (*tcpinfo.SocketMemInfoProto)(unsafe.Pointer(&rta.Value[0])) +} + +// ParseMemInfo maps the rta Value onto a MemInfoProto. +// Since this struct is very simple, it can be mapped directly, instead of using an +// intermediate struct. 
// nextMsg reads the next NetlinkMessage from rdr.
//
// It returns io.EOF only when the stream ends cleanly before a new header.
// A header whose Len is smaller than the header itself is rejected (the
// unsigned subtraction would otherwise wrap to a multi-gigabyte payload
// size), and a stream that ends partway through a message is reported as
// io.ErrUnexpectedEOF so callers cannot mistake truncation for a clean end.
func nextMsg(rdr io.Reader) (*syscall.NetlinkMessage, error) {
	var header syscall.NlMsghdr
	if err := binary.Read(rdr, binary.LittleEndian, &header); err != nil {
		return nil, err
	}
	if header.Len < syscall.SizeofNlMsghdr {
		return nil, errors.New("netlink message length is smaller than its header")
	}

	data := make([]byte, header.Len-syscall.SizeofNlMsghdr)
	if _, err := io.ReadFull(rdr, data); err != nil {
		if err == io.EOF {
			// The header promised a payload that never arrived.
			err = io.ErrUnexpectedEOF
		}
		return nil, err
	}

	return &syscall.NetlinkMessage{Header: header, Data: data}, nil
}
dstIP := idm.ID.DstIP() + if dstIP.IsLoopback() || dstIP.IsLinkLocalUnicast() || dstIP.IsMulticast() || dstIP.IsUnspecified() { + return nil + } + type ParsedMessage struct { + Header syscall.NlMsghdr + InetDiagMsg *inetdiag.InetDiagMsg + Attributes [inetdiag.INET_DIAG_MAX]*syscall.NetlinkRouteAttr + } + + parsedMsg := ParsedMessage{Header: msg.Header, InetDiagMsg: idm} + attrs, err := nl.ParseRouteAttr(attrBytes) + if err != nil { + t.Error(err) + } + for i := range attrs { + parsedMsg.Attributes[attrs[i].Attr.Type] = &attrs[i] + } + return tools.CreateProto(msg.Header, parsedMsg.InetDiagMsg, parsedMsg.Attributes[:]) +} + +func TestReader(t *testing.T) { + // Cache info new 140 err 0 same 277 local 789 diff 3 total 1209 + // 1209 sockets 143 remotes 403 per iteration + source := "testdata/testdata.zst" + log.Println("Reading messages from", source) + rdr := zstd.NewReader(source) + parsed := 0 + src4 := 0 + dst4 := 0 + for { + msg, err := nextMsg(rdr) + if err != nil { + if err == io.EOF { + break + } + t.Fatal(err) + } + + p := convertToProto(msg, t) + if p.InetDiagMsg == nil { + t.Fatal("InetDiagMsg missing") + } + if p.CongestionAlgorithm != "cubic" { + t.Error(p.CongestionAlgorithm, []byte(p.CongestionAlgorithm)) + } + if p.MemInfo == nil { + t.Error("MemInfo missing") + } + if p.TcpInfo == nil { + t.Fatal("TcpInfo missing") + } + if p.TcpInfo.State != p.InetDiagMsg.State { + t.Fatal("State mismatch") + } + if len(p.InetDiagMsg.SockId.Source.Ip) == 4 { + src4++ + } + if len(p.InetDiagMsg.SockId.Destination.Ip) == 4 { + dst4++ + } + + parsed++ + } + + if src4 == 0 { + t.Error("There should be some ipv4 sources") + } + if dst4 == 0 { + t.Error("There should be some ipv4 destinations") + } + // TODO - do some test on the proto } + if parsed != 420 { // 140 new, 277 same, and 3 diff + t.Error(parsed) + } +} diff --git a/nl-proto/tools/testdata/testdata.zst b/nl-proto/tools/testdata/testdata.zst new file mode 100644 index 0000000..b54b684 Binary files /dev/null and 
b/nl-proto/tools/testdata/testdata.zst differ