diff --git a/probe/endpoint/conntrack.go b/probe/endpoint/conntrack.go index 1f3d876136..d3bc9e6796 100644 --- a/probe/endpoint/conntrack.go +++ b/probe/endpoint/conntrack.go @@ -65,8 +65,14 @@ type conntrack struct { Flows []Flow `xml:"flow"` } +// Conntracker is something that tracks connections. +type Conntracker interface { + WalkFlows(f func(Flow)) + Stop() +} + // Conntracker uses the conntrack command to track network connections -type Conntracker struct { +type conntracker struct { sync.Mutex cmd exec.Cmd activeFlows map[int64]Flow // active flows in state != TIME_WAIT @@ -75,11 +81,11 @@ type Conntracker struct { } // NewConntracker creates and starts a new Conntracter -func NewConntracker(existingConns bool, args ...string) (*Conntracker, error) { +func NewConntracker(existingConns bool, args ...string) (Conntracker, error) { if !ConntrackModulePresent() { return nil, fmt.Errorf("No conntrack module") } - result := &Conntracker{ + result := &conntracker{ activeFlows: map[int64]Flow{}, existingConns: existingConns, } @@ -112,7 +118,7 @@ var ConntrackModulePresent = func() bool { } // NB this is not re-entrant! -func (c *Conntracker) run(args ...string) { +func (c *conntracker) run(args ...string) { if c.existingConns { // Fork another conntrack, just to capture existing connections // for which we don't get events @@ -178,7 +184,7 @@ func (c *Conntracker) run(args ...string) { } } -func (c *Conntracker) existingConnections(args ...string) ([]Flow, error) { +func (c *conntracker) existingConnections(args ...string) ([]Flow, error) { args = append([]string{"-L", "-o", "xml", "-p", "tcp"}, args...) cmd := exec.Command("conntrack", args...) stdout, err := cmd.StdoutPipe() @@ -203,7 +209,7 @@ func (c *Conntracker) existingConnections(args ...string) ([]Flow, error) { } // Stop stop stop -func (c *Conntracker) Stop() { +func (c *conntracker) Stop() { c.Lock() defer c.Unlock() if c.cmd == nil { @@ -215,7 +221,7 @@ func (c *Conntracker) Stop() { } } -func (c *Conntracker) handleFlow(f Flow, forceAdd bool) { +func (c *conntracker) handleFlow(f Flow, forceAdd bool) { // A flow consists of 3 'metas' - the 'original' 4 tuple (as seen by this // host) and the 'reply' 4 tuple, which is what it has been rewritten to. // This code finds those metas, which are identified by a Direction @@ -260,7 +266,7 @@ func (c *Conntracker) handleFlow(f Flow, forceAdd bool) { // WalkFlows calls f with all active flows and flows that have come and gone // since the last call to WalkFlows -func (c *Conntracker) WalkFlows(f func(Flow)) { +func (c *conntracker) WalkFlows(f func(Flow)) { c.Lock() defer c.Unlock() for _, flow := range c.activeFlows { diff --git a/probe/endpoint/conntrack_test.go b/probe/endpoint/conntrack_test.go index 6b5b03f094..16b8f8e6eb 100644 --- a/probe/endpoint/conntrack_test.go +++ b/probe/endpoint/conntrack_test.go @@ -13,54 +13,62 @@ import ( testExec "github.com/weaveworks/scope/test/exec" ) -func makeFlow(id int64, srcIP, dstIP string, srcPort, dstPort int, ty, state string) Flow { +func makeFlow(ty string) Flow { return Flow{ XMLName: xml.Name{ Local: "flow", }, Type: ty, - Metas: []Meta{ - { - XMLName: xml.Name{ - Local: "meta", - }, - Direction: "original", - Layer3: Layer3{ - XMLName: xml.Name{ - Local: "layer3", - }, - SrcIP: srcIP, - DstIP: dstIP, - }, - Layer4: Layer4{ - XMLName: xml.Name{ - Local: "layer4", - }, - SrcPort: srcPort, - DstPort: dstPort, - Proto: TCP, - }, + } +} + +func addMeta(f *Flow, dir, srcIP, dstIP string, srcPort, dstPort int) *Meta { + meta := Meta{ + XMLName: xml.Name{ + Local: "meta", + }, + Direction: dir, + Layer3: Layer3{ + XMLName: xml.Name{ + Local: "layer3", + }, + SrcIP: srcIP, + DstIP: dstIP, + }, + Layer4: Layer4{ + XMLName: xml.Name{ + Local: "layer4", }, - { - XMLName: xml.Name{ - Local: "meta", - }, - Direction: "independent", - ID: id, - State: state, - Layer3: Layer3{ - XMLName: xml.Name{ - Local: "layer3", - }, - }, - Layer4: Layer4{ - XMLName: xml.Name{ - Local: "layer4", - }, - }, + SrcPort: srcPort, + DstPort: dstPort, + Proto: TCP, + }, + } + f.Metas = append(f.Metas, meta) + return &meta +} + +func addIndependant(f *Flow, id int64, state string) *Meta { + meta := Meta{ + XMLName: xml.Name{ + Local: "meta", + }, + Direction: "independent", + ID: id, + State: state, + Layer3: Layer3{ + XMLName: xml.Name{ + Local: "layer3", + }, + }, + Layer4: Layer4{ + XMLName: xml.Name{ + Local: "layer4", }, }, } + f.Metas = append(f.Metas, meta) + return &meta } func TestConntracker(t *testing.T) { @@ -121,7 +129,9 @@ func TestConntracker(t *testing.T) { } } - flow1 := makeFlow(1, "1.2.3.4", "2.3.4.5", 2, 3, New, "") + flow1 := makeFlow(New) + addMeta(&flow1, "original", "1.2.3.4", "2.3.4.5", 2, 3) + addIndependant(&flow1, 1, "") writeFlow(flow1) test.Poll(t, ts, []Flow{flow1}, have) diff --git a/probe/endpoint/nat.go b/probe/endpoint/nat.go index d3531f4388..fe99271559 100644 --- a/probe/endpoint/nat.go +++ b/probe/endpoint/nat.go @@ -16,16 +16,14 @@ type endpointMapping struct { rewrittenPort int } -type natmapper struct { - *Conntracker +// NATMapper rewrites a report to deal with NAT's connections +type NATMapper struct { + Conntracker } -func newNATMapper() (*natmapper, error) { - ct, err := NewConntracker(true, "--any-nat") - if err != nil { - return nil, err - } - return &natmapper{ct}, nil +// NewNATMapper is exposed for testing +func NewNATMapper(ct Conntracker) NATMapper { + return NATMapper{ct} } func toMapping(f Flow) *endpointMapping { @@ -49,9 +47,9 @@ func toMapping(f Flow) *endpointMapping { return &mapping } -// applyNAT duplicates Nodes in the endpoint topology of a +// ApplyNAT duplicates Nodes in the endpoint topology of a // report, based on the NAT table as returns by natTable. -func (n *natmapper) applyNAT(rpt report.Report, scope string) { +func (n NATMapper) ApplyNAT(rpt report.Report, scope string) { n.WalkFlows(func(f Flow) { var ( mapping = toMapping(f) diff --git a/probe/endpoint/nat_test.go b/probe/endpoint/nat_test.go new file mode 100644 index 0000000000..65b2c16ea4 --- /dev/null +++ b/probe/endpoint/nat_test.go @@ -0,0 +1,97 @@ +package endpoint_test + +import ( + "reflect" + "testing" + + "github.com/weaveworks/scope/probe/endpoint" + "github.com/weaveworks/scope/report" + "github.com/weaveworks/scope/test" +) + +type mockConntracker struct { + flows []endpoint.Flow +} + +func (m *mockConntracker) WalkFlows(f func(endpoint.Flow)) { + for _, flow := range m.flows { + f(flow) + } +} + +func (m *mockConntracker) Stop() {} + +func TestNat(t *testing.T) { + // test that two containers, on the docker network, get their connections mapped + // correctly. + // the setup is this: + // + // container2 (10.0.47.2:222222), host2 (2.3.4.5:22223) -> + // host1 (1.2.3.4:80), container1 (10.0.47.2:80) + + // from the PoV of host1 + { + flow := makeFlow("") + addIndependant(&flow, 1, "") + flow.Original = addMeta(&flow, "original", "2.3.4.5", "1.2.3.4", 222222, 80) + flow.Reply = addMeta(&flow, "reply", "10.0.47.1", "2.3.4.5", 80, 222222) + ct := &mockConntracker{ + flows: []endpoint.Flow{flow}, + } + + have := report.MakeReport() + originalID := report.MakeEndpointNodeID("host1", "10.0.47.1", "80") + have.Endpoint.AddNode(originalID, report.MakeNodeWith(report.Metadata{ + endpoint.Addr: "10.0.47.1", + endpoint.Port: "80", + "foo": "bar", + })) + + want := have.Copy() + want.Endpoint.AddNode(report.MakeEndpointNodeID("host1", "1.2.3.4", "80"), report.MakeNodeWith(report.Metadata{ + endpoint.Addr: "1.2.3.4", + endpoint.Port: "80", + "copy_of": originalID, + "foo": "bar", + })) + + natmapper := endpoint.NewNATMapper(ct) + natmapper.ApplyNAT(have, "host1") + if !reflect.DeepEqual(want, have) { + t.Fatal(test.Diff(want, have)) + } + } + + // form the PoV of host2 + { + flow := makeFlow("") + addIndependant(&flow, 2, "") + flow.Original = addMeta(&flow, "original", "10.0.47.2", "1.2.3.4", 22222, 80) + flow.Reply = addMeta(&flow, "reply", "1.2.3.4", "2.3.4.5", 80, 22223) + ct := &mockConntracker{ + flows: []endpoint.Flow{flow}, + } + + have := report.MakeReport() + originalID := report.MakeEndpointNodeID("host2", "10.0.47.2", "22222") + have.Endpoint.AddNode(originalID, report.MakeNodeWith(report.Metadata{ + endpoint.Addr: "10.0.47.2", + endpoint.Port: "22222", + "foo": "baz", + })) + + want := have.Copy() + want.Endpoint.AddNode(report.MakeEndpointNodeID("host2", "2.3.4.5", "22223"), report.MakeNodeWith(report.Metadata{ + endpoint.Addr: "2.3.4.5", + endpoint.Port: "22223", + "copy_of": originalID, + "foo": "baz", + })) + + natmapper := endpoint.NewNATMapper(ct) + natmapper.ApplyNAT(have, "host1") + if !reflect.DeepEqual(want, have) { + t.Fatal(test.Diff(want, have)) + } + } +} diff --git a/probe/endpoint/reporter.go b/probe/endpoint/reporter.go index b95e40934e..b5b67d7fb8 100644 --- a/probe/endpoint/reporter.go +++ b/probe/endpoint/reporter.go @@ -26,8 +26,8 @@ type Reporter struct { hostName string includeProcesses bool includeNAT bool - conntracker *Conntracker - natmapper *natmapper + conntracker Conntracker + natmapper *NATMapper revResolver *ReverseResolver } @@ -51,8 +51,8 @@ var SpyDuration = prometheus.NewSummaryVec( func NewReporter(hostID, hostName string, includeProcesses bool, useConntrack bool) *Reporter { var ( conntrackModulePresent = ConntrackModulePresent() - conntracker *Conntracker - natmapper *natmapper + conntracker Conntracker + natmapper NATMapper err error ) if conntrackModulePresent && useConntrack { @@ -62,17 +62,18 @@ func NewReporter(hostID, hostName string, includeProcesses bool, useConntrack bo } } if conntrackModulePresent { - natmapper, err = newNATMapper() + ct, err := NewConntracker(true, "--any-nat") if err != nil { - log.Printf("Failed to start natMapper: %v", err) + log.Printf("Failed to start conntracker for natmapper: %v", err) } + natmapper = NewNATMapper(ct) } return &Reporter{ hostID: hostID, hostName: hostName, includeProcesses: includeProcesses, conntracker: conntracker, - natmapper: natmapper, + natmapper: &natmapper, revResolver: NewReverseResolver(), } } @@ -139,7 +140,7 @@ func (r *Reporter) Report() (report.Report, error) { } if r.natmapper != nil { - r.natmapper.applyNAT(rpt, r.hostID) + r.natmapper.ApplyNAT(rpt, r.hostID) } return rpt, nil @@ -165,7 +166,7 @@ func (r *Reporter) addConnection(rpt *report.Report, localAddr, remoteAddr strin // In case we have a reverse resolution for the IP, we can use it for // the name... if revRemoteName, err := r.revResolver.Get(remoteAddr); err == nil { - remoteNode = remoteNode.AddMetadata(map[string]string{ + remoteNode = remoteNode.WithMetadata(map[string]string{ "name": revRemoteName, }) } @@ -211,7 +212,7 @@ func (r *Reporter) addConnection(rpt *report.Report, localAddr, remoteAddr strin // In case we have a reverse resolution for the IP, we can use it for // the name... if revRemoteName, err := r.revResolver.Get(remoteAddr); err == nil { - remoteNode = remoteNode.AddMetadata(map[string]string{ + remoteNode = remoteNode.WithMetadata(map[string]string{ "name": revRemoteName, }) } diff --git a/render/expected/expected.go b/render/expected/expected.go index 74146e17bd..863a38c04b 100644 --- a/render/expected/expected.go +++ b/render/expected/expected.go @@ -13,12 +13,12 @@ var ( uncontainedServerID = render.MakePseudoNodeID(render.UncontainedID, test.ServerHostName) unknownPseudoNode1ID = render.MakePseudoNodeID("10.10.10.10", test.ServerIP, "80") unknownPseudoNode2ID = render.MakePseudoNodeID("10.10.10.11", test.ServerIP, "80") - unknownPseudoNode1 = func(adjacency report.IDList) render.RenderableNode { + unknownPseudoNode1 = func(adjacent string) render.RenderableNode { return render.RenderableNode{ ID: unknownPseudoNode1ID, LabelMajor: "10.10.10.10", Pseudo: true, - Node: report.MakeNode().WithAdjacency(adjacency), + Node: report.MakeNode().WithAdjacent(adjacent), EdgeMetadata: report.EdgeMetadata{ EgressPacketCount: newu64(70), EgressByteCount: newu64(700), @@ -29,12 +29,12 @@ var ( ), } } - unknownPseudoNode2 = func(adjacency report.IDList) render.RenderableNode { + unknownPseudoNode2 = func(adjacent string) render.RenderableNode { return render.RenderableNode{ ID: unknownPseudoNode2ID, LabelMajor: "10.10.10.11", Pseudo: true, - Node: report.MakeNode().WithAdjacency(adjacency), + Node: report.MakeNode().WithAdjacent(adjacent), EdgeMetadata: report.EdgeMetadata{ EgressPacketCount: newu64(50), EgressByteCount: newu64(500), @@ -44,12 +44,12 @@ var ( ), } } - theInternetNode = func(adjacency report.IDList) render.RenderableNode { + theInternetNode = func(adjacent string) render.RenderableNode { return render.RenderableNode{ ID: render.TheInternetID, LabelMajor: render.TheInternetMajor, Pseudo: true, - Node: report.MakeNode().WithAdjacency(adjacency), + Node: report.MakeNode().WithAdjacent(adjacent), EdgeMetadata: report.EdgeMetadata{ EgressPacketCount: newu64(60), EgressByteCount: newu64(600), @@ -131,9 +131,9 @@ var ( Node: report.MakeNode().WithAdjacent(render.TheInternetID), EdgeMetadata: report.EdgeMetadata{}, }, - unknownPseudoNode1ID: unknownPseudoNode1(report.MakeIDList(ServerProcessID)), - unknownPseudoNode2ID: unknownPseudoNode2(report.MakeIDList(ServerProcessID)), - render.TheInternetID: theInternetNode(report.MakeIDList(ServerProcessID)), + unknownPseudoNode1ID: unknownPseudoNode1(ServerProcessID), + unknownPseudoNode2ID: unknownPseudoNode2(ServerProcessID), + render.TheInternetID: theInternetNode(ServerProcessID), }).Prune() RenderedProcessNames = (render.RenderableNodes{ @@ -187,9 +187,9 @@ var ( Node: report.MakeNode().WithAdjacent(render.TheInternetID), EdgeMetadata: report.EdgeMetadata{}, }, - unknownPseudoNode1ID: unknownPseudoNode1(report.MakeIDList("apache")), - unknownPseudoNode2ID: unknownPseudoNode2(report.MakeIDList("apache")), - render.TheInternetID: theInternetNode(report.MakeIDList("apache")), + unknownPseudoNode1ID: unknownPseudoNode1("apache"), + unknownPseudoNode2ID: unknownPseudoNode2("apache"), + render.TheInternetID: theInternetNode("apache"), }).Prune() RenderedContainers = (render.RenderableNodes{ @@ -247,7 +247,7 @@ var ( Node: report.MakeNode().WithAdjacent(render.TheInternetID), EdgeMetadata: report.EdgeMetadata{}, }, - render.TheInternetID: theInternetNode(report.MakeIDList(test.ServerContainerID)), + render.TheInternetID: theInternetNode(test.ServerContainerID), }).Prune() RenderedContainerImages = (render.RenderableNodes{ @@ -304,7 +304,7 @@ var ( Node: report.MakeNode().WithAdjacent(render.TheInternetID), EdgeMetadata: report.EdgeMetadata{}, }, - render.TheInternetID: theInternetNode(report.MakeIDList(test.ServerContainerImageName)), + render.TheInternetID: theInternetNode(test.ServerContainerImageName), }).Prune() ServerHostRenderedID = render.MakeHostID(test.ServerHostID) diff --git a/render/renderable_node_test.go b/render/renderable_node_test.go index 2401a3a732..8faf26b418 100644 --- a/render/renderable_node_test.go +++ b/render/renderable_node_test.go @@ -54,7 +54,7 @@ func TestMergeRenderableNode(t *testing.T) { LabelMinor: "minor", Rank: "rank", Pseudo: false, - Node: report.MakeNode().WithAdjacency(report.MakeIDList("a1", "a2")), + Node: report.MakeNode().WithAdjacent("a1").WithAdjacent("a2"), Origins: report.MakeIDList("o1", "o2"), EdgeMetadata: report.EdgeMetadata{}, } diff --git a/report/merge_test.go b/report/merge_test.go index fe24d36354..a97585887c 100644 --- a/report/merge_test.go +++ b/report/merge_test.go @@ -216,6 +216,30 @@ func TestMergeNodes(t *testing.T) { }), }, }, + "Counters": { + a: report.Nodes{ + "1": report.MakeNode().WithCounters(map[string]int{ + "a": 13, + "b": 57, + "c": 89, + }), + }, + b: report.Nodes{ + "1": report.MakeNode().WithCounters(map[string]int{ + "a": 78, + "b": 3, + "d": 47, + }), + }, + want: report.Nodes{ + "1": report.MakeNode().WithCounters(map[string]int{ + "a": 91, + "b": 60, + "c": 89, + "d": 47, + }), + }, + }, } { if have := c.a.Merge(c.b); !reflect.DeepEqual(c.want, have) { t.Errorf("%s: want\n\t%#v, have\n\t%#v", name, c.want, have) diff --git a/report/report_test.go b/report/report_test.go index 1e83b96b65..96be84e29b 100644 --- a/report/report_test.go +++ b/report/report_test.go @@ -25,3 +25,39 @@ func TestReportTopologies(t *testing.T) { t.Errorf("want %d, have %d", want, have) } } + +func TestNode(t *testing.T) { + { + node := report.MakeNode().WithMetadata(report.Metadata{ + "foo": "bar", + }) + if node.Metadata["foo"] != "bar" { + t.Errorf("want foo, have %s", node.Metadata["foo"]) + } + } + { + node := report.MakeNode().WithCounters(report.Counters{ + "foo": 1, + }) + if node.Counters["foo"] != 1 { + t.Errorf("want foo, have %d", node.Counters["foo"]) + } + } + { + node := report.MakeNode().WithAdjacent("foo") + if node.Adjacency[0] != "foo" { + t.Errorf("want foo, have %v", node.Adjacency) + } + } + { + node := report.MakeNode().WithEdge("foo", report.EdgeMetadata{ + EgressPacketCount: newu64(13), + }) + if node.Adjacency[0] != "foo" { + t.Errorf("want foo, have %v", node.Adjacency) + } + if *node.Edges["foo"].EgressPacketCount != 13 { + t.Errorf("want 13, have %v", node.Edges) + } + } +} diff --git a/report/topology.go b/report/topology.go index 01c7e77127..9356a986f8 100644 --- a/report/topology.go +++ b/report/topology.go @@ -106,24 +106,10 @@ func (n Node) WithMetadata(m map[string]string) Node { return result } -// AddMetadata returns a fresh copy of n, with Metadata set to the merge of n -// and the metadata provided. -func (n Node) AddMetadata(m map[string]string) Node { - additional := MakeNodeWith(m) - return n.Merge(additional) -} - -// WithCounters returns a fresh copy of n, with Counters set to c. +// WithCounters returns a fresh copy of n, with Counters c merged in. func (n Node) WithCounters(c map[string]int) Node { result := n.Copy() - result.Counters = c - return result -} - -// WithAdjacency returns a fresh copy of n, with Adjacency set to a. -func (n Node) WithAdjacency(a IDList) Node { - result := n.Copy() - result.Adjacency = a + result.Counters = result.Counters.Merge(c) return result } diff --git a/test/report_fixture.go b/test/report_fixture.go index 986643ed55..5a9de4b636 100644 --- a/test/report_fixture.go +++ b/test/report_fixture.go @@ -253,19 +253,19 @@ var ( UnknownAddress1NodeID: report.MakeNode().WithMetadata(map[string]string{ endpoint.Addr: UnknownClient1IP, - }).WithAdjacency(report.MakeIDList(ServerAddressNodeID)), + }).WithAdjacent(ServerAddressNodeID), UnknownAddress2NodeID: report.MakeNode().WithMetadata(map[string]string{ endpoint.Addr: UnknownClient2IP, - }).WithAdjacency(report.MakeIDList(ServerAddressNodeID)), + }).WithAdjacent(ServerAddressNodeID), UnknownAddress3NodeID: report.MakeNode().WithMetadata(map[string]string{ endpoint.Addr: UnknownClient3IP, - }).WithAdjacency(report.MakeIDList(ServerAddressNodeID)), + }).WithAdjacent(ServerAddressNodeID), RandomAddressNodeID: report.MakeNode().WithMetadata(map[string]string{ endpoint.Addr: RandomClientIP, - }).WithAdjacency(report.MakeIDList(ServerAddressNodeID)), + }).WithAdjacent(ServerAddressNodeID), }, }, Host: report.Topology{