Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions containers/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ var (

func guessApplicationType(cmdline []byte) string {
parts := bytes.Split(cmdline, []byte{0})
if len(parts) == 0 || len(parts[0]) == 0 {
return ""
}
cmd := bytes.TrimSuffix(bytes.Fields(parts[0])[0], []byte{':'})
switch {
case bytes.HasSuffix(cmd, []byte("memcached")):
Expand All @@ -29,6 +32,8 @@ func guessApplicationType(cmdline []byte) string {
return "mongos"
case bytes.HasSuffix(cmd, []byte("mysqld")):
return "mysql"
case bytes.HasSuffix(cmd, []byte("mariadbd")):
return "mysql"
case bytes.Contains(cmdline, []byte("org.apache.zookeeper.server.quorum.QuorumPeerMain")):
return "zookeeper"
case bytes.HasSuffix(cmd, []byte("redis-server")):
Expand Down
155 changes: 110 additions & 45 deletions containers/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ type ActiveConnection struct {
Timestamp uint64
Closed time.Time

BytesSent uint64
BytesReceived uint64

http2Parser *l7.Http2Parser
postgresParser *l7.PostgresParser
mysqlParser *l7.MysqlParser
Expand All @@ -103,6 +106,14 @@ type PidFd struct {
Fd uint64
}

// ConnectionStats holds the aggregated counters stored per AddrPair key in
// Container.connectsSuccessful.
type ConnectionStats struct {
// Count is incremented once for every successfully opened connection.
Count uint64
// TotalTime accumulates the duration value passed along with each
// successful connection open.
TotalTime time.Duration
// Retransmissions is incremented for each retransmission attributed to
// an active connection.
Retransmissions uint64
// BytesSent accumulates the increase in a connection's sent-byte counter
// between consecutive traffic updates (only positive deltas are added).
BytesSent uint64
// BytesReceived accumulates the increase in a connection's received-byte
// counter between consecutive traffic updates (only positive deltas are added).
BytesReceived uint64
}

type Container struct {
id ContainerID
cgroup *cgroup.Cgroup
Expand All @@ -121,17 +132,17 @@ type Container struct {
listens map[netaddr.IPPort]map[uint32]*ListenDetails
ipsByNs map[string][]netaddr.IP

connectsSuccessful map[AddrPair]int64 // dst:actual_dst -> count
connectsFailed map[netaddr.IPPort]int64 // dst -> count
connectLastAttempt map[netaddr.IPPort]time.Time // dst -> time
connectsSuccessful map[AddrPair]*ConnectionStats // dst:actual_dst -> count
connectsFailed map[netaddr.IPPort]int64 // dst -> count
connectLastAttempt map[netaddr.IPPort]time.Time // dst -> time
connectionsActive map[AddrPair]*ActiveConnection
connectionsByPidFd map[PidFd]*ActiveConnection
retransmits map[AddrPair]int64 // dst:actual_dst -> count

l7Stats L7Stats
dnsStats *L7Metrics

oomKills int
oomKills int
pythonThreadLockWaitTime time.Duration

mounts map[string]proc.MountInfo

Expand All @@ -141,14 +152,16 @@ type Container struct {
nsConntrack *Conntrack
lbConntracks []*Conntrack

registry *Registry

lock sync.RWMutex

done chan struct{}
ip_resolver IPResolver
hostIpsMap sync.Map
}

func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, hostConntrack *Conntrack, pid uint32, ip_resolver IPResolver) (*Container, error) {
func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, hostConntrack *Conntrack, pid uint32, registry *Registry) (*Container, error) {
netNs, err := proc.GetNetNs(pid)
if err != nil {
return nil, err
Expand All @@ -166,12 +179,11 @@ func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, host
listens: map[netaddr.IPPort]map[uint32]*ListenDetails{},
ipsByNs: map[string][]netaddr.IP{},

connectsSuccessful: map[AddrPair]int64{},
connectsSuccessful: map[AddrPair]*ConnectionStats{},
connectsFailed: map[netaddr.IPPort]int64{},
connectLastAttempt: map[netaddr.IPPort]time.Time{},
connectionsActive: map[AddrPair]*ActiveConnection{},
connectionsByPidFd: map[PidFd]*ActiveConnection{},
retransmits: map[AddrPair]int64{},
l7Stats: L7Stats{},
dnsStats: &L7Metrics{},

Expand All @@ -182,8 +194,9 @@ func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, host
hostConntrack: hostConntrack,

done: make(chan struct{}),
ip_resolver: ip_resolver,
ip_resolver: registry.ip_resolver,
hostIpsMap: sync.Map{},
registry: registry,
}

for _, n := range md.networks {
Expand Down Expand Up @@ -238,6 +251,8 @@ func (c *Container) Describe(ch chan<- *prometheus.Desc) {
}

func (c *Container) Collect(ch chan<- prometheus.Metric) {
c.registry.updateTrafficStatsIfNecessary()

c.lock.RLock()
defer c.lock.RUnlock()

Expand Down Expand Up @@ -304,7 +319,7 @@ func (c *Container) Collect(ch chan<- prometheus.Metric) {
}
}

for d, count := range c.connectsSuccessful {
for d, stats := range c.connectsSuccessful {
workload_src := d.srcWorkload
workload_dest := d.dstWorkload
actualDestWorkload := d.actualDestWorkload
Expand All @@ -318,26 +333,17 @@ func (c *Container) Collect(ch chan<- prometheus.Metric) {
workload_dest = common.Workload{Name: host.(string), Namespace: "external", Kind: "external"}
}
}
ch <- counter(metrics.NetConnectsSuccessful, float64(count), d.src.String(), d.dst.String(), workload_src.Name, workload_src.Namespace, workload_src.Kind, workload_dest.Name, workload_dest.Namespace, workload_dest.Kind, actualDestWorkload.Name, actualDestWorkload.Namespace, actualDestWorkload.Kind)
ch <- counter(metrics.NetConnectionsSuccessful, float64(stats.Count), d.src.String(), d.dst.String(), workload_src.Name, workload_src.Namespace, workload_src.Kind, workload_dest.Name, workload_dest.Namespace, workload_dest.Kind, actualDestWorkload.Name, actualDestWorkload.Namespace, actualDestWorkload.Kind)
ch <- counter(metrics.NetConnectionsTotalTime, stats.TotalTime.Seconds(), d.src.String(), d.dst.String(), workload_src.Name, workload_src.Namespace, workload_src.Kind, workload_dest.Name, workload_dest.Namespace, workload_dest.Kind, actualDestWorkload.Name, actualDestWorkload.Namespace, actualDestWorkload.Kind)
if stats.Retransmissions > 0 {
ch <- counter(metrics.NetRetransmits, float64(stats.Retransmissions), d.src.String(), d.dst.String(), workload_src.Name, workload_src.Namespace, workload_src.Kind, workload_dest.Name, workload_dest.Namespace, workload_dest.Kind, actualDestWorkload.Name, actualDestWorkload.Namespace, actualDestWorkload.Kind)
}
ch <- counter(metrics.NetBytesSent, float64(stats.BytesSent), d.src.String(), d.dst.String(), workload_src.Name, workload_src.Namespace, workload_src.Kind, workload_dest.Name, workload_dest.Namespace, workload_dest.Kind, actualDestWorkload.Name, actualDestWorkload.Namespace, actualDestWorkload.Kind)
ch <- counter(metrics.NetBytesReceived, float64(stats.BytesReceived), d.src.String(), d.dst.String(), workload_src.Name, workload_src.Namespace, workload_src.Kind, workload_dest.Name, workload_dest.Namespace, workload_dest.Kind, actualDestWorkload.Name, actualDestWorkload.Namespace, actualDestWorkload.Kind)
}
for dst, count := range c.connectsFailed {
workload := c.ip_resolver.ResolveIP(dst.IP().String())
ch <- counter(metrics.NetConnectsFailed, float64(count), dst.String(), workload.Name, workload.Namespace, workload.Kind, workload.Name, workload.Namespace, workload.Kind)
}
for d, count := range c.retransmits {
workload_src := d.srcWorkload
workload_dest := d.dstWorkload

if d.srcWorkload.Name == "" || len(d.srcWorkload.Name) == 0 {
workload_src = c.ip_resolver.ResolveIP(d.src.IP().String())
workload_dest = c.ip_resolver.ResolveIP(d.dst.IP().String())
}
if d.dstWorkload.Kind == "external" {
if host, ok := c.hostIpsMap.Load(d.dst); ok {
workload_dest = common.Workload{Name: host.(string), Namespace: "external", Kind: "external"}
}
}
ch <- counter(metrics.NetRetransmits, float64(count), d.src.String(), d.dst.String(), workload_src.Name, workload_src.Namespace, workload_src.Kind, workload_dest.Name, workload_dest.Namespace, workload_dest.Kind)
ch <- counter(metrics.NetConnectionsFailed, float64(count), dst.String(), workload.Name, workload.Namespace, workload.Kind, workload.Name, workload.Namespace, workload.Kind)
}

connections := map[AddrPair]int{}
Expand Down Expand Up @@ -398,6 +404,10 @@ func (c *Container) Collect(ch chan<- prometheus.Metric) {
for appType := range appTypes {
ch <- gauge(metrics.ApplicationType, 1, appType)
}
if c.pythonThreadLockWaitTime > 0 {
ch <- counter(metrics.PythonThreadLockWaitTime, c.pythonThreadLockWaitTime.Seconds())
}

if c.dnsStats.Requests != nil {
c.dnsStats.Requests.Collect(ch)
}
Expand All @@ -408,7 +418,8 @@ func (c *Container) Collect(ch chan<- prometheus.Metric) {

if !*flags.DisablePinger {
for ip, rtt := range c.ping() {
ch <- gauge(metrics.NetLatency, rtt, ip.String())
destination_workload := c.ip_resolver.ResolveIP(ip.String())
ch <- gauge(metrics.NetLatency, rtt, ip.String(), destination_workload.Name, destination_workload.Namespace, destination_workload.Kind)
}
}
}
Expand All @@ -421,7 +432,8 @@ func (c *Container) onProcessStart(pid uint32) *Process {
return nil
}
c.zombieAt = time.Time{}
p := NewProcess(pid, stats)
p := NewProcess(pid, stats, c.registry.tracer)

if p == nil {
return nil
}
Expand Down Expand Up @@ -553,7 +565,7 @@ func ignoreControlPlane(name string) bool {
return false
}

func (c *Container) onConnectionOpen(pid uint32, fd uint64, src, dst netaddr.IPPort, timestamp uint64, failed bool) {
func (c *Container) onConnectionOpen(pid uint32, fd uint64, src, dst netaddr.IPPort, timestamp uint64, failed bool, duration time.Duration) {
if common.PortFilter.ShouldBeSkipped(dst.Port()) {
return
}
Expand Down Expand Up @@ -595,8 +607,15 @@ func (c *Container) onConnectionOpen(pid uint32, fd uint64, src, dst netaddr.IPP
c.connectsFailed[dst]++
} else {
actualDestWorkload := c.ip_resolver.ResolveActualIP(actualDst.IP().String())
c.connectsSuccessful[AddrPair{src: dst, dst: *actualDst, srcWorkload: srcWorkload,
dstWorkload: dstWorkload, actualDestWorkload: actualDestWorkload}]++
key := AddrPair{src: dst, dst: *actualDst, srcWorkload: srcWorkload,
dstWorkload: dstWorkload, actualDestWorkload: actualDestWorkload}
stats := c.connectsSuccessful[key]
if stats == nil {
stats = &ConnectionStats{}
c.connectsSuccessful[key] = stats
}
stats.Count++
stats.TotalTime += duration
connection := &ActiveConnection{
Dest: dst,
ActualDest: *actualDst,
Expand Down Expand Up @@ -643,15 +662,60 @@ func (c *Container) getActualDestination(p *Process, src, dst netaddr.IPPort) (*
return nil, nil
}

func (c *Container) onConnectionClose(srcDst AddrPair) bool {
// onConnectionClose processes a connection-close event for the address pair
// carried by e. It looks the connection up in connectionsActive and, if it is
// present and not yet marked closed, folds the final traffic counters into the
// per-destination stats and stamps the close time.
//
// The final counters come from the tracer (looked up by the connection's own
// pid/fd) when the event carries neither a pid nor an fd; otherwise they come
// from e.TrafficStats when that is set.
//
// It returns whether connectionsActive contained an entry for the pair.
//
// NOTE(review): conn.Closed is read and written after c.lock has been
// released — confirm that no other goroutine accesses it concurrently.
func (c *Container) onConnectionClose(e ebpftracer.Event) bool {
	key := AddrPair{src: e.SrcAddr, dst: e.DstAddr}

	c.lock.Lock()
	conn, found := c.connectionsActive[key]
	c.lock.Unlock()

	// Nothing to finalize: unknown connection, or one already marked closed.
	if conn == nil || !conn.Closed.IsZero() {
		return found
	}

	switch {
	case e.Pid == 0 && e.Fd == 0:
		// The tracer lookup is performed without holding c.lock; the lock is
		// taken only around the stats update itself.
		if final, err := c.registry.tracer.GetAndDeleteTCPConnection(conn.Pid, conn.Fd); err != nil {
			klog.Warningln(c.id, conn.Pid, conn.Fd, conn.ActualDest, err)
		} else {
			c.lock.Lock()
			c.updateConnectionTrafficStats(conn, final.BytesSent, final.BytesReceived)
			c.lock.Unlock()
		}
	case e.TrafficStats != nil:
		c.lock.Lock()
		c.updateConnectionTrafficStats(conn, e.TrafficStats.BytesSent, e.TrafficStats.BytesReceived)
		c.lock.Unlock()
	}

	conn.Closed = time.Now()
	return found
}

func (c *Container) updateTrafficStats(u *TrafficStatsUpdate) {
if u == nil {
return
}
c.lock.Lock()
defer c.lock.Unlock()
conn := c.connectionsActive[srcDst]
if conn == nil {
return false
c.updateConnectionTrafficStats(c.connectionsByPidFd[PidFd{Pid: u.Pid, Fd: u.FD}], u.BytesSent, u.BytesReceived)
}

func (c *Container) updateConnectionTrafficStats(ac *ActiveConnection, sent, received uint64) {
if ac == nil {
return
}
conn.Closed = time.Now()
return true
key := AddrPair{src: ac.Dest, dst: ac.ActualDest}
stats := c.connectsSuccessful[key]
if stats == nil {
stats = &ConnectionStats{}
c.connectsSuccessful[key] = stats
}
if sent > ac.BytesSent {
stats.BytesSent += sent - ac.BytesSent
}
if received > ac.BytesReceived {
stats.BytesReceived += received - ac.BytesReceived
}
ac.BytesSent = sent
ac.BytesReceived = received
}

func (c *Container) onDNSRequest(r *l7.RequestData) map[netaddr.IP]string {
Expand Down Expand Up @@ -805,7 +869,7 @@ func (c *Container) onL7Request(pid uint32, fd uint64, timestamp uint64, r *l7.R
return nil
}

func (c *Container) onRetransmit(srcDst AddrPair) bool {
func (c *Container) onRetransmission(srcDst AddrPair) bool {
c.lock.Lock()
defer c.lock.Unlock()
conn, ok := c.connectionsActive[srcDst]
Expand All @@ -814,7 +878,13 @@ func (c *Container) onRetransmit(srcDst AddrPair) bool {
}
src_workload := c.ip_resolver.ResolveIP(srcDst.dst.IP().String())
dst_workload := c.ip_resolver.ResolveIP(conn.ActualDest.IP().String())
c.retransmits[AddrPair{src: srcDst.dst, dst: conn.ActualDest, srcWorkload: src_workload, dstWorkload: dst_workload}]++
key := AddrPair{src: srcDst.dst, dst: conn.ActualDest, srcWorkload: src_workload, dstWorkload: dst_workload}
stats := c.connectsSuccessful[key]
if stats == nil {
stats = &ConnectionStats{}
c.connectsSuccessful[key] = stats
}
stats.Retransmissions++
return true
}

Expand Down Expand Up @@ -1112,11 +1182,6 @@ func (c *Container) gc(now time.Time) {
delete(c.connectsSuccessful, d)
}
}
for d := range c.retransmits {
if d.src == dst {
delete(c.retransmits, d)
}
}
c.l7Stats.delete(dst)
}
}
Expand Down
Loading