diff --git a/common/ip_resolver.go b/common/ip_resolver.go index c7b1714d..3350235b 100644 --- a/common/ip_resolver.go +++ b/common/ip_resolver.go @@ -48,6 +48,12 @@ type clusterSnapshot struct { PodDescriptors sync.Map // map[types.UID]Workload } +type InstanceMeta struct { + Name string + Region string + Zone string +} + type K8sIPResolver struct { clientset kubernetes.Interface snapshot clusterSnapshot @@ -56,12 +62,16 @@ type K8sIPResolver struct { shouldResolveDns bool dnsResolvedIps *lrucache.Cache[string, string] podIpsMap sync.Map + nodeInfoMap sync.Map + hostIpsMap sync.Map } type Workload struct { Name string Namespace string Kind string + Zone string + Region string } func NewK8sIPResolver(clientset kubernetes.Interface, resolveDns bool) (*K8sIPResolver, error) { @@ -83,6 +93,8 @@ func NewK8sIPResolver(clientset kubernetes.Interface, resolveDns bool) (*K8sIPRe shouldResolveDns: resolveDns, dnsResolvedIps: dnsCache, podIpsMap: sync.Map{}, + nodeInfoMap: sync.Map{}, + hostIpsMap: sync.Map{}, }, nil } @@ -152,15 +164,16 @@ func (resolver *K8sIPResolver) ResolveIP(ip string) Workload { } } -func (resolver *K8sIPResolver) CacheDNS(ip string, dns string) Workload { - resolver.dnsResolvedIps.Add(ip, dns) - return Workload{ - Name: dns, - Namespace: "external", - Kind: "external", - } +func (resolver *K8sIPResolver) CacheDNS(ip string, dns string) { + resolver.hostIpsMap.Store(ip, dns) } +func (resolver *K8sIPResolver) ResolveHost(ip string) string { + if host, ok := resolver.hostIpsMap.Load(ip); ok { + return host.(string) + } + return "" +} func (resolver *K8sIPResolver) StartWatching() error { // register watchers podsWatcher, err := resolver.clientset.CoreV1().Pods("").Watch(context.Background(), metav1.ListOptions{}) @@ -380,10 +393,23 @@ func (resolver *K8sIPResolver) handlePodWatchEvent(podEvent *watch.Event) { entry := resolver.resolvePodDescriptor(pod) for _, podIp := range pod.Status.PodIPs { resolver.storeWorkloadsIP(podIp.IP, &entry) + nodeInfo, ok := resolver.nodeInfoMap.Load(pod.Spec.NodeName) + region, zone := "", "" + if ok { + meta, ok := nodeInfo.(InstanceMeta) + if ok { + region = meta.Region + zone = meta.Zone + } else { + log.Println("Failed to fetch node info ", ok) + } + } podWorkload := Workload{ Name: pod.Name, Namespace: pod.Namespace, Kind: "pod", + Region: region, + Zone: zone, } resolver.storePodsIP(podIp.IP, &podWorkload) } @@ -396,11 +422,25 @@ func (resolver *K8sIPResolver) handlePodWatchEvent(podEvent *watch.Event) { entry := resolver.resolvePodDescriptor(pod) for _, podIp := range pod.Status.PodIPs { resolver.storeWorkloadsIP(podIp.IP, &entry) + nodeInfo, ok := resolver.nodeInfoMap.Load(pod.Spec.NodeName) + region, zone := "", "" + if ok { + meta, ok := nodeInfo.(InstanceMeta) + if ok { + region = meta.Region + zone = meta.Zone + } + } else { + log.Println("Failed to fetch node info ", ok) + } podWorkload := Workload{ Name: pod.Name, Namespace: pod.Namespace, Kind: "pod", + Region: region, + Zone: zone, } + resolver.storePodsIP(podIp.IP, &podWorkload) } case watch.Deleted: @@ -424,11 +464,16 @@ func (resolver *K8sIPResolver) handleNodeWatchEvent(nodeEvent *watch.Event) { Name: node.Name, Namespace: "node", Kind: "node", + Zone: node.Annotations["topology.kubernetes.io/zone"], + Region: node.Annotations["topology.kubernetes.io/region"], }) } + nodeMetadata := InstanceMeta{Zone: node.Annotations["topology.kubernetes.io/zone"], Region: node.Annotations["topology.kubernetes.io/region"]} + resolver.nodeInfoMap.Store(node.Name, nodeMetadata) case watch.Deleted: if val, ok := nodeEvent.Object.(*v1.Node); ok { resolver.snapshot.Nodes.Delete(val.UID) + resolver.nodeInfoMap.Delete(val.Name) } } } @@ -499,6 +544,8 @@ func (resolver *K8sIPResolver) handleServicesWatchEvent(servicesEvent *watch.Eve Name: service.Name, Namespace: service.Namespace, Kind: "Service", + Zone: "", + Region: "", } // TODO maybe try to match service to workload @@ -578,6 +625,8 @@ func (resolver *K8sIPResolver) getFullClusterSnapshot() error { return errors.New("error getting nodes, aborting snapshot update") } for _, node := range nodes.Items { + nodeMetadata := InstanceMeta{Name: node.Name, Zone: node.ObjectMeta.Labels["topology.kubernetes.io/zone"], Region: node.ObjectMeta.Labels["topology.kubernetes.io/region"]} + resolver.nodeInfoMap.Store(node.Name, nodeMetadata) resolver.snapshot.Nodes.Store(node.UID, node) } @@ -685,10 +734,23 @@ func (resolver *K8sIPResolver) updateIpMapping() { for _, podIp := range pod.Status.PodIPs { // if ip is already in the map, override only if current pod is running resolver.storeWorkloadsIP(podIp.IP, &entry) + nodeInfo, ok := resolver.nodeInfoMap.Load(pod.Spec.NodeName) + region, zone := "", "" + if ok { + meta, ok := nodeInfo.(InstanceMeta) + if ok { + region = meta.Region + zone = meta.Zone + } else { + log.Println("Failed to fetch node info ", ok) + } + } podWorkload := Workload{ Name: pod.Name, Namespace: pod.Namespace, Kind: "pod", + Region: region, + Zone: zone, } resolver.storePodsIP(podIp.IP, &podWorkload) } diff --git a/containers/container.go b/containers/container.go index 83252b79..2069c65c 100644 --- a/containers/container.go +++ b/containers/container.go @@ -142,7 +142,6 @@ type Container struct { done chan struct{} ip_resolver IPResolver - hostIpsMap sync.Map } func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, hostConntrack *Conntrack, pid uint32, ip_resolver IPResolver) (*Container, error) { @@ -179,7 +178,6 @@ func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, host done: make(chan struct{}), ip_resolver: ip_resolver, - hostIpsMap: sync.Map{}, } for _, n := range md.networks { @@ -310,8 +308,9 @@ func (c *Container) Collect(ch chan<- prometheus.Metric) { } if d.dstWorkload.Kind == "external" { - if host, ok := c.hostIpsMap.Load(d.dst); ok { - workload_dest = common.Workload{Name: host.(string), Namespace: "external", Kind: "external"} + host := c.ip_resolver.ResolveHost(d.dst.IP().String()) + if host != "" { + workload_dest = common.Workload{Name: host, Namespace: "external", Kind: "external"} } } ch <- counter(metrics.NetConnectsSuccessful, float64(count), d.src.String(), d.dst.String(), workload_src.Name, workload_src.Namespace, workload_src.Kind, workload_dest.Name, workload_dest.Namespace, workload_dest.Kind, actualDestWorkload.Name, actualDestWorkload.Namespace, actualDestWorkload.Kind) @@ -329,8 +328,9 @@ func (c *Container) Collect(ch chan<- prometheus.Metric) { workload_dest = c.ip_resolver.ResolveIP(d.dst.IP().String()) } if d.dstWorkload.Kind == "external" { - if host, ok := c.hostIpsMap.Load(d.dst); ok { - workload_dest = common.Workload{Name: host.(string), Namespace: "external", Kind: "external"} + host := c.ip_resolver.ResolveHost(d.dst.IP().String()) + if host != "" { + workload_dest = common.Workload{Name: host, Namespace: "external", Kind: "external"} } } ch <- counter(metrics.NetRetransmits, float64(count), d.src.String(), d.dst.String(), workload_src.Name, workload_src.Namespace, workload_src.Kind, workload_dest.Name, workload_dest.Namespace, workload_dest.Kind) @@ -345,8 +345,9 @@ func (c *Container) Collect(ch chan<- prometheus.Metric) { workload_dest := c.ip_resolver.ResolveIP(conn.ActualDest.IP().String()) actualDestWorkload := c.ip_resolver.ResolveActualIP(conn.ActualDest.IP().String()) if workload_dest.Kind == "external" { - if host, ok := c.hostIpsMap.Load(conn.ActualDest.IP().String()); ok { - workload_dest = common.Workload{Name: host.(string), Namespace: "external", Kind: "external"} + host := c.ip_resolver.ResolveHost(conn.ActualDest.IP().String()) + if host != "" { + workload_dest = common.Workload{Name: host, Namespace: "external", Kind: "external"} } } connections[AddrPair{src: addrPair.dst, dst: conn.ActualDest, srcWorkload: workload_src, dstWorkload: workload_dest, actualDestWorkload: actualDestWorkload}]++ @@ -551,14 +552,12 @@ func (c *Container) onConnectionOpen(pid uint32, fd uint64, src, dst netaddr.IPP } srcWorkload := c.ip_resolver.ResolveIP(src.IP().String()) if ignoreControlPlane(srcWorkload.Name) { - klog.Warningf("Ignoring src workload %s, %s \n", src.IP().String(), srcWorkload.Name) return } actualDst, err := c.getActualDestination(p, src, dst) dstWorkload := c.ip_resolver.ResolveIP(dst.IP().String()) if ignoreControlPlane(dstWorkload.Name) { - klog.Warningf("Ignoring src workload %s, %s \n", dst.IP().String(), dstWorkload.Name) return } if err != nil { @@ -657,7 +656,7 @@ func (c *Container) onL7Request(pid uint32, fd uint64, timestamp uint64, r *l7.R host, error := l7.ParseHostFromHttpRequest(string(r.Payload)) if error == nil { conn.dstWorkload.Name = host - c.hostIpsMap.Store(conn.ActualDest.IP().String(), host) + c.ip_resolver.CacheDNS(conn.ActualDest.IP().String(), host) } } stats := c.l7Stats.get(r.Protocol, conn.Dest, conn.ActualDest, r, conn.srcWorkload, conn.dstWorkload, conn.actualDestWorkload) @@ -693,7 +692,7 @@ func (c *Container) onL7Request(pid uint32, fd uint64, timestamp uint64, r *l7.R if r.Response != nil { response = base64.StdEncoding.EncodeToString(r.Response) } - trace.HttpRequest(method, uri, r.Status, r.Duration, r.PayloadSize, payload, headers, response, host) + trace.HttpRequest(method, uri, r.Status, r.Duration, r.PayloadSize, payload, headers, response, host, conn.actualDestWorkload) case l7.ProtocolHTTP2: if conn.http2Parser == nil { conn.http2Parser = l7.NewHttp2Parser() diff --git a/containers/registry.go b/containers/registry.go index d4a97026..a5b0fbf2 100644 --- a/containers/registry.go +++ b/containers/registry.go @@ -34,7 +34,8 @@ type ProcessInfo struct { type IPResolver interface { ResolveIP(string) common.Workload ResolveActualIP(string) common.Workload - CacheDNS(string, string) common.Workload + CacheDNS(string, string) + ResolveHost(string) string StartWatching() error StopWatching() } @@ -207,8 +208,6 @@ func (r *Registry) handleEvents(ch <-chan ebpftracer.Event) { case ebpftracer.EventTypeListenOpen: if c := r.getOrCreateContainer(e.Pid); c != nil { c.onListenOpen(e.Pid, e.SrcAddr, false) - } else { - klog.Infoln("TCP listen open from unknown container", e) } case ebpftracer.EventTypeListenClose: if c := r.containersByPid[e.Pid]; c != nil { @@ -219,14 +218,10 @@ func (r *Registry) handleEvents(ch <-chan ebpftracer.Event) { if c := r.getOrCreateContainer(e.Pid); c != nil { c.onConnectionOpen(e.Pid, e.Fd, e.SrcAddr, e.DstAddr, e.Timestamp, false) c.attachTlsUprobes(r.tracer, e.Pid) - } else { - klog.Infoln("TCP connection from unknown container", e) } case ebpftracer.EventTypeConnectionError: if c := r.getOrCreateContainer(e.Pid); c != nil { c.onConnectionOpen(e.Pid, e.Fd, e.SrcAddr, e.DstAddr, 0, true) - } else { - klog.Infoln("TCP connection error from unknown container", e) } case ebpftracer.EventTypeConnectionClose: srcDst := AddrPair{src: e.SrcAddr, dst: e.DstAddr} diff --git a/main.go b/main.go index 2ebfd3f5..5de0ab35 100644 --- a/main.go +++ b/main.go @@ -15,6 +15,7 @@ import ( "github.com/coroot/coroot-node-agent/flags" "github.com/coroot/coroot-node-agent/logs" "github.com/coroot/coroot-node-agent/node" + "github.com/coroot/coroot-node-agent/node/metadata" "github.com/coroot/coroot-node-agent/profiling" "github.com/coroot/coroot-node-agent/prom" "github.com/coroot/coroot-node-agent/tracing" @@ -148,7 +149,8 @@ func main() { whitelistNodeExternalNetworks() machineId := machineID() - tracing.Init(machineId, hostname, version) + md := metadata.GetInstanceMetadata() + tracing.Init(machineId, hostname, version, md) logs.Init(machineId, hostname, version) registry := prometheus.NewRegistry() @@ -156,7 +158,7 @@ func main() { registerer.MustRegister(info("node_agent_info", version)) - if err := registerer.Register(node.NewCollector(hostname, kv)); err != nil { + if err := registerer.Register(node.NewCollector(hostname, kv, md)); err != nil { klog.Exitln(err) } diff --git a/node/collector.go b/node/collector.go index fc86458f..d2358be4 100644 --- a/node/collector.go +++ b/node/collector.go @@ -152,8 +152,7 @@ type Collector struct { instanceMetadata *metadata.CloudMetadata } -func NewCollector(hostname, kernelVersion string) *Collector { - md := metadata.GetInstanceMetadata() +func NewCollector(hostname, kernelVersion string, md *metadata.CloudMetadata) *Collector { klog.Infof("instance metadata: %+v", md) return &Collector{ hostname: hostname, diff --git a/tracing/tracing.go b/tracing/tracing.go index dac1ca5b..e184d6ac 100644 --- a/tracing/tracing.go +++ b/tracing/tracing.go @@ -8,6 +8,7 @@ import ( "github.com/coroot/coroot-node-agent/common" "github.com/coroot/coroot-node-agent/ebpftracer/l7" "github.com/coroot/coroot-node-agent/flags" + "github.com/coroot/coroot-node-agent/node/metadata" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/exporters/otlp/otlptrace" @@ -25,11 +26,13 @@ const ( ) var ( - tracer func(containerId string) trace.Tracer + tracer func(containerId string) trace.Tracer + instanceMetadata *metadata.CloudMetadata ) -func Init(machineId, hostname, version string) { +func Init(machineId, hostname, version string, md *metadata.CloudMetadata) { endpointUrl := *flags.TracesEndpoint + instanceMetadata = md if endpointUrl == nil { klog.Infoln("no OpenTelemetry traces collector endpoint configured") return @@ -107,7 +110,7 @@ func (t *Trace) createSpan(name string, duration time.Duration, error bool, attr span.End(trace.WithTimestamp(end)) } -func (t *Trace) HttpRequest(method, path string, status l7.Status, duration time.Duration, requestSize uint64, payload string, headers string, response string, host string) { +func (t *Trace) HttpRequest(method, path string, status l7.Status, duration time.Duration, requestSize uint64, payload string, headers string, response string, host string, destWorkload common.Workload) { if t == nil || method == "" { return } @@ -123,6 +126,15 @@ func (t *Trace) HttpRequest(method, path string, status l7.Status, duration time attribute.Key("http.headers").String(headers), attribute.Key("http.response").String(response), attribute.Key("http.path").String(path), + attribute.Key("source.cloud.region").String(instanceMetadata.Region), + attribute.Key("source.cloud.provider").String(string(instanceMetadata.Provider)), + attribute.Key("source.cloud.zone").String(instanceMetadata.AvailabilityZone), + attribute.Key("source.cloud.zone_id").String(instanceMetadata.AvailabilityZoneId), + attribute.Key("source.cloud.instance_id").String(instanceMetadata.InstanceId), + attribute.Key("source.cloud.instance_type").String(instanceMetadata.InstanceType), + attribute.Key("source.cloud.region").String(instanceMetadata.Region), + attribute.Key("destination.cloud.region").String(destWorkload.Region), + attribute.Key("destination.cloud.zone").String(destWorkload.Zone), ) }