Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 59 additions & 0 deletions common/ip_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type K8sIPResolver struct {
stopSignal chan bool
shouldResolveDns bool
dnsResolvedIps *lrucache.Cache[string, string]
podIpsMap sync.Map
}

type Workload struct {
Expand All @@ -81,11 +82,40 @@ func NewK8sIPResolver(clientset kubernetes.Interface, resolveDns bool) (*K8sIPRe
stopSignal: make(chan bool),
shouldResolveDns: resolveDns,
dnsResolvedIps: dnsCache,
podIpsMap: sync.Map{},
}, nil
}

// ResolveActualIP resolves ip against the resolver's pod IP cache and returns
// the cached Workload when one is stored for that address.
//
// When no pod entry is cached, the result is a Workload whose Namespace and
// Kind are both "external". Its Name is the reverse-DNS host name when DNS
// resolution is enabled and a name is available, and the IP itself otherwise.
func (resolver *K8sIPResolver) ResolveActualIP(ip string) Workload {
	if cached, found := resolver.podIpsMap.Load(ip); found {
		if podWorkload, isWorkload := cached.(Workload); isWorkload {
			return podWorkload
		}
	}

	external := Workload{
		Name:      ip,
		Namespace: "external",
		Kind:      "external",
	}
	if !resolver.shouldResolveDns {
		return external
	}

	if cachedHost, hit := resolver.dnsResolvedIps.Get(ip); hit {
		external.Name = cachedHost
		return external
	}

	// The outcome is cached even when the lookup fails (the IP itself is
	// stored), so the same address is not looked up again while cached.
	if names, err := net.LookupAddr(ip); err == nil && len(names) > 0 {
		external.Name = names[0]
	}
	resolver.dnsResolvedIps.Add(ip, external.Name)
	return external
}

func (resolver *K8sIPResolver) ResolveIP(ip string) Workload {
if val, ok := resolver.ipsMap.Load(ip); ok {
entry, ok := val.(Workload)
Expand All @@ -94,6 +124,13 @@ func (resolver *K8sIPResolver) ResolveIP(ip string) Workload {
}
log.Printf("type confusion in ipsMap")
}

if val, ok := resolver.podIpsMap.Load(ip); ok {
entry, ok := val.(Workload)
if ok {
return entry
}
}
host := ip

if resolver.shouldResolveDns {
Expand Down Expand Up @@ -334,6 +371,12 @@ func (resolver *K8sIPResolver) handlePodWatchEvent(podEvent *watch.Event) {
entry := resolver.resolvePodDescriptor(pod)
for _, podIp := range pod.Status.PodIPs {
resolver.storeWorkloadsIP(podIp.IP, &entry)
podWorkload := Workload{
Name: pod.Name,
Namespace: pod.Namespace,
Kind: "pod",
}
resolver.storePodsIP(podIp.IP, &podWorkload)
}
case watch.Modified:
pod, ok := podEvent.Object.(*v1.Pod)
Expand All @@ -344,6 +387,12 @@ func (resolver *K8sIPResolver) handlePodWatchEvent(podEvent *watch.Event) {
entry := resolver.resolvePodDescriptor(pod)
for _, podIp := range pod.Status.PodIPs {
resolver.storeWorkloadsIP(podIp.IP, &entry)
podWorkload := Workload{
Name: pod.Name,
Namespace: pod.Namespace,
Kind: "pod",
}
resolver.storePodsIP(podIp.IP, &podWorkload)
}
case watch.Deleted:
if val, ok := podEvent.Object.(*v1.Pod); ok {
Expand Down Expand Up @@ -627,6 +676,12 @@ func (resolver *K8sIPResolver) updateIpMapping() {
for _, podIp := range pod.Status.PodIPs {
// if ip is already in the map, override only if current pod is running
resolver.storeWorkloadsIP(podIp.IP, &entry)
podWorkload := Workload{
Name: pod.Name,
Namespace: pod.Namespace,
Kind: "pod",
}
resolver.storePodsIP(podIp.IP, &podWorkload)
}
return true
})
Expand Down Expand Up @@ -663,6 +718,10 @@ func (resolver *K8sIPResolver) storeWorkloadsIP(ip string, newWorkload *Workload
resolver.ipsMap.Store(ip, *newWorkload)
}

// storePodsIP stores a copy of the Workload value pointed to by newWorkload
// in podIpsMap under the key ip, overwriting any entry already stored there.
// newWorkload must not be nil.
func (resolver *K8sIPResolver) storePodsIP(ip string, newWorkload *Workload) {
	podEntry := *newWorkload
	resolver.podIpsMap.Store(ip, podEntry)
}

// An ugly function that goes up one level in the ownership hierarchy; there may be a better way to do it.
// The snapshot is maintained to avoid issuing an API request for each resolution.
func (resolver *K8sIPResolver) getControllerOfOwner(originalOwner *metav1.OwnerReference) (*metav1.OwnerReference, error) {
Expand Down
92 changes: 59 additions & 33 deletions containers/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,21 +63,23 @@ func (p *LogParser) Stop() {
}

// AddrPair identifies a connection by two endpoints together with the
// workloads resolved for them. It is used as a map key (for example by the
// connection counters and by L7Stats), so every field must stay comparable.
//
// Several callers store the requested destination in src and the actual
// destination in dst, rather than a literal source/destination pair.
type AddrPair struct {
	src netaddr.IPPort
	dst netaddr.IPPort
	// srcWorkload and dstWorkload are the workloads resolved for the two
	// endpoints; they may be left as zero values by callers that resolve later.
	srcWorkload common.Workload
	dstWorkload common.Workload
	// actualDestWorkload is the workload resolved for the actual destination.
	actualDestWorkload common.Workload
}

type ActiveConnection struct {
Dest netaddr.IPPort
ActualDest netaddr.IPPort
srcWorkload common.Workload
dstWorkload common.Workload
Pid uint32
Fd uint64
Timestamp uint64
Closed time.Time
Dest netaddr.IPPort
ActualDest netaddr.IPPort
srcWorkload common.Workload
dstWorkload common.Workload
actialDestWorkload common.Workload
Pid uint32
Fd uint64
Timestamp uint64
Closed time.Time

http2Parser *l7.Http2Parser
postgresParser *l7.PostgresParser
Expand Down Expand Up @@ -292,29 +294,49 @@ func (c *Container) Collect(ch chan<- prometheus.Metric) {
}

for d, count := range c.connectsSuccessful {
ch <- counter(metrics.NetConnectsSuccessful, float64(count), d.src.String(), d.dst.String())
workload_src := d.srcWorkload
workload_dest := d.dstWorkload
actualDestWorkload := d.actualDestWorkload
if d.srcWorkload.Name == "" || len(d.srcWorkload.Name) == 0 {
workload_src = c.ip_resolver.ResolveIP(d.src.IP().String())
workload_dest = c.ip_resolver.ResolveIP(d.dst.IP().String())
}
ch <- counter(metrics.NetConnectsSuccessful, float64(count), d.src.String(), d.dst.String(), workload_src.Name, workload_src.Namespace, workload_src.Kind, workload_dest.Name, workload_dest.Namespace, workload_dest.Kind, actualDestWorkload.Name, actualDestWorkload.Namespace, actualDestWorkload.Kind)
}
for dst, count := range c.connectsFailed {
ch <- counter(metrics.NetConnectsFailed, float64(count), dst.String())
workload := c.ip_resolver.ResolveIP(dst.IP().String())
ch <- counter(metrics.NetConnectsFailed, float64(count), dst.String(), workload.Name, workload.Namespace, workload.Kind, workload.Name, workload.Namespace, workload.Kind)
}
for d, count := range c.retransmits {
ch <- counter(metrics.NetRetransmits, float64(count), d.src.String(), d.dst.String())
workload_src := d.srcWorkload
workload_dest := d.dstWorkload

if d.srcWorkload.Name == "" || len(d.srcWorkload.Name) == 0 {
workload_src = c.ip_resolver.ResolveIP(d.src.IP().String())
workload_dest = c.ip_resolver.ResolveIP(d.dst.IP().String())
}
ch <- counter(metrics.NetRetransmits, float64(count), d.src.String(), d.dst.String(), workload_src.Name, workload_src.Namespace, workload_src.Kind, workload_dest.Name, workload_dest.Namespace, workload_dest.Kind)
}

connections := map[AddrPair]int{}
for addrPair, conn := range c.connectionsActive {
if !conn.Closed.IsZero() {
continue
}
connections[AddrPair{src: addrPair.dst, dst: conn.ActualDest}]++
workload_src := c.ip_resolver.ResolveIP(addrPair.dst.IP().String())
workload_dest := c.ip_resolver.ResolveIP(conn.ActualDest.IP().String())
actualDestWorkload := c.ip_resolver.ResolveActualIP(conn.ActualDest.IP().String())
connections[AddrPair{src: addrPair.dst, dst: conn.ActualDest, srcWorkload: workload_src, dstWorkload: workload_dest, actualDestWorkload: actualDestWorkload}]++
}
for d, count := range connections {
ch <- gauge(metrics.NetConnectionsActive, float64(count), d.src.String(), d.dst.String())
ch <- gauge(metrics.NetConnectionsActive, float64(count), d.src.String(), d.dst.String(), d.srcWorkload.Name, d.srcWorkload.Namespace, d.srcWorkload.Kind, d.dstWorkload.Name, d.dstWorkload.Namespace, d.dstWorkload.Kind, d.actualDestWorkload.Name, d.actualDestWorkload.Namespace, d.actualDestWorkload.Kind)
}

for source, p := range c.logParsers {
for _, c := range p.parser.GetCounters() {
ch <- counter(metrics.LogMessages, float64(c.Messages), source, c.Level.String(), c.Hash, c.Sample)
if c.Level == logparser.LevelCritical || c.Level == logparser.LevelError {
ch <- counter(metrics.LogMessages, float64(c.Messages), source, c.Level.String(), c.Hash, c.Sample)
}
}
}

Expand Down Expand Up @@ -491,9 +513,6 @@ func (c *Container) onConnectionOpen(pid uint32, fd uint64, src, dst netaddr.IPP
if dst.IP().IsLoopback() && !p.isHostNs() {
return
}
srcWorkload := c.ip_resolver.ResolveIP(netaddr.IPPort(src).IP().String())
dstWorkload := c.ip_resolver.ResolveIP(netaddr.IPPort(dst).IP().String())

actualDst, err := c.getActualDestination(p, src, dst)
if err != nil {
if !common.IsNotExist(err) {
Expand All @@ -515,17 +534,22 @@ func (c *Container) onConnectionOpen(pid uint32, fd uint64, src, dst netaddr.IPP
if failed {
c.connectsFailed[dst]++
} else {
c.connectsSuccessful[AddrPair{src: dst, dst: *actualDst}]++
srcWorkload := c.ip_resolver.ResolveIP(src.IP().String())
dstWorkload := c.ip_resolver.ResolveIP(dst.IP().String())
actialDestWorkload := c.ip_resolver.ResolveActualIP(actualDst.IP().String())
c.connectsSuccessful[AddrPair{src: dst, dst: *actualDst, srcWorkload: srcWorkload,
dstWorkload: dstWorkload, actualDestWorkload: actialDestWorkload}]++
connection := &ActiveConnection{
Dest: dst,
ActualDest: *actualDst,
Pid: pid,
Fd: fd,
Timestamp: timestamp,
srcWorkload: srcWorkload,
dstWorkload: dstWorkload,
}
c.connectionsActive[AddrPair{src: src, dst: dst, srcWorkload: srcWorkload, dstWorkload: dstWorkload}] = connection
Dest: dst,
ActualDest: *actualDst,
Pid: pid,
Fd: fd,
Timestamp: timestamp,
srcWorkload: srcWorkload,
dstWorkload: dstWorkload,
actialDestWorkload: actialDestWorkload,
}
c.connectionsActive[AddrPair{src: src, dst: dst, srcWorkload: srcWorkload, dstWorkload: dstWorkload, actualDestWorkload: actialDestWorkload}] = connection
c.connectionsByPidFd[PidFd{Pid: pid, Fd: fd}] = connection
}
c.connectLastAttempt[dst] = time.Now()
Expand Down Expand Up @@ -584,7 +608,7 @@ func (c *Container) onL7Request(pid uint32, fd uint64, timestamp uint64, r *l7.R
return
}

stats := c.l7Stats.get(r.Protocol, conn.Dest, conn.ActualDest, r, conn.srcWorkload, conn.dstWorkload)
stats := c.l7Stats.get(r.Protocol, conn.Dest, conn.ActualDest, r, conn.srcWorkload, conn.dstWorkload, conn.actialDestWorkload)
trace := tracing.NewTrace(string(c.id), conn.ActualDest)
switch r.Protocol {
case l7.ProtocolHTTP:
Expand Down Expand Up @@ -646,7 +670,9 @@ func (c *Container) onRetransmit(srcDst AddrPair) bool {
if !ok {
return false
}
c.retransmits[AddrPair{src: srcDst.dst, dst: conn.ActualDest}]++
src_workload := c.ip_resolver.ResolveIP(srcDst.dst.IP().String())
dst_workload := c.ip_resolver.ResolveIP(conn.ActualDest.IP().String())
c.retransmits[AddrPair{src: srcDst.dst, dst: conn.ActualDest, srcWorkload: src_workload, dstWorkload: dst_workload}]++
return true
}

Expand Down
21 changes: 18 additions & 3 deletions containers/l7.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (m *L7Metrics) observe(status, method string, duration time.Duration) {

type L7Stats map[l7.Protocol]map[AddrPair]*L7Metrics // protocol -> dst:actual_dst -> metrics

func (s L7Stats) get(protocol l7.Protocol, destination, actualDestination netaddr.IPPort, r *l7.RequestData, srcWorkload common.Workload, dstWorkload common.Workload) *L7Metrics {
func (s L7Stats) get(protocol l7.Protocol, destination, actualDestination netaddr.IPPort, r *l7.RequestData, srcWorkload common.Workload, dstWorkload common.Workload, actualDstWorkload common.Workload) *L7Metrics {
if protocol == l7.ProtocolHTTP2 {
protocol = l7.ProtocolHTTP
}
Expand All @@ -46,12 +46,23 @@ func (s L7Stats) get(protocol l7.Protocol, destination, actualDestination netadd
protoStats = map[AddrPair]*L7Metrics{}
s[protocol] = protoStats
}
dest := AddrPair{src: destination, dst: actualDestination}
dest := AddrPair{src: destination, dst: actualDestination, srcWorkload: srcWorkload, dstWorkload: dstWorkload, actualDestWorkload: actualDstWorkload}
m := protoStats[dest]
if m == nil {
m = &L7Metrics{}
protoStats[dest] = m
constLabels := map[string]string{"destination": destination.String(), "actual_destination": actualDestination.String(), "dest_kind": dstWorkload.Kind, "dest_workload_name": dstWorkload.Name, "dest_workload_namespace": dstWorkload.Namespace, "src_kind": srcWorkload.Kind, "src_workload_name": srcWorkload.Name, "src_workload_namespace": srcWorkload.Namespace}
constLabels := map[string]string{"destination": destination.String(),
"actual_destination": actualDestination.String(),
"destination_workload_kind": dstWorkload.Kind,
"destination_workload_name": dstWorkload.Name,
"destination_workload_namespace": dstWorkload.Namespace,
"src_kind": srcWorkload.Kind,
"src_workload_name": srcWorkload.Name,
"src_workload_namespace": srcWorkload.Namespace,
"actual_destination_workload_kind": actualDstWorkload.Kind,
"actual_destination_workload_name": actualDstWorkload.Name,
"actual_destination_workload_namespace": actualDstWorkload.Namespace,
}
labels := []string{"status"}
switch protocol {
case l7.ProtocolRabbitmq, l7.ProtocolNats:
Expand All @@ -60,6 +71,10 @@ func (s L7Stats) get(protocol l7.Protocol, destination, actualDestination netadd
method, path := l7.ParseHttp(r.Payload)
constLabels["path"] = path
constLabels["method"] = method
hOpts := L7Latency[protocol]
m.Latency = prometheus.NewHistogram(
prometheus.HistogramOpts{Name: hOpts.Name, Help: hOpts.Help, ConstLabels: constLabels},
)
default:
hOpts := L7Latency[protocol]
m.Latency = prometheus.NewHistogram(
Expand Down
8 changes: 4 additions & 4 deletions containers/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,10 @@ var metrics = struct {
DiskWriteBytes: metric("container_resources_disk_written_bytes_total", "Total number of bytes written to the disk by the container", "mount_point", "device", "volume"),

NetListenInfo: metric("container_net_tcp_listen_info", "Listen address of the container", "listen_addr", "proxy"),
NetConnectsSuccessful: metric("container_net_tcp_successful_connects_total", "Total number of successful TCP connects", "destination", "actual_destination"),
NetConnectsFailed: metric("container_net_tcp_failed_connects_total", "Total number of failed TCP connects", "destination"),
NetConnectionsActive: metric("container_net_tcp_active_connections", "Number of active outbound connections used by the container", "destination", "actual_destination"),
NetRetransmits: metric("container_net_tcp_retransmits_total", "Total number of retransmitted TCP segments", "destination", "actual_destination"),
NetConnectsSuccessful: metric("container_net_tcp_successful_connects_total", "Total number of successful TCP connects", "destination", "actual_destination", "src_workload_name", "src_workload_namespace", "src_workload_kind", "destination_workload_name", "destination_workload_namespace", "destination_workload_kind", "actual_destination_workload_name", "actual_destination_workload_namespace", "actual_destination_workload_kind"),
NetConnectsFailed: metric("container_net_tcp_failed_connects_total", "Total number of failed TCP connects", "destination", "destination_workload_name", "destination_workload_namespace", "destination_workload_kind", "actual_destination_workload_name", "actual_destination_workload_namespace", "actual_destination_workload_kind"),
NetConnectionsActive: metric("container_net_tcp_active_connections", "Number of active outbound connections used by the container", "destination", "actual_destination", "src_workload_name", "src_workload_namespace", "src_workload_kind", "destination_workload_name", "destination_workload_namespace", "destination_workload_kind", "actual_destination_workload_name", "actual_destination_workload_namespace", "actual_destination_workload_kind"),
NetRetransmits: metric("container_net_tcp_retransmits_total", "Total number of retransmitted TCP segments", "destination", "actual_destination", "src_workload_name", "src_workload_namespace", "src_workload_kind", "destination_workload_name", "destination_workload_namespace", "destination_workload_kind", "actual_destination_workload_name", "actual_destination_workload_namespace", "actual_destination_workload_kind"),
NetLatency: metric("container_net_latency_seconds", "Round-trip time between the container and a remote IP", "destination_ip"),

LogMessages: metric("container_log_messages_total", "Number of messages grouped by the automatically extracted repeated pattern", "source", "level", "pattern_hash", "sample"),
Expand Down
1 change: 1 addition & 0 deletions containers/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type ProcessInfo struct {

// IPResolver is the resolver dependency used by the containers package to
// translate IP addresses into workloads.
type IPResolver interface {
	// ResolveIP returns the workload associated with ip.
	ResolveIP(ip string) common.Workload
	// ResolveActualIP returns the workload associated with the IP of an
	// actual destination.
	ResolveActualIP(ip string) common.Workload
	// StartWatching starts the resolver's watch and reports any startup error.
	// NOTE(review): what is watched is not visible from this file; confirm
	// against the implementation.
	StartWatching() error
	// StopWatching stops the watch started by StartWatching.
	StopWatching()
}
Expand Down