Skip to content

Commit

Permalink
Revert to correct edge construction
Browse files Browse the repository at this point in the history
Another implicit invariant in the data model is that edges are always of the
form (local -> remote). That is, the source of an edge must always be a node
that originates from within Scope's domain of visibility. This was evident from
the presence of ingress and egress fields in edge/aggregate metadata.

When building the sniffer, I accidentally and incorrectly violated this
invariant, by constructing distinct edges for (local -> remote) and (remote ->
local), and collapsing ingress and egress byte counts to a single scalar. I
experienced a variety of subtle undefined behavior as a result. See #339.

This change reverts to the old, correct methodology. Consequently the sniffer
needs to be able to find out which side of the sniffed packet is local vs.
remote, and to do that it needs access to local networks. I moved the
discovery from the probe/host package into probe/main.go.

As part of that work I discovered that package report also maintains its own,
independent "cache" of local networks. Except it contains only the (optional)
Docker bridge network, if it's been populated by the probe, and it's only used
by the report.Make{Endpoint,Address}NodeID constructors to scope local
addresses. Normally, scoping happens during rendering, and only for pseudo
nodes -- see current LeafMap Render localNetworks. This is pretty convoluted
and should either be made consistent or heavily commented.
  • Loading branch information
peterbourgon committed Jul 31, 2015
1 parent f66bcfc commit 14fbc26
Show file tree
Hide file tree
Showing 16 changed files with 223 additions and 153 deletions.
4 changes: 2 additions & 2 deletions app/api_topology_test.go
Expand Up @@ -96,8 +96,8 @@ func TestAPITopologyApplications(t *testing.T) {
t.Fatalf("JSON parse error: %s", err)
}
if want, have := (report.EdgeMetadata{
PacketCount: newu64(100),
ByteCount: newu64(10),
PacketCount: newu64(10),
EgressByteCount: newu64(100),
}), edge.Metadata; !reflect.DeepEqual(want, have) {
t.Error(test.Diff(want, have))
}
Expand Down
33 changes: 13 additions & 20 deletions probe/host/reporter.go
@@ -1,15 +1,14 @@
package host

import (
"net"
"runtime"
"strings"
"time"

"github.com/weaveworks/scope/report"
)

// Keys for use in NodeMetadata
// Keys for use in NodeMetadata.
const (
Timestamp = "ts"
HostName = "host_name"
Expand All @@ -20,30 +19,31 @@ const (
Uptime = "uptime"
)

// Exposed for testing
// Exposed for testing.
const (
ProcUptime = "/proc/uptime"
ProcLoad = "/proc/loadavg"
)

// Exposed for testing
// Exposed for testing.
var (
InterfaceAddrs = net.InterfaceAddrs
Now = func() string { return time.Now().UTC().Format(time.RFC3339Nano) }
Now = func() string { return time.Now().UTC().Format(time.RFC3339Nano) }
)

// Reporter generates Reports containing the host topology.
type Reporter struct {
hostID string
hostName string
hostID string
hostName string
localNets report.Networks
}

// NewReporter returns a Reporter which produces a report containing host
// topology for this host.
func NewReporter(hostID, hostName string) *Reporter {
func NewReporter(hostID, hostName string, localNets report.Networks) *Reporter {
return &Reporter{
hostID: hostID,
hostName: hostName,
hostID: hostID,
hostName: hostName,
localNets: localNets,
}
}

Expand All @@ -54,15 +54,8 @@ func (r *Reporter) Report() (report.Report, error) {
localCIDRs []string
)

localNets, err := InterfaceAddrs()
if err != nil {
return rep, err
}
for _, localNet := range localNets {
// Not all networks are IP networks.
if ipNet, ok := localNet.(*net.IPNet); ok {
localCIDRs = append(localCIDRs, ipNet.String())
}
for _, localNet := range r.localNets {
localCIDRs = append(localCIDRs, localNet.String())
}

uptime, err := GetUptime()
Expand Down
32 changes: 15 additions & 17 deletions probe/host/reporter_test.go
Expand Up @@ -12,38 +12,37 @@ import (
"github.com/weaveworks/scope/test"
)

const (
release = "release"
version = "version"
network = "192.168.0.0/16"
hostID = "hostid"
now = "now"
hostname = "hostname"
load = "0.59 0.36 0.29"
uptime = "278h55m43s"
kernel = "release version"
)

func TestReporter(t *testing.T) {
var (
release = "release"
version = "version"
network = "192.168.0.0/16"
hostID = "hostid"
now = "now"
hostname = "hostname"
load = "0.59 0.36 0.29"
uptime = "278h55m43s"
kernel = "release version"
_, ipnet, _ = net.ParseCIDR(network)
localNets = report.Networks{ipnet}
)

var (
oldGetKernelVersion = host.GetKernelVersion
oldGetLoad = host.GetLoad
oldGetUptime = host.GetUptime
oldInterfaceAddrs = host.InterfaceAddrs
oldNow = host.Now
)
defer func() {
host.GetKernelVersion = oldGetKernelVersion
host.GetLoad = oldGetLoad
host.GetUptime = oldGetUptime
host.InterfaceAddrs = oldInterfaceAddrs
host.Now = oldNow
}()
host.GetKernelVersion = func() (string, error) { return release + " " + version, nil }
host.GetLoad = func() string { return load }
host.GetUptime = func() (time.Duration, error) { return time.ParseDuration(uptime) }
host.Now = func() string { return now }
host.InterfaceAddrs = func() ([]net.Addr, error) { _, ipnet, _ := net.ParseCIDR(network); return []net.Addr{ipnet}, nil }

want := report.MakeReport()
want.Host.NodeMetadatas[report.MakeHostNodeID(hostID)] = report.MakeNodeMetadataWith(map[string]string{
Expand All @@ -55,8 +54,7 @@ func TestReporter(t *testing.T) {
host.Uptime: uptime,
host.KernelVersion: kernel,
})
r := host.NewReporter(hostID, hostname)
have, _ := r.Report()
have, _ := host.NewReporter(hostID, hostname, localNets).Report()
if !reflect.DeepEqual(want, have) {
t.Errorf("%s", test.Diff(want, have))
}
Expand Down
16 changes: 14 additions & 2 deletions probe/main.go
Expand Up @@ -79,11 +79,23 @@ func main() {
}
defer publisher.Close()

addrs, err := net.InterfaceAddrs()
if err != nil {
log.Fatal(err)
}
localNets := report.Networks{}
for _, addr := range addrs {
// Not all addrs are IPNets.
if ipNet, ok := addr.(*net.IPNet); ok {
localNets = append(localNets, ipNet)
}
}

var (
hostName = hostname()
hostID = hostName // TODO: we should sanitize the hostname
taggers = []Tagger{newTopologyTagger(), host.NewTagger(hostID)}
reporters = []Reporter{host.NewReporter(hostID, hostName), endpoint.NewReporter(hostID, hostName, *spyProcs)}
reporters = []Reporter{host.NewReporter(hostID, hostName, localNets), endpoint.NewReporter(hostID, hostName, *spyProcs)}
processCache *process.CachingWalker
)

Expand Down Expand Up @@ -122,7 +134,7 @@ func main() {
continue
}
log.Printf("capturing packets on %s", iface)
reporters = append(reporters, sniff.New(hostID, source, *captureOn, *captureOff))
reporters = append(reporters, sniff.New(hostID, localNets, source, *captureOn, *captureOff))
}
}

Expand Down
124 changes: 90 additions & 34 deletions probe/sniff/sniffer.go
Expand Up @@ -3,6 +3,7 @@ package sniff
import (
"io"
"log"
"net"
"strconv"
"sync/atomic"
"time"
Expand All @@ -15,27 +16,29 @@ import (

// Sniffer is a packet-sniffing reporter.
type Sniffer struct {
hostID string
reports chan chan report.Report
parser *gopacket.DecodingLayerParser
decoded []gopacket.LayerType
eth layers.Ethernet
ip4 layers.IPv4
ip6 layers.IPv6
tcp layers.TCP
udp layers.UDP
icmp4 layers.ICMPv4
icmp6 layers.ICMPv6
hostID string
localNets report.Networks
reports chan chan report.Report
parser *gopacket.DecodingLayerParser
decoded []gopacket.LayerType
eth layers.Ethernet
ip4 layers.IPv4
ip6 layers.IPv6
tcp layers.TCP
udp layers.UDP
icmp4 layers.ICMPv4
icmp6 layers.ICMPv6
}

// New returns a new sniffing reporter that samples traffic by turning its
// packet capture facilities on and off. Note that the on and off durations
// represent a way to bound CPU burn. Effective sample rate needs to be
// calculated as (packets decoded / packets observed).
func New(hostID string, src gopacket.ZeroCopyPacketDataSource, on, off time.Duration) *Sniffer {
func New(hostID string, localNets report.Networks, src gopacket.ZeroCopyPacketDataSource, on, off time.Duration) *Sniffer {
s := &Sniffer{
hostID: hostID,
reports: make(chan chan report.Report),
hostID: hostID,
localNets: localNets,
reports: make(chan chan report.Report),
}
s.parser = gopacket.NewDecodingLayerParser(
layers.LayerTypeEthernet,
Expand Down Expand Up @@ -119,8 +122,11 @@ func interpolateCounts(r report.Report) {
if emd.PacketCount != nil {
*emd.PacketCount = uint64(float64(*emd.PacketCount) * factor)
}
if emd.ByteCount != nil {
*emd.ByteCount = uint64(float64(*emd.ByteCount) * factor)
if emd.EgressByteCount != nil {
*emd.EgressByteCount = uint64(float64(*emd.EgressByteCount) * factor)
}
if emd.IngressByteCount != nil {
*emd.IngressByteCount = uint64(float64(*emd.IngressByteCount) * factor)
}
}
}
Expand Down Expand Up @@ -204,54 +210,104 @@ func (s *Sniffer) read(src gopacket.ZeroCopyPacketDataSource, dst chan Packet, p
}

// Merge puts the packet into the report.
//
// Note that, for the moment, we encode bidirectional traffic as ingress and
// egress traffic on a single edge whose src is local and dst is remote. That
// is, if we see a packet from the remote addr 9.8.7.6 to the local addr
// 1.2.3.4, we apply it as *ingress* on the edge (1.2.3.4 -> 9.8.7.6).
func (s *Sniffer) Merge(p Packet, rpt report.Report) {
// With a src and dst IP, we can add to the address topology.
if p.SrcIP != "" && p.DstIP != "" {
if p.SrcIP == "" || p.DstIP == "" {
return
}

// One end of the traffic has to be local. Otherwise, we don't know how to
// construct the edge.
//
// If we need to get around this limitation, we may be able to change the
// semantics of the report, and allow the src side of edges to be from
// anywhere. But that will have ramifications throughout Scope (read: it
// may violate implicit invariants) and needs to be thought through.
var (
srcLocal = s.localNets.Contains(net.ParseIP(p.SrcIP))
dstLocal = s.localNets.Contains(net.ParseIP(p.DstIP))
localIP string
remoteIP string
egress bool
)
switch {
case srcLocal && !dstLocal:
localIP, remoteIP, egress = p.SrcIP, p.DstIP, true
case !srcLocal && dstLocal:
localIP, remoteIP, egress = p.DstIP, p.SrcIP, false
case srcLocal && dstLocal:
localIP, remoteIP, egress = p.SrcIP, p.DstIP, true // loopback
case !srcLocal && !dstLocal:
log.Printf("sniffer ignoring remote-to-remote (%s -> %s) traffic", p.SrcIP, p.DstIP)
return
}

// For sure, we can add to the address topology.
{
var (
srcNodeID = report.MakeAddressNodeID(s.hostID, p.SrcIP)
dstNodeID = report.MakeAddressNodeID(s.hostID, p.DstIP)
srcNodeID = report.MakeAddressNodeID(s.hostID, localIP)
dstNodeID = report.MakeAddressNodeID(s.hostID, remoteIP)
edgeID = report.MakeEdgeID(srcNodeID, dstNodeID)
srcAdjacencyID = report.MakeAdjacencyID(srcNodeID)
)

rpt.Address.NodeMetadatas[srcNodeID] = report.MakeNodeMetadata()
rpt.Address.NodeMetadatas[dstNodeID] = report.MakeNodeMetadata()

emd := rpt.Address.EdgeMetadatas[edgeID]
if emd.PacketCount == nil {
emd.PacketCount = new(uint64)
}
*emd.PacketCount++
if emd.ByteCount == nil {
emd.ByteCount = new(uint64)

if egress {
if emd.EgressByteCount == nil {
emd.EgressByteCount = new(uint64)
}
*emd.EgressByteCount += uint64(p.Network)
} else {
if emd.IngressByteCount == nil {
emd.IngressByteCount = new(uint64)
}
*emd.IngressByteCount += uint64(p.Network)
}
*emd.ByteCount += uint64(p.Network)
rpt.Address.EdgeMetadatas[edgeID] = emd

rpt.Address.EdgeMetadatas[edgeID] = emd
rpt.Address.Adjacency[srcAdjacencyID] = rpt.Address.Adjacency[srcAdjacencyID].Add(dstNodeID)
}

// With a src and dst IP and port, we can add to the endpoints.
if p.SrcIP != "" && p.DstIP != "" && p.SrcPort != "" && p.DstPort != "" {
// If we have ports, we can add to the endpoint topology, too.
if p.SrcPort != "" && p.DstPort != "" {
var (
srcNodeID = report.MakeEndpointNodeID(s.hostID, p.SrcIP, p.SrcPort)
dstNodeID = report.MakeEndpointNodeID(s.hostID, p.DstIP, p.DstPort)
// NOTE(review): for ingress packets (egress == false) localIP is p.DstIP
// and remoteIP is p.SrcIP, yet the ports below are not swapped to match:
// localIP is paired with p.SrcPort and remoteIP with p.DstPort. It looks
// like the local/remote ports should be chosen alongside the IPs -- confirm.
srcNodeID = report.MakeEndpointNodeID(s.hostID, localIP, p.SrcPort)
dstNodeID = report.MakeEndpointNodeID(s.hostID, remoteIP, p.DstPort)
edgeID = report.MakeEdgeID(srcNodeID, dstNodeID)
srcAdjacencyID = report.MakeAdjacencyID(srcNodeID)
)
rpt.Endpoint.NodeMetadatas[srcNodeID] = report.MakeNodeMetadata()
rpt.Endpoint.NodeMetadatas[dstNodeID] = report.MakeNodeMetadata()

emd := rpt.Endpoint.EdgeMetadatas[edgeID]
if emd.PacketCount == nil {
emd.PacketCount = new(uint64)
}
*emd.PacketCount++
if emd.ByteCount == nil {
emd.ByteCount = new(uint64)

if egress {
if emd.EgressByteCount == nil {
emd.EgressByteCount = new(uint64)
}
*emd.EgressByteCount += uint64(p.Transport)
} else {
if emd.IngressByteCount == nil {
emd.IngressByteCount = new(uint64)
}
*emd.IngressByteCount += uint64(p.Transport)
}
*emd.ByteCount += uint64(p.Transport)
rpt.Endpoint.EdgeMetadatas[edgeID] = emd

rpt.Endpoint.EdgeMetadatas[edgeID] = emd
rpt.Endpoint.Adjacency[srcAdjacencyID] = rpt.Endpoint.Adjacency[srcAdjacencyID].Add(dstNodeID)
}
}
10 changes: 7 additions & 3 deletions probe/sniff/sniffer_internal_test.go
Expand Up @@ -22,8 +22,9 @@ func TestInterpolateCounts(t *testing.T) {
r.Sampling.Count = samplingCount
r.Sampling.Total = samplingTotal
r.Endpoint.EdgeMetadatas[edgeID] = report.EdgeMetadata{
PacketCount: newu64(packetCount),
ByteCount: newu64(byteCount),
PacketCount: newu64(packetCount),
IngressByteCount: newu64(byteCount),
EgressByteCount: newu64(byteCount),
}

interpolateCounts(r)
Expand All @@ -37,7 +38,10 @@ func TestInterpolateCounts(t *testing.T) {
if want, have := apply(packetCount), (*emd.PacketCount); want != have {
t.Errorf("want %d packets, have %d", want, have)
}
if want, have := apply(byteCount), (*emd.ByteCount); want != have {
if want, have := apply(byteCount), (*emd.EgressByteCount); want != have {
t.Errorf("want %d bytes, have %d", want, have)
}
if want, have := apply(byteCount), (*emd.IngressByteCount); want != have {
t.Errorf("want %d bytes, have %d", want, have)
}
}
Expand Down

0 comments on commit 14fbc26

Please sign in to comment.