From 298c28ffb653c9525dcb8585aa9d48f4969017ea Mon Sep 17 00:00:00 2001 From: Mayank Pande Date: Mon, 29 Apr 2024 18:31:14 +0530 Subject: [PATCH 1/5] fix: removed logging (#44) --- containers/registry.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/containers/registry.go b/containers/registry.go index d4a97026..c3396deb 100644 --- a/containers/registry.go +++ b/containers/registry.go @@ -207,8 +207,6 @@ func (r *Registry) handleEvents(ch <-chan ebpftracer.Event) { case ebpftracer.EventTypeListenOpen: if c := r.getOrCreateContainer(e.Pid); c != nil { c.onListenOpen(e.Pid, e.SrcAddr, false) - } else { - klog.Infoln("TCP listen open from unknown container", e) } case ebpftracer.EventTypeListenClose: if c := r.containersByPid[e.Pid]; c != nil { @@ -219,14 +217,10 @@ func (r *Registry) handleEvents(ch <-chan ebpftracer.Event) { if c := r.getOrCreateContainer(e.Pid); c != nil { c.onConnectionOpen(e.Pid, e.Fd, e.SrcAddr, e.DstAddr, e.Timestamp, false) c.attachTlsUprobes(r.tracer, e.Pid) - } else { - klog.Infoln("TCP connection from unknown container", e) } case ebpftracer.EventTypeConnectionError: if c := r.getOrCreateContainer(e.Pid); c != nil { c.onConnectionOpen(e.Pid, e.Fd, e.SrcAddr, e.DstAddr, 0, true) - } else { - klog.Infoln("TCP connection error from unknown container", e) } case ebpftracer.EventTypeConnectionClose: srcDst := AddrPair{src: e.SrcAddr, dst: e.DstAddr} From f5d1624e26f6346f1ad41d54f399a200afafa5e3 Mon Sep 17 00:00:00 2001 From: Mayank Pande Date: Mon, 6 May 2024 15:16:47 +0530 Subject: [PATCH 2/5] feat: add network az and region info (#45) --- common/ip_resolver.go | 40 ++++++++++++++++++++++++++++++++++++++++ containers/container.go | 20 +++++++++++--------- tracing/tracing.go | 12 +++++++++++- 3 files changed, 62 insertions(+), 10 deletions(-) diff --git a/common/ip_resolver.go b/common/ip_resolver.go index c7b1714d..c7b89955 100644 --- a/common/ip_resolver.go +++ b/common/ip_resolver.go @@ -48,6 +48,12 @@ type clusterSnapshot struct { 
PodDescriptors sync.Map // map[types.UID]Workload } +type InstanceMeta struct { + Name string + Region string + Zone string +} + type K8sIPResolver struct { clientset kubernetes.Interface snapshot clusterSnapshot @@ -56,12 +62,15 @@ type K8sIPResolver struct { shouldResolveDns bool dnsResolvedIps *lrucache.Cache[string, string] podIpsMap sync.Map + nodeInfoMap sync.Map } type Workload struct { Name string Namespace string Kind string + Zone string + Region string } func NewK8sIPResolver(clientset kubernetes.Interface, resolveDns bool) (*K8sIPResolver, error) { @@ -83,6 +92,7 @@ func NewK8sIPResolver(clientset kubernetes.Interface, resolveDns bool) (*K8sIPRe shouldResolveDns: resolveDns, dnsResolvedIps: dnsCache, podIpsMap: sync.Map{}, + nodeInfoMap: sync.Map{}, }, nil } @@ -380,10 +390,21 @@ func (resolver *K8sIPResolver) handlePodWatchEvent(podEvent *watch.Event) { entry := resolver.resolvePodDescriptor(pod) for _, podIp := range pod.Status.PodIPs { resolver.storeWorkloadsIP(podIp.IP, &entry) + nodeInfo, err := resolver.nodeInfoMap.Load(pod.Spec.NodeName) + region, zone := "", "" + if !err { + meta, ok := nodeInfo.(InstanceMeta) + if ok { + region = meta.Region + zone = meta.Zone + } + } podWorkload := Workload{ Name: pod.Name, Namespace: pod.Namespace, Kind: "pod", + Region: region, + Zone: zone, } resolver.storePodsIP(podIp.IP, &podWorkload) } @@ -396,11 +417,23 @@ func (resolver *K8sIPResolver) handlePodWatchEvent(podEvent *watch.Event) { entry := resolver.resolvePodDescriptor(pod) for _, podIp := range pod.Status.PodIPs { resolver.storeWorkloadsIP(podIp.IP, &entry) + nodeInfo, err := resolver.nodeInfoMap.Load(pod.Spec.NodeName) + region, zone := "", "" + if !err { + meta, ok := nodeInfo.(InstanceMeta) + if ok { + region = meta.Region + zone = meta.Zone + } + } podWorkload := Workload{ Name: pod.Name, Namespace: pod.Namespace, Kind: "pod", + Region: region, + Zone: zone, } + resolver.storePodsIP(podIp.IP, &podWorkload) } case watch.Deleted: @@ -424,11 +457,16 
@@ func (resolver *K8sIPResolver) handleNodeWatchEvent(nodeEvent *watch.Event) { Name: node.Name, Namespace: "node", Kind: "node", + Zone: node.Annotations["topology.kubernetes.io/zone"], + Region: node.Annotations["topology.kubernetes.io/region"], }) } + nodeMetadata := &InstanceMeta{Zone: node.Annotations["topology.kubernetes.io/zone"], Region: node.Annotations["topology.kubernetes.io/region"]} + resolver.nodeInfoMap.Store(node.Name, nodeMetadata) case watch.Deleted: if val, ok := nodeEvent.Object.(*v1.Node); ok { resolver.snapshot.Nodes.Delete(val.UID) + resolver.nodeInfoMap.Delete(val.Name) } } } @@ -499,6 +537,8 @@ func (resolver *K8sIPResolver) handleServicesWatchEvent(servicesEvent *watch.Eve Name: service.Name, Namespace: service.Namespace, Kind: "Service", + Zone: "", + Region: "", } // TODO maybe try to match service to workload diff --git a/containers/container.go b/containers/container.go index 83252b79..f1f85f89 100644 --- a/containers/container.go +++ b/containers/container.go @@ -17,6 +17,7 @@ import ( "github.com/coroot/coroot-node-agent/flags" "github.com/coroot/coroot-node-agent/logs" "github.com/coroot/coroot-node-agent/node" + "github.com/coroot/coroot-node-agent/node/metadata" "github.com/coroot/coroot-node-agent/pinger" "github.com/coroot/coroot-node-agent/proc" "github.com/coroot/coroot-node-agent/tracing" @@ -140,12 +141,14 @@ type Container struct { lock sync.RWMutex - done chan struct{} - ip_resolver IPResolver - hostIpsMap sync.Map + done chan struct{} + ip_resolver IPResolver + hostIpsMap sync.Map + instanceMetadata *metadata.CloudMetadata } func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, hostConntrack *Conntrack, pid uint32, ip_resolver IPResolver) (*Container, error) { + instanceMetadata := metadata.GetInstanceMetadata() netNs, err := proc.GetNetNs(pid) if err != nil { return nil, err @@ -177,9 +180,10 @@ func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, host hostConntrack: 
hostConntrack, - done: make(chan struct{}), - ip_resolver: ip_resolver, - hostIpsMap: sync.Map{}, + done: make(chan struct{}), + ip_resolver: ip_resolver, + hostIpsMap: sync.Map{}, + instanceMetadata: instanceMetadata, } for _, n := range md.networks { @@ -551,14 +555,12 @@ func (c *Container) onConnectionOpen(pid uint32, fd uint64, src, dst netaddr.IPP } srcWorkload := c.ip_resolver.ResolveIP(src.IP().String()) if ignoreControlPlane(srcWorkload.Name) { - klog.Warningf("Ignoring src workload %s, %s \n", src.IP().String(), srcWorkload.Name) return } actualDst, err := c.getActualDestination(p, src, dst) dstWorkload := c.ip_resolver.ResolveIP(dst.IP().String()) if ignoreControlPlane(dstWorkload.Name) { - klog.Warningf("Ignoring src workload %s, %s \n", dst.IP().String(), dstWorkload.Name) return } if err != nil { @@ -693,7 +695,7 @@ func (c *Container) onL7Request(pid uint32, fd uint64, timestamp uint64, r *l7.R if r.Response != nil { response = base64.StdEncoding.EncodeToString(r.Response) } - trace.HttpRequest(method, uri, r.Status, r.Duration, r.PayloadSize, payload, headers, response, host) + trace.HttpRequest(method, uri, r.Status, r.Duration, r.PayloadSize, payload, headers, response, host, *c.instanceMetadata, conn.actualDestWorkload) case l7.ProtocolHTTP2: if conn.http2Parser == nil { conn.http2Parser = l7.NewHttp2Parser() diff --git a/tracing/tracing.go b/tracing/tracing.go index dac1ca5b..3a3d4d7f 100644 --- a/tracing/tracing.go +++ b/tracing/tracing.go @@ -8,6 +8,7 @@ import ( "github.com/coroot/coroot-node-agent/common" "github.com/coroot/coroot-node-agent/ebpftracer/l7" "github.com/coroot/coroot-node-agent/flags" + "github.com/coroot/coroot-node-agent/node/metadata" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/exporters/otlp/otlptrace" @@ -107,7 +108,7 @@ func (t *Trace) createSpan(name string, duration time.Duration, error bool, attr span.End(trace.WithTimestamp(end)) } -func (t *Trace) 
HttpRequest(method, path string, status l7.Status, duration time.Duration, requestSize uint64, payload string, headers string, response string, host string) { +func (t *Trace) HttpRequest(method, path string, status l7.Status, duration time.Duration, requestSize uint64, payload string, headers string, response string, host string, instanceMetadata metadata.CloudMetadata, destWorkload common.Workload) { if t == nil || method == "" { return } @@ -123,6 +124,15 @@ func (t *Trace) HttpRequest(method, path string, status l7.Status, duration time attribute.Key("http.headers").String(headers), attribute.Key("http.response").String(response), attribute.Key("http.path").String(path), + attribute.Key("source.cloud.region").String(instanceMetadata.Region), + attribute.Key("source.cloud.provider").String(string(instanceMetadata.Provider)), + attribute.Key("source.cloud.zone").String(instanceMetadata.AvailabilityZone), + attribute.Key("source.cloud.zone_id").String(instanceMetadata.AvailabilityZoneId), + attribute.Key("source.cloud.instance_id").String(instanceMetadata.InstanceId), + attribute.Key("source.cloud.instance_type").String(instanceMetadata.InstanceType), + attribute.Key("source.cloud.region").String(instanceMetadata.Region), + attribute.Key("destination.cloud.region").String(destWorkload.Region), + attribute.Key("destination.cloud.zone").String(destWorkload.Zone), ) } From 9b9e53013257e4ab94e25de6b0e7e0d9000aa1ab Mon Sep 17 00:00:00 2001 From: Mayank Pande Date: Fri, 17 May 2024 17:08:37 +0530 Subject: [PATCH 3/5] fix: fix for az info parsing --- common/ip_resolver.go | 29 ++++++++++++++++++++++++----- containers/container.go | 4 ++-- containers/registry.go | 9 ++++++--- 3 files changed, 32 insertions(+), 10 deletions(-) diff --git a/common/ip_resolver.go b/common/ip_resolver.go index c7b89955..b4bb182c 100644 --- a/common/ip_resolver.go +++ b/common/ip_resolver.go @@ -390,13 +390,15 @@ func (resolver *K8sIPResolver) handlePodWatchEvent(podEvent *watch.Event) { entry 
:= resolver.resolvePodDescriptor(pod) for _, podIp := range pod.Status.PodIPs { resolver.storeWorkloadsIP(podIp.IP, &entry) - nodeInfo, err := resolver.nodeInfoMap.Load(pod.Spec.NodeName) + nodeInfo, ok := resolver.nodeInfoMap.Load(pod.Spec.NodeName) region, zone := "", "" - if !err { + if ok { meta, ok := nodeInfo.(InstanceMeta) if ok { region = meta.Region zone = meta.Zone + } else { + log.Println("Failed to fetch node info ", ok) } } podWorkload := Workload{ @@ -417,14 +419,16 @@ func (resolver *K8sIPResolver) handlePodWatchEvent(podEvent *watch.Event) { entry := resolver.resolvePodDescriptor(pod) for _, podIp := range pod.Status.PodIPs { resolver.storeWorkloadsIP(podIp.IP, &entry) - nodeInfo, err := resolver.nodeInfoMap.Load(pod.Spec.NodeName) + nodeInfo, ok := resolver.nodeInfoMap.Load(pod.Spec.NodeName) region, zone := "", "" - if !err { + if ok { meta, ok := nodeInfo.(InstanceMeta) if ok { region = meta.Region zone = meta.Zone } + } else { + log.Println("Failed to fetch node info ", ok) } podWorkload := Workload{ Name: pod.Name, @@ -461,7 +465,7 @@ func (resolver *K8sIPResolver) handleNodeWatchEvent(nodeEvent *watch.Event) { Region: node.Annotations["topology.kubernetes.io/region"], }) } - nodeMetadata := &InstanceMeta{Zone: node.Annotations["topology.kubernetes.io/zone"], Region: node.Annotations["topology.kubernetes.io/region"]} + nodeMetadata := InstanceMeta{Zone: node.Annotations["topology.kubernetes.io/zone"], Region: node.Annotations["topology.kubernetes.io/region"]} resolver.nodeInfoMap.Store(node.Name, nodeMetadata) case watch.Deleted: if val, ok := nodeEvent.Object.(*v1.Node); ok { @@ -618,6 +622,8 @@ func (resolver *K8sIPResolver) getFullClusterSnapshot() error { return errors.New("error getting nodes, aborting snapshot update") } for _, node := range nodes.Items { + nodeMetadata := InstanceMeta{Name: node.Name, Zone: node.ObjectMeta.Labels["topology.kubernetes.io/zone"], Region: node.ObjectMeta.Labels["topology.kubernetes.io/region"]} + 
resolver.nodeInfoMap.Store(node.Name, nodeMetadata) resolver.snapshot.Nodes.Store(node.UID, node) } @@ -725,10 +731,23 @@ func (resolver *K8sIPResolver) updateIpMapping() { for _, podIp := range pod.Status.PodIPs { // if ip is already in the map, override only if current pod is running resolver.storeWorkloadsIP(podIp.IP, &entry) + nodeInfo, ok := resolver.nodeInfoMap.Load(pod.Spec.NodeName) + region, zone := "", "" + if ok { + meta, ok := nodeInfo.(InstanceMeta) + if ok { + region = meta.Region + zone = meta.Zone + } else { + log.Println("Failed to fetch node info ", ok) + } + } podWorkload := Workload{ Name: pod.Name, Namespace: pod.Namespace, Kind: "pod", + Region: region, + Zone: zone, } resolver.storePodsIP(podIp.IP, &podWorkload) } diff --git a/containers/container.go b/containers/container.go index f1f85f89..2f8b1b60 100644 --- a/containers/container.go +++ b/containers/container.go @@ -147,8 +147,8 @@ type Container struct { instanceMetadata *metadata.CloudMetadata } -func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, hostConntrack *Conntrack, pid uint32, ip_resolver IPResolver) (*Container, error) { - instanceMetadata := metadata.GetInstanceMetadata() +func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, hostConntrack *Conntrack, pid uint32, ip_resolver IPResolver, instanceMeta *metadata.CloudMetadata) (*Container, error) { + instanceMetadata := instanceMeta netNs, err := proc.GetNetNs(pid) if err != nil { return nil, err diff --git a/containers/registry.go b/containers/registry.go index c3396deb..e0fb70e7 100644 --- a/containers/registry.go +++ b/containers/registry.go @@ -12,6 +12,7 @@ import ( "github.com/coroot/coroot-node-agent/common" "github.com/coroot/coroot-node-agent/ebpftracer" "github.com/coroot/coroot-node-agent/flags" + "github.com/coroot/coroot-node-agent/node/metadata" "github.com/coroot/coroot-node-agent/proc" "github.com/prometheus/client_golang/prometheus" "github.com/vishvananda/netns" @@ 
-53,6 +54,7 @@ type Registry struct { processInfoCh chan<- ProcessInfo ip_resolver IPResolver + instanceMeta *metadata.CloudMetadata } func NewRegistry(reg prometheus.Registerer, kernelVersion string, processInfoCh chan<- ProcessInfo, ip_resolver *common.K8sIPResolver) (*Registry, error) { @@ -109,8 +111,9 @@ func NewRegistry(reg prometheus.Registerer, kernelVersion string, processInfoCh processInfoCh: processInfoCh, - tracer: ebpftracer.NewTracer(kernelVersion, *flags.DisableL7Tracing), - ip_resolver: ip_resolver, + tracer: ebpftracer.NewTracer(kernelVersion, *flags.DisableL7Tracing), + ip_resolver: ip_resolver, + instanceMeta: metadata.GetInstanceMetadata(), } go r.handleEvents(r.events) @@ -306,7 +309,7 @@ func (r *Registry) getOrCreateContainer(pid uint32) *Container { r.containersByCgroupId[cg.Id] = c return c } - c, err := NewContainer(id, cg, md, r.hostConntrack, pid, r.ip_resolver) + c, err := NewContainer(id, cg, md, r.hostConntrack, pid, r.ip_resolver, r.instanceMeta) if err != nil { klog.Warningf("failed to create container pid=%d cg=%s id=%s: %s", pid, cg.Id, id, err) return nil From 664041245e09a74b09bedc1a2b3e6f3f7dca54a0 Mon Sep 17 00:00:00 2001 From: Mayank Pande Date: Sat, 18 May 2024 10:09:43 +0530 Subject: [PATCH 4/5] fix: fix dns resolution --- common/ip_resolver.go | 17 ++++++++++------- containers/container.go | 19 ++++++++++--------- containers/registry.go | 3 ++- 3 files changed, 22 insertions(+), 17 deletions(-) diff --git a/common/ip_resolver.go b/common/ip_resolver.go index b4bb182c..3350235b 100644 --- a/common/ip_resolver.go +++ b/common/ip_resolver.go @@ -63,6 +63,7 @@ type K8sIPResolver struct { dnsResolvedIps *lrucache.Cache[string, string] podIpsMap sync.Map nodeInfoMap sync.Map + hostIpsMap sync.Map } type Workload struct { @@ -93,6 +94,7 @@ func NewK8sIPResolver(clientset kubernetes.Interface, resolveDns bool) (*K8sIPRe dnsResolvedIps: dnsCache, podIpsMap: sync.Map{}, nodeInfoMap: sync.Map{}, + hostIpsMap: sync.Map{}, }, nil } @@ 
-162,15 +164,16 @@ func (resolver *K8sIPResolver) ResolveIP(ip string) Workload { } } -func (resolver *K8sIPResolver) CacheDNS(ip string, dns string) Workload { - resolver.dnsResolvedIps.Add(ip, dns) - return Workload{ - Name: dns, - Namespace: "external", - Kind: "external", - } +func (resolver *K8sIPResolver) CacheDNS(ip string, dns string) { + resolver.hostIpsMap.Store(ip, dns) } +func (resolver *K8sIPResolver) ResolveHost(ip string) string { + if host, ok := resolver.hostIpsMap.Load(ip); ok { + return host.(string) + } + return "" +} func (resolver *K8sIPResolver) StartWatching() error { // register watchers podsWatcher, err := resolver.clientset.CoreV1().Pods("").Watch(context.Background(), metav1.ListOptions{}) diff --git a/containers/container.go b/containers/container.go index 2f8b1b60..03c30e06 100644 --- a/containers/container.go +++ b/containers/container.go @@ -143,7 +143,6 @@ type Container struct { done chan struct{} ip_resolver IPResolver - hostIpsMap sync.Map instanceMetadata *metadata.CloudMetadata } @@ -182,7 +181,6 @@ func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, host done: make(chan struct{}), ip_resolver: ip_resolver, - hostIpsMap: sync.Map{}, instanceMetadata: instanceMetadata, } @@ -314,8 +312,9 @@ func (c *Container) Collect(ch chan<- prometheus.Metric) { } if d.dstWorkload.Kind == "external" { - if host, ok := c.hostIpsMap.Load(d.dst); ok { - workload_dest = common.Workload{Name: host.(string), Namespace: "external", Kind: "external"} + host := c.ip_resolver.ResolveHost(d.dst.IP().String()) + if host != "" { + workload_dest = common.Workload{Name: host, Namespace: "external", Kind: "external"} } } ch <- counter(metrics.NetConnectsSuccessful, float64(count), d.src.String(), d.dst.String(), workload_src.Name, workload_src.Namespace, workload_src.Kind, workload_dest.Name, workload_dest.Namespace, workload_dest.Kind, actualDestWorkload.Name, actualDestWorkload.Namespace, actualDestWorkload.Kind) @@ -333,8 +332,9 @@ 
func (c *Container) Collect(ch chan<- prometheus.Metric) { workload_dest = c.ip_resolver.ResolveIP(d.dst.IP().String()) } if d.dstWorkload.Kind == "external" { - if host, ok := c.hostIpsMap.Load(d.dst); ok { - workload_dest = common.Workload{Name: host.(string), Namespace: "external", Kind: "external"} + host := c.ip_resolver.ResolveHost(d.dst.IP().String()) + if host != "" { + workload_dest = common.Workload{Name: host, Namespace: "external", Kind: "external"} } } ch <- counter(metrics.NetRetransmits, float64(count), d.src.String(), d.dst.String(), workload_src.Name, workload_src.Namespace, workload_src.Kind, workload_dest.Name, workload_dest.Namespace, workload_dest.Kind) @@ -349,8 +349,9 @@ func (c *Container) Collect(ch chan<- prometheus.Metric) { workload_dest := c.ip_resolver.ResolveIP(conn.ActualDest.IP().String()) actualDestWorkload := c.ip_resolver.ResolveActualIP(conn.ActualDest.IP().String()) if workload_dest.Kind == "external" { - if host, ok := c.hostIpsMap.Load(conn.ActualDest.IP().String()); ok { - workload_dest = common.Workload{Name: host.(string), Namespace: "external", Kind: "external"} + host := c.ip_resolver.ResolveHost(conn.ActualDest.IP().String()) + if host != "" { + workload_dest = common.Workload{Name: host, Namespace: "external", Kind: "external"} } } connections[AddrPair{src: addrPair.dst, dst: conn.ActualDest, srcWorkload: workload_src, dstWorkload: workload_dest, actualDestWorkload: actualDestWorkload}]++ @@ -659,7 +660,7 @@ func (c *Container) onL7Request(pid uint32, fd uint64, timestamp uint64, r *l7.R host, error := l7.ParseHostFromHttpRequest(string(r.Payload)) if error == nil { conn.dstWorkload.Name = host - c.hostIpsMap.Store(conn.ActualDest.IP().String(), host) + c.ip_resolver.CacheDNS(conn.ActualDest.IP().String(), host) } } stats := c.l7Stats.get(r.Protocol, conn.Dest, conn.ActualDest, r, conn.srcWorkload, conn.dstWorkload, conn.actualDestWorkload) diff --git a/containers/registry.go b/containers/registry.go index 
e0fb70e7..0399f782 100644 --- a/containers/registry.go +++ b/containers/registry.go @@ -35,7 +35,8 @@ type ProcessInfo struct { type IPResolver interface { ResolveIP(string) common.Workload ResolveActualIP(string) common.Workload - CacheDNS(string, string) common.Workload + CacheDNS(string, string) + ResolveHost(string) string StartWatching() error StopWatching() } From 1a36ca360536cfaece4d733d029bdac9a4fc4732 Mon Sep 17 00:00:00 2001 From: Mayank Pande Date: Sun, 19 May 2024 11:52:06 +0530 Subject: [PATCH 5/5] fix: fix for duplicate stats --- containers/container.go | 16 ++++++---------- containers/registry.go | 9 +++------ main.go | 6 ++++-- node/collector.go | 3 +-- tracing/tracing.go | 8 +++++--- 5 files changed, 19 insertions(+), 23 deletions(-) diff --git a/containers/container.go b/containers/container.go index 03c30e06..2069c65c 100644 --- a/containers/container.go +++ b/containers/container.go @@ -17,7 +17,6 @@ import ( "github.com/coroot/coroot-node-agent/flags" "github.com/coroot/coroot-node-agent/logs" "github.com/coroot/coroot-node-agent/node" - "github.com/coroot/coroot-node-agent/node/metadata" "github.com/coroot/coroot-node-agent/pinger" "github.com/coroot/coroot-node-agent/proc" "github.com/coroot/coroot-node-agent/tracing" @@ -141,13 +140,11 @@ type Container struct { lock sync.RWMutex - done chan struct{} - ip_resolver IPResolver - instanceMetadata *metadata.CloudMetadata + done chan struct{} + ip_resolver IPResolver } -func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, hostConntrack *Conntrack, pid uint32, ip_resolver IPResolver, instanceMeta *metadata.CloudMetadata) (*Container, error) { - instanceMetadata := instanceMeta +func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, hostConntrack *Conntrack, pid uint32, ip_resolver IPResolver) (*Container, error) { netNs, err := proc.GetNetNs(pid) if err != nil { return nil, err @@ -179,9 +176,8 @@ func NewContainer(id ContainerID, cg *cgroup.Cgroup, md 
*ContainerMetadata, host hostConntrack: hostConntrack, - done: make(chan struct{}), - ip_resolver: ip_resolver, - instanceMetadata: instanceMetadata, + done: make(chan struct{}), + ip_resolver: ip_resolver, } for _, n := range md.networks { @@ -696,7 +692,7 @@ func (c *Container) onL7Request(pid uint32, fd uint64, timestamp uint64, r *l7.R if r.Response != nil { response = base64.StdEncoding.EncodeToString(r.Response) } - trace.HttpRequest(method, uri, r.Status, r.Duration, r.PayloadSize, payload, headers, response, host, *c.instanceMetadata, conn.actualDestWorkload) + trace.HttpRequest(method, uri, r.Status, r.Duration, r.PayloadSize, payload, headers, response, host, conn.actualDestWorkload) case l7.ProtocolHTTP2: if conn.http2Parser == nil { conn.http2Parser = l7.NewHttp2Parser() diff --git a/containers/registry.go b/containers/registry.go index 0399f782..a5b0fbf2 100644 --- a/containers/registry.go +++ b/containers/registry.go @@ -12,7 +12,6 @@ import ( "github.com/coroot/coroot-node-agent/common" "github.com/coroot/coroot-node-agent/ebpftracer" "github.com/coroot/coroot-node-agent/flags" - "github.com/coroot/coroot-node-agent/node/metadata" "github.com/coroot/coroot-node-agent/proc" "github.com/prometheus/client_golang/prometheus" "github.com/vishvananda/netns" @@ -55,7 +54,6 @@ type Registry struct { processInfoCh chan<- ProcessInfo ip_resolver IPResolver - instanceMeta *metadata.CloudMetadata } func NewRegistry(reg prometheus.Registerer, kernelVersion string, processInfoCh chan<- ProcessInfo, ip_resolver *common.K8sIPResolver) (*Registry, error) { @@ -112,9 +110,8 @@ func NewRegistry(reg prometheus.Registerer, kernelVersion string, processInfoCh processInfoCh: processInfoCh, - tracer: ebpftracer.NewTracer(kernelVersion, *flags.DisableL7Tracing), - ip_resolver: ip_resolver, - instanceMeta: metadata.GetInstanceMetadata(), + tracer: ebpftracer.NewTracer(kernelVersion, *flags.DisableL7Tracing), + ip_resolver: ip_resolver, } go r.handleEvents(r.events) @@ -310,7 
+307,7 @@ func (r *Registry) getOrCreateContainer(pid uint32) *Container { r.containersByCgroupId[cg.Id] = c return c } - c, err := NewContainer(id, cg, md, r.hostConntrack, pid, r.ip_resolver, r.instanceMeta) + c, err := NewContainer(id, cg, md, r.hostConntrack, pid, r.ip_resolver) if err != nil { klog.Warningf("failed to create container pid=%d cg=%s id=%s: %s", pid, cg.Id, id, err) return nil diff --git a/main.go b/main.go index 2ebfd3f5..5de0ab35 100644 --- a/main.go +++ b/main.go @@ -15,6 +15,7 @@ import ( "github.com/coroot/coroot-node-agent/flags" "github.com/coroot/coroot-node-agent/logs" "github.com/coroot/coroot-node-agent/node" + "github.com/coroot/coroot-node-agent/node/metadata" "github.com/coroot/coroot-node-agent/profiling" "github.com/coroot/coroot-node-agent/prom" "github.com/coroot/coroot-node-agent/tracing" @@ -148,7 +149,8 @@ func main() { whitelistNodeExternalNetworks() machineId := machineID() - tracing.Init(machineId, hostname, version) + md := metadata.GetInstanceMetadata() + tracing.Init(machineId, hostname, version, md) logs.Init(machineId, hostname, version) registry := prometheus.NewRegistry() @@ -156,7 +158,7 @@ func main() { registerer.MustRegister(info("node_agent_info", version)) - if err := registerer.Register(node.NewCollector(hostname, kv)); err != nil { + if err := registerer.Register(node.NewCollector(hostname, kv, md)); err != nil { klog.Exitln(err) } diff --git a/node/collector.go b/node/collector.go index fc86458f..d2358be4 100644 --- a/node/collector.go +++ b/node/collector.go @@ -152,8 +152,7 @@ type Collector struct { instanceMetadata *metadata.CloudMetadata } -func NewCollector(hostname, kernelVersion string) *Collector { - md := metadata.GetInstanceMetadata() +func NewCollector(hostname, kernelVersion string, md *metadata.CloudMetadata) *Collector { klog.Infof("instance metadata: %+v", md) return &Collector{ hostname: hostname, diff --git a/tracing/tracing.go b/tracing/tracing.go index 3a3d4d7f..e184d6ac 100644 --- 
a/tracing/tracing.go +++ b/tracing/tracing.go @@ -26,11 +26,13 @@ const ( ) var ( - tracer func(containerId string) trace.Tracer + tracer func(containerId string) trace.Tracer + instanceMetadata *metadata.CloudMetadata ) -func Init(machineId, hostname, version string) { +func Init(machineId, hostname, version string, md *metadata.CloudMetadata) { endpointUrl := *flags.TracesEndpoint + instanceMetadata = md if endpointUrl == nil { klog.Infoln("no OpenTelemetry traces collector endpoint configured") return @@ -108,7 +110,7 @@ func (t *Trace) createSpan(name string, duration time.Duration, error bool, attr span.End(trace.WithTimestamp(end)) } -func (t *Trace) HttpRequest(method, path string, status l7.Status, duration time.Duration, requestSize uint64, payload string, headers string, response string, host string, instanceMetadata metadata.CloudMetadata, destWorkload common.Workload) { +func (t *Trace) HttpRequest(method, path string, status l7.Status, duration time.Duration, requestSize uint64, payload string, headers string, response string, host string, destWorkload common.Workload) { if t == nil || method == "" { return }