Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
830 changes: 830 additions & 0 deletions common/ip_resolver.go

Large diffs are not rendered by default.

895 changes: 895 additions & 0 deletions common/ip_resolver_test.go

Large diffs are not rendered by default.

92 changes: 65 additions & 27 deletions containers/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,17 +63,23 @@ func (p *LogParser) Stop() {
}

type AddrPair struct {
src netaddr.IPPort
dst netaddr.IPPort
src netaddr.IPPort
dst netaddr.IPPort
srcWorkload common.Workload
dstWorkload common.Workload
actualDestWorkload common.Workload
}

type ActiveConnection struct {
Dest netaddr.IPPort
ActualDest netaddr.IPPort
Pid uint32
Fd uint64
Timestamp uint64
Closed time.Time
Dest netaddr.IPPort
ActualDest netaddr.IPPort
srcWorkload common.Workload
dstWorkload common.Workload
actialDestWorkload common.Workload
Pid uint32
Fd uint64
Timestamp uint64
Closed time.Time

http2Parser *l7.Http2Parser
postgresParser *l7.PostgresParser
Expand Down Expand Up @@ -129,10 +135,11 @@ type Container struct {

lock sync.RWMutex

done chan struct{}
done chan struct{}
ip_resolver IPResolver
}

func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, hostConntrack *Conntrack, pid uint32) (*Container, error) {
func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, hostConntrack *Conntrack, pid uint32, ip_resolver IPResolver) (*Container, error) {
netNs, err := proc.GetNetNs(pid)
if err != nil {
return nil, err
Expand Down Expand Up @@ -164,7 +171,8 @@ func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, host

hostConntrack: hostConntrack,

done: make(chan struct{}),
done: make(chan struct{}),
ip_resolver: ip_resolver,
}

for _, n := range md.networks {
Expand Down Expand Up @@ -286,29 +294,49 @@ func (c *Container) Collect(ch chan<- prometheus.Metric) {
}

for d, count := range c.connectsSuccessful {
ch <- counter(metrics.NetConnectsSuccessful, float64(count), d.src.String(), d.dst.String())
workload_src := d.srcWorkload
workload_dest := d.dstWorkload
actualDestWorkload := d.actualDestWorkload
if d.srcWorkload.Name == "" || len(d.srcWorkload.Name) == 0 {
workload_src = c.ip_resolver.ResolveIP(d.src.IP().String())
workload_dest = c.ip_resolver.ResolveIP(d.dst.IP().String())
}
ch <- counter(metrics.NetConnectsSuccessful, float64(count), d.src.String(), d.dst.String(), workload_src.Name, workload_src.Namespace, workload_src.Kind, workload_dest.Name, workload_dest.Namespace, workload_dest.Kind, actualDestWorkload.Name, actualDestWorkload.Namespace, actualDestWorkload.Kind)
}
for dst, count := range c.connectsFailed {
ch <- counter(metrics.NetConnectsFailed, float64(count), dst.String())
workload := c.ip_resolver.ResolveIP(dst.IP().String())
ch <- counter(metrics.NetConnectsFailed, float64(count), dst.String(), workload.Name, workload.Namespace, workload.Kind, workload.Name, workload.Namespace, workload.Kind)
}
for d, count := range c.retransmits {
ch <- counter(metrics.NetRetransmits, float64(count), d.src.String(), d.dst.String())
workload_src := d.srcWorkload
workload_dest := d.dstWorkload

if d.srcWorkload.Name == "" || len(d.srcWorkload.Name) == 0 {
workload_src = c.ip_resolver.ResolveIP(d.src.IP().String())
workload_dest = c.ip_resolver.ResolveIP(d.dst.IP().String())
}
ch <- counter(metrics.NetRetransmits, float64(count), d.src.String(), d.dst.String(), workload_src.Name, workload_src.Namespace, workload_src.Kind, workload_dest.Name, workload_dest.Namespace, workload_dest.Kind)
}

connections := map[AddrPair]int{}
for addrPair, conn := range c.connectionsActive {
if !conn.Closed.IsZero() {
continue
}
connections[AddrPair{src: addrPair.dst, dst: conn.ActualDest}]++
workload_src := c.ip_resolver.ResolveIP(addrPair.dst.IP().String())
workload_dest := c.ip_resolver.ResolveIP(conn.ActualDest.IP().String())
actualDestWorkload := c.ip_resolver.ResolveActualIP(conn.ActualDest.IP().String())
connections[AddrPair{src: addrPair.dst, dst: conn.ActualDest, srcWorkload: workload_src, dstWorkload: workload_dest, actualDestWorkload: actualDestWorkload}]++
}
for d, count := range connections {
ch <- gauge(metrics.NetConnectionsActive, float64(count), d.src.String(), d.dst.String())
ch <- gauge(metrics.NetConnectionsActive, float64(count), d.src.String(), d.dst.String(), d.srcWorkload.Name, d.srcWorkload.Namespace, d.srcWorkload.Kind, d.dstWorkload.Name, d.dstWorkload.Namespace, d.dstWorkload.Kind, d.actualDestWorkload.Name, d.actualDestWorkload.Namespace, d.actualDestWorkload.Kind)
}

for source, p := range c.logParsers {
for _, c := range p.parser.GetCounters() {
ch <- counter(metrics.LogMessages, float64(c.Messages), source, c.Level.String(), c.Hash, c.Sample)
if c.Level == logparser.LevelCritical || c.Level == logparser.LevelError {
ch <- counter(metrics.LogMessages, float64(c.Messages), source, c.Level.String(), c.Hash, c.Sample)
}
}
}

Expand Down Expand Up @@ -506,15 +534,22 @@ func (c *Container) onConnectionOpen(pid uint32, fd uint64, src, dst netaddr.IPP
if failed {
c.connectsFailed[dst]++
} else {
c.connectsSuccessful[AddrPair{src: dst, dst: *actualDst}]++
srcWorkload := c.ip_resolver.ResolveIP(src.IP().String())
dstWorkload := c.ip_resolver.ResolveIP(dst.IP().String())
actialDestWorkload := c.ip_resolver.ResolveActualIP(actualDst.IP().String())
c.connectsSuccessful[AddrPair{src: dst, dst: *actualDst, srcWorkload: srcWorkload,
dstWorkload: dstWorkload, actualDestWorkload: actialDestWorkload}]++
connection := &ActiveConnection{
Dest: dst,
ActualDest: *actualDst,
Pid: pid,
Fd: fd,
Timestamp: timestamp,
}
c.connectionsActive[AddrPair{src: src, dst: dst}] = connection
Dest: dst,
ActualDest: *actualDst,
Pid: pid,
Fd: fd,
Timestamp: timestamp,
srcWorkload: srcWorkload,
dstWorkload: dstWorkload,
actialDestWorkload: actialDestWorkload,
}
c.connectionsActive[AddrPair{src: src, dst: dst, srcWorkload: srcWorkload, dstWorkload: dstWorkload, actualDestWorkload: actialDestWorkload}] = connection
c.connectionsByPidFd[PidFd{Pid: pid, Fd: fd}] = connection
}
c.connectLastAttempt[dst] = time.Now()
Expand Down Expand Up @@ -572,7 +607,8 @@ func (c *Container) onL7Request(pid uint32, fd uint64, timestamp uint64, r *l7.R
if timestamp != 0 && conn.Timestamp != timestamp {
return
}
stats := c.l7Stats.get(r.Protocol, conn.Dest, conn.ActualDest, r)

stats := c.l7Stats.get(r.Protocol, conn.Dest, conn.ActualDest, r, conn.srcWorkload, conn.dstWorkload, conn.actialDestWorkload)
trace := tracing.NewTrace(string(c.id), conn.ActualDest)
switch r.Protocol {
case l7.ProtocolHTTP:
Expand Down Expand Up @@ -634,7 +670,9 @@ func (c *Container) onRetransmit(srcDst AddrPair) bool {
if !ok {
return false
}
c.retransmits[AddrPair{src: srcDst.dst, dst: conn.ActualDest}]++
src_workload := c.ip_resolver.ResolveIP(srcDst.dst.IP().String())
dst_workload := c.ip_resolver.ResolveIP(conn.ActualDest.IP().String())
c.retransmits[AddrPair{src: srcDst.dst, dst: conn.ActualDest, srcWorkload: src_workload, dstWorkload: dst_workload}]++
return true
}

Expand Down
22 changes: 19 additions & 3 deletions containers/l7.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package containers
import (
"time"

"github.com/coroot/coroot-node-agent/common"
"github.com/coroot/coroot-node-agent/ebpftracer/l7"
"github.com/prometheus/client_golang/prometheus"
"inet.af/netaddr"
Expand Down Expand Up @@ -36,7 +37,7 @@ func (m *L7Metrics) observe(status, method string, duration time.Duration) {

type L7Stats map[l7.Protocol]map[AddrPair]*L7Metrics // protocol -> dst:actual_dst -> metrics

func (s L7Stats) get(protocol l7.Protocol, destination, actualDestination netaddr.IPPort, r *l7.RequestData) *L7Metrics {
func (s L7Stats) get(protocol l7.Protocol, destination, actualDestination netaddr.IPPort, r *l7.RequestData, srcWorkload common.Workload, dstWorkload common.Workload, actualDstWorkload common.Workload) *L7Metrics {
if protocol == l7.ProtocolHTTP2 {
protocol = l7.ProtocolHTTP
}
Expand All @@ -45,12 +46,23 @@ func (s L7Stats) get(protocol l7.Protocol, destination, actualDestination netadd
protoStats = map[AddrPair]*L7Metrics{}
s[protocol] = protoStats
}
dest := AddrPair{src: destination, dst: actualDestination}
dest := AddrPair{src: destination, dst: actualDestination, srcWorkload: srcWorkload, dstWorkload: dstWorkload, actualDestWorkload: actualDstWorkload}
m := protoStats[dest]
if m == nil {
m = &L7Metrics{}
protoStats[dest] = m
constLabels := map[string]string{"destination": destination.String(), "actual_destination": actualDestination.String()}
constLabels := map[string]string{"destination": destination.String(),
"actual_destination": actualDestination.String(),
"destination_workload_kind": dstWorkload.Kind,
"destination_workload_name": dstWorkload.Name,
"destination_workload_namespace": dstWorkload.Namespace,
"src_kind": srcWorkload.Kind,
"src_workload_name": srcWorkload.Name,
"src_workload_namespace": srcWorkload.Namespace,
"actual_destination_workload_kind": actualDstWorkload.Kind,
"actual_destination_workload_name": actualDstWorkload.Name,
"actual_destination_workload_namespace": actualDstWorkload.Namespace,
}
labels := []string{"status"}
switch protocol {
case l7.ProtocolRabbitmq, l7.ProtocolNats:
Expand All @@ -59,6 +71,10 @@ func (s L7Stats) get(protocol l7.Protocol, destination, actualDestination netadd
method, path := l7.ParseHttp(r.Payload)
constLabels["path"] = path
constLabels["method"] = method
hOpts := L7Latency[protocol]
m.Latency = prometheus.NewHistogram(
prometheus.HistogramOpts{Name: hOpts.Name, Help: hOpts.Help, ConstLabels: constLabels},
)
default:
hOpts := L7Latency[protocol]
m.Latency = prometheus.NewHistogram(
Expand Down
8 changes: 4 additions & 4 deletions containers/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,10 @@ var metrics = struct {
DiskWriteBytes: metric("container_resources_disk_written_bytes_total", "Total number of bytes written to the disk by the container", "mount_point", "device", "volume"),

NetListenInfo: metric("container_net_tcp_listen_info", "Listen address of the container", "listen_addr", "proxy"),
NetConnectsSuccessful: metric("container_net_tcp_successful_connects_total", "Total number of successful TCP connects", "destination", "actual_destination"),
NetConnectsFailed: metric("container_net_tcp_failed_connects_total", "Total number of failed TCP connects", "destination"),
NetConnectionsActive: metric("container_net_tcp_active_connections", "Number of active outbound connections used by the container", "destination", "actual_destination"),
NetRetransmits: metric("container_net_tcp_retransmits_total", "Total number of retransmitted TCP segments", "destination", "actual_destination"),
NetConnectsSuccessful: metric("container_net_tcp_successful_connects_total", "Total number of successful TCP connects", "destination", "actual_destination", "src_workload_name", "src_workload_namespace", "src_workload_kind", "destination_workload_name", "destination_workload_namespace", "destination_workload_kind", "actual_destination_workload_name", "actual_destination_workload_namespace", "actual_destination_workload_kind"),
NetConnectsFailed: metric("container_net_tcp_failed_connects_total", "Total number of failed TCP connects", "destination", "destination_workload_name", "destination_workload_namespace", "destination_workload_kind", "actual_destination_workload_name", "actual_destination_workload_namespace", "actual_destination_workload_kind"),
NetConnectionsActive: metric("container_net_tcp_active_connections", "Number of active outbound connections used by the container", "destination", "actual_destination", "src_workload_name", "src_workload_namespace", "src_workload_kind", "destination_workload_name", "destination_workload_namespace", "destination_workload_kind", "actual_destination_workload_name", "actual_destination_workload_namespace", "actual_destination_workload_kind"),
NetRetransmits: metric("container_net_tcp_retransmits_total", "Total number of retransmitted TCP segments", "destination", "actual_destination", "src_workload_name", "src_workload_namespace", "src_workload_kind", "destination_workload_name", "destination_workload_namespace", "destination_workload_kind", "actual_destination_workload_name", "actual_destination_workload_namespace", "actual_destination_workload_kind"),
NetLatency: metric("container_net_latency_seconds", "Round-trip time between the container and a remote IP", "destination_ip"),

LogMessages: metric("container_log_messages_total", "Number of messages grouped by the automatically extracted repeated pattern", "source", "level", "pattern_hash", "sample"),
Expand Down
15 changes: 12 additions & 3 deletions containers/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,13 @@ type ProcessInfo struct {
StartedAt time.Time
}

type IPResolver interface {
ResolveIP(string) common.Workload
ResolveActualIP(string) common.Workload
StartWatching() error
StopWatching()
}

type Registry struct {
reg prometheus.Registerer

Expand All @@ -44,9 +51,10 @@ type Registry struct {
containersByPid map[uint32]*Container

processInfoCh chan<- ProcessInfo
ip_resolver IPResolver
}

func NewRegistry(reg prometheus.Registerer, kernelVersion string, processInfoCh chan<- ProcessInfo) (*Registry, error) {
func NewRegistry(reg prometheus.Registerer, kernelVersion string, processInfoCh chan<- ProcessInfo, ip_resolver *common.K8sIPResolver) (*Registry, error) {
ns, err := proc.GetSelfNetNs()
if err != nil {
return nil, err
Expand Down Expand Up @@ -100,7 +108,8 @@ func NewRegistry(reg prometheus.Registerer, kernelVersion string, processInfoCh

processInfoCh: processInfoCh,

tracer: ebpftracer.NewTracer(kernelVersion, *flags.DisableL7Tracing),
tracer: ebpftracer.NewTracer(kernelVersion, *flags.DisableL7Tracing),
ip_resolver: ip_resolver,
}

go r.handleEvents(r.events)
Expand Down Expand Up @@ -303,7 +312,7 @@ func (r *Registry) getOrCreateContainer(pid uint32) *Container {
r.containersByCgroupId[cg.Id] = c
return c
}
c, err := NewContainer(id, cg, md, r.hostConntrack, pid)
c, err := NewContainer(id, cg, md, r.hostConntrack, pid, r.ip_resolver)
if err != nil {
klog.Warningf("failed to create container pid=%d cg=%s id=%s: %s", pid, cg.Id, id, err)
return nil
Expand Down
3 changes: 2 additions & 1 deletion flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ var (
ApiKey = kingpin.Flag("api-key", "Coroot API key").Envar("API_KEY").String()
ScrapeInterval = kingpin.Flag("scrape-interval", "How often to gather metrics from the agent").Default("15s").Envar("SCRAPE_INTERVAL").Duration()

WalDir = kingpin.Flag("wal-dir", "Path to where the agent stores data (e.g. the metrics Write-Ahead Log)").Default("/tmp/coroot-node-agent").Envar("WAL_DIR").String()
WalDir = kingpin.Flag("wal-dir", "Path to where the agent stores data (e.g. the metrics Write-Ahead Log)").Default("/tmp/coroot-node-agent").Envar("WAL_DIR").String()
ResolveDns = kingpin.Flag("resolve-dns", "should resolve DNS").Default("true").Envar("RESOLVE_DNS").Bool()
)

func GetString(fl *string) string {
Expand Down
20 changes: 16 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ require (
github.com/docker/docker v25.0.0+incompatible
github.com/florianl/go-conntrack v0.3.0
github.com/go-kit/log v0.2.1
github.com/google/uuid v1.5.0
github.com/grafana/pyroscope/ebpf v0.4.1
github.com/hashicorp/golang-lru/v2 v2.0.5
github.com/jpillora/backoff v1.0.0
github.com/mdlayher/taskstats v0.0.0-20230712191918-387b3d561d14
github.com/opencontainers/runtime-spec v1.0.3-0.20210326190908-1c3f411f0417
Expand All @@ -40,6 +42,9 @@ require (
gopkg.in/alecthomas/kingpin.v2 v2.2.6
gopkg.in/yaml.v2 v2.4.0
inet.af/netaddr v0.0.0-20230525184311-b8eac61e914a
k8s.io/api v0.28.6
k8s.io/apimachinery v0.28.6
k8s.io/client-go v0.28.6
k8s.io/klog/v2 v2.120.1
)

Expand Down Expand Up @@ -73,6 +78,8 @@ require (
github.com/docker/go-connections v0.4.0 // indirect
github.com/docker/go-events v0.0.0-20190806004212-e31b211e4f1c // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
github.com/evanphx/json-patch v5.6.0+incompatible // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/go-logfmt/logfmt v0.6.0 // indirect
Expand All @@ -96,13 +103,13 @@ require (
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/gnostic-models v0.6.8 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/pprof v0.0.0-20240117000934-35fc243c5815 // indirect
github.com/google/uuid v1.5.0 // indirect
github.com/grafana/regexp v0.0.0-20221123153739-15dc172cd2db // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 // indirect
github.com/hashicorp/go-version v1.6.0 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.5 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/ianlancetaylor/demangle v0.0.0-20230524184225-eabc099b10ab // indirect
github.com/inconshreveable/mousetrap v1.0.1 // indirect
Expand All @@ -125,6 +132,7 @@ require (
github.com/moby/term v0.5.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect
github.com/oklog/ulid v1.3.1 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
Expand Down Expand Up @@ -169,18 +177,22 @@ require (
golang.org/x/exp v0.0.0-20240119083558-1b970713d09a // indirect
golang.org/x/oauth2 v0.16.0 // indirect
golang.org/x/sync v0.6.0 // indirect
golang.org/x/term v0.16.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/tools v0.17.0 // indirect
google.golang.org/appengine v1.6.8 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240116215550-a9fa1716bcac // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240116215550-a9fa1716bcac // indirect
google.golang.org/grpc v1.61.0 // indirect
google.golang.org/protobuf v1.33.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/apimachinery v0.28.6 // indirect
k8s.io/client-go v0.28.6 // indirect
k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00 // indirect
k8s.io/utils v0.0.0-20230726121419-3b25d923346b // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect
sigs.k8s.io/yaml v1.3.0 // indirect
)

replace (
Expand Down
Loading