Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 69 additions & 7 deletions common/ip_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ type clusterSnapshot struct {
PodDescriptors sync.Map // map[types.UID]Workload
}

// InstanceMeta is the per-node placement record kept by the resolver: the
// node's name together with the region and zone values read from the node's
// topology.kubernetes.io/region and topology.kubernetes.io/zone entries.
type InstanceMeta struct {
	// Name is the Kubernetes node name.
	Name string
	// Region and Zone locate the node; either may be empty when the node
	// does not carry the corresponding topology entry.
	Region, Zone string
}

type K8sIPResolver struct {
clientset kubernetes.Interface
snapshot clusterSnapshot
Expand All @@ -56,12 +62,16 @@ type K8sIPResolver struct {
shouldResolveDns bool
dnsResolvedIps *lrucache.Cache[string, string]
podIpsMap sync.Map
nodeInfoMap sync.Map
hostIpsMap sync.Map
}

// Workload describes what an IP address resolves to, as returned by
// K8sIPResolver.ResolveIP.
type Workload struct {
	// Identity of the resolved entity. Kind values used in this package
	// include "pod", "node", "Service" and "external"; Namespace carries
	// the Kubernetes namespace or a placeholder such as "node" or
	// "external" for entities that have none.
	Name, Namespace, Kind string
	// Placement of the entity. Both are left empty when unknown (for
	// example, services are stored with empty values).
	Zone, Region string
}

func NewK8sIPResolver(clientset kubernetes.Interface, resolveDns bool) (*K8sIPResolver, error) {
Expand All @@ -83,6 +93,8 @@ func NewK8sIPResolver(clientset kubernetes.Interface, resolveDns bool) (*K8sIPRe
shouldResolveDns: resolveDns,
dnsResolvedIps: dnsCache,
podIpsMap: sync.Map{},
nodeInfoMap: sync.Map{},
hostIpsMap: sync.Map{},
}, nil
}

Expand Down Expand Up @@ -152,15 +164,16 @@ func (resolver *K8sIPResolver) ResolveIP(ip string) Workload {
}
}

func (resolver *K8sIPResolver) CacheDNS(ip string, dns string) Workload {
resolver.dnsResolvedIps.Add(ip, dns)
return Workload{
Name: dns,
Namespace: "external",
Kind: "external",
}
func (resolver *K8sIPResolver) CacheDNS(ip string, dns string) {
resolver.hostIpsMap.Store(ip, dns)
}

// ResolveHost returns the hostname previously cached for ip via CacheDNS.
// It returns "" when nothing is cached for ip, or when the cached value is
// not a string.
func (resolver *K8sIPResolver) ResolveHost(ip string) string {
	cached, found := resolver.hostIpsMap.Load(ip)
	if !found {
		return ""
	}
	// hostIpsMap holds untyped values, so use the checked assertion: an
	// unexpected value yields "" (the zero string) instead of a panic.
	host, _ := cached.(string)
	return host
}
func (resolver *K8sIPResolver) StartWatching() error {
// register watchers
podsWatcher, err := resolver.clientset.CoreV1().Pods("").Watch(context.Background(), metav1.ListOptions{})
Expand Down Expand Up @@ -380,10 +393,23 @@ func (resolver *K8sIPResolver) handlePodWatchEvent(podEvent *watch.Event) {
entry := resolver.resolvePodDescriptor(pod)
for _, podIp := range pod.Status.PodIPs {
resolver.storeWorkloadsIP(podIp.IP, &entry)
nodeInfo, ok := resolver.nodeInfoMap.Load(pod.Spec.NodeName)
region, zone := "", ""
if ok {
meta, ok := nodeInfo.(InstanceMeta)
if ok {
region = meta.Region
zone = meta.Zone
} else {
log.Println("Failed to fetch node info ", ok)
}
}
podWorkload := Workload{
Name: pod.Name,
Namespace: pod.Namespace,
Kind: "pod",
Region: region,
Zone: zone,
}
resolver.storePodsIP(podIp.IP, &podWorkload)
}
Expand All @@ -396,11 +422,25 @@ func (resolver *K8sIPResolver) handlePodWatchEvent(podEvent *watch.Event) {
entry := resolver.resolvePodDescriptor(pod)
for _, podIp := range pod.Status.PodIPs {
resolver.storeWorkloadsIP(podIp.IP, &entry)
nodeInfo, ok := resolver.nodeInfoMap.Load(pod.Spec.NodeName)
region, zone := "", ""
if ok {
meta, ok := nodeInfo.(InstanceMeta)
if ok {
region = meta.Region
zone = meta.Zone
}
} else {
log.Println("Failed to fetch node info ", ok)
}
podWorkload := Workload{
Name: pod.Name,
Namespace: pod.Namespace,
Kind: "pod",
Region: region,
Zone: zone,
}

resolver.storePodsIP(podIp.IP, &podWorkload)
}
case watch.Deleted:
Expand All @@ -424,11 +464,16 @@ func (resolver *K8sIPResolver) handleNodeWatchEvent(nodeEvent *watch.Event) {
Name: node.Name,
Namespace: "node",
Kind: "node",
Zone: node.Annotations["topology.kubernetes.io/zone"],
Region: node.Annotations["topology.kubernetes.io/region"],
})
}
nodeMetadata := InstanceMeta{Zone: node.Annotations["topology.kubernetes.io/zone"], Region: node.Annotations["topology.kubernetes.io/region"]}
resolver.nodeInfoMap.Store(node.Name, nodeMetadata)
case watch.Deleted:
if val, ok := nodeEvent.Object.(*v1.Node); ok {
resolver.snapshot.Nodes.Delete(val.UID)
resolver.nodeInfoMap.Delete(val.Name)
}
}
}
Expand Down Expand Up @@ -499,6 +544,8 @@ func (resolver *K8sIPResolver) handleServicesWatchEvent(servicesEvent *watch.Eve
Name: service.Name,
Namespace: service.Namespace,
Kind: "Service",
Zone: "",
Region: "",
}

// TODO maybe try to match service to workload
Expand Down Expand Up @@ -578,6 +625,8 @@ func (resolver *K8sIPResolver) getFullClusterSnapshot() error {
return errors.New("error getting nodes, aborting snapshot update")
}
for _, node := range nodes.Items {
nodeMetadata := InstanceMeta{Name: node.Name, Zone: node.ObjectMeta.Labels["topology.kubernetes.io/zone"], Region: node.ObjectMeta.Labels["topology.kubernetes.io/region"]}
resolver.nodeInfoMap.Store(node.Name, nodeMetadata)
resolver.snapshot.Nodes.Store(node.UID, node)
}

Expand Down Expand Up @@ -685,10 +734,23 @@ func (resolver *K8sIPResolver) updateIpMapping() {
for _, podIp := range pod.Status.PodIPs {
// if ip is already in the map, override only if current pod is running
resolver.storeWorkloadsIP(podIp.IP, &entry)
nodeInfo, ok := resolver.nodeInfoMap.Load(pod.Spec.NodeName)
region, zone := "", ""
if ok {
meta, ok := nodeInfo.(InstanceMeta)
if ok {
region = meta.Region
zone = meta.Zone
} else {
log.Println("Failed to fetch node info ", ok)
}
}
podWorkload := Workload{
Name: pod.Name,
Namespace: pod.Namespace,
Kind: "pod",
Region: region,
Zone: zone,
}
resolver.storePodsIP(podIp.IP, &podWorkload)
}
Expand Down
23 changes: 11 additions & 12 deletions containers/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,6 @@ type Container struct {

done chan struct{}
ip_resolver IPResolver
hostIpsMap sync.Map
}

func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, hostConntrack *Conntrack, pid uint32, ip_resolver IPResolver) (*Container, error) {
Expand Down Expand Up @@ -179,7 +178,6 @@ func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, host

done: make(chan struct{}),
ip_resolver: ip_resolver,
hostIpsMap: sync.Map{},
}

for _, n := range md.networks {
Expand Down Expand Up @@ -310,8 +308,9 @@ func (c *Container) Collect(ch chan<- prometheus.Metric) {
}

if d.dstWorkload.Kind == "external" {
if host, ok := c.hostIpsMap.Load(d.dst); ok {
workload_dest = common.Workload{Name: host.(string), Namespace: "external", Kind: "external"}
host := c.ip_resolver.ResolveHost(d.dst.IP().String())
if host != "" {
workload_dest = common.Workload{Name: host, Namespace: "external", Kind: "external"}
}
}
ch <- counter(metrics.NetConnectsSuccessful, float64(count), d.src.String(), d.dst.String(), workload_src.Name, workload_src.Namespace, workload_src.Kind, workload_dest.Name, workload_dest.Namespace, workload_dest.Kind, actualDestWorkload.Name, actualDestWorkload.Namespace, actualDestWorkload.Kind)
Expand All @@ -329,8 +328,9 @@ func (c *Container) Collect(ch chan<- prometheus.Metric) {
workload_dest = c.ip_resolver.ResolveIP(d.dst.IP().String())
}
if d.dstWorkload.Kind == "external" {
if host, ok := c.hostIpsMap.Load(d.dst); ok {
workload_dest = common.Workload{Name: host.(string), Namespace: "external", Kind: "external"}
host := c.ip_resolver.ResolveHost(d.dst.IP().String())
if host != "" {
workload_dest = common.Workload{Name: host, Namespace: "external", Kind: "external"}
}
}
ch <- counter(metrics.NetRetransmits, float64(count), d.src.String(), d.dst.String(), workload_src.Name, workload_src.Namespace, workload_src.Kind, workload_dest.Name, workload_dest.Namespace, workload_dest.Kind)
Expand All @@ -345,8 +345,9 @@ func (c *Container) Collect(ch chan<- prometheus.Metric) {
workload_dest := c.ip_resolver.ResolveIP(conn.ActualDest.IP().String())
actualDestWorkload := c.ip_resolver.ResolveActualIP(conn.ActualDest.IP().String())
if workload_dest.Kind == "external" {
if host, ok := c.hostIpsMap.Load(conn.ActualDest.IP().String()); ok {
workload_dest = common.Workload{Name: host.(string), Namespace: "external", Kind: "external"}
host := c.ip_resolver.ResolveHost(conn.ActualDest.IP().String())
if host != "" {
workload_dest = common.Workload{Name: host, Namespace: "external", Kind: "external"}
}
}
connections[AddrPair{src: addrPair.dst, dst: conn.ActualDest, srcWorkload: workload_src, dstWorkload: workload_dest, actualDestWorkload: actualDestWorkload}]++
Expand Down Expand Up @@ -551,14 +552,12 @@ func (c *Container) onConnectionOpen(pid uint32, fd uint64, src, dst netaddr.IPP
}
srcWorkload := c.ip_resolver.ResolveIP(src.IP().String())
if ignoreControlPlane(srcWorkload.Name) {
klog.Warningf("Ignoring src workload %s, %s \n", src.IP().String(), srcWorkload.Name)
return
}
actualDst, err := c.getActualDestination(p, src, dst)
dstWorkload := c.ip_resolver.ResolveIP(dst.IP().String())

if ignoreControlPlane(dstWorkload.Name) {
klog.Warningf("Ignoring src workload %s, %s \n", dst.IP().String(), dstWorkload.Name)
return
}
if err != nil {
Expand Down Expand Up @@ -657,7 +656,7 @@ func (c *Container) onL7Request(pid uint32, fd uint64, timestamp uint64, r *l7.R
host, error := l7.ParseHostFromHttpRequest(string(r.Payload))
if error == nil {
conn.dstWorkload.Name = host
c.hostIpsMap.Store(conn.ActualDest.IP().String(), host)
c.ip_resolver.CacheDNS(conn.ActualDest.IP().String(), host)
}
}
stats := c.l7Stats.get(r.Protocol, conn.Dest, conn.ActualDest, r, conn.srcWorkload, conn.dstWorkload, conn.actualDestWorkload)
Expand Down Expand Up @@ -693,7 +692,7 @@ func (c *Container) onL7Request(pid uint32, fd uint64, timestamp uint64, r *l7.R
if r.Response != nil {
response = base64.StdEncoding.EncodeToString(r.Response)
}
trace.HttpRequest(method, uri, r.Status, r.Duration, r.PayloadSize, payload, headers, response, host)
trace.HttpRequest(method, uri, r.Status, r.Duration, r.PayloadSize, payload, headers, response, host, conn.actualDestWorkload)
case l7.ProtocolHTTP2:
if conn.http2Parser == nil {
conn.http2Parser = l7.NewHttp2Parser()
Expand Down
9 changes: 2 additions & 7 deletions containers/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ type ProcessInfo struct {
type IPResolver interface {
ResolveIP(string) common.Workload
ResolveActualIP(string) common.Workload
CacheDNS(string, string) common.Workload
CacheDNS(string, string)
ResolveHost(string) string
StartWatching() error
StopWatching()
}
Expand Down Expand Up @@ -207,8 +208,6 @@ func (r *Registry) handleEvents(ch <-chan ebpftracer.Event) {
case ebpftracer.EventTypeListenOpen:
if c := r.getOrCreateContainer(e.Pid); c != nil {
c.onListenOpen(e.Pid, e.SrcAddr, false)
} else {
klog.Infoln("TCP listen open from unknown container", e)
}
case ebpftracer.EventTypeListenClose:
if c := r.containersByPid[e.Pid]; c != nil {
Expand All @@ -219,14 +218,10 @@ func (r *Registry) handleEvents(ch <-chan ebpftracer.Event) {
if c := r.getOrCreateContainer(e.Pid); c != nil {
c.onConnectionOpen(e.Pid, e.Fd, e.SrcAddr, e.DstAddr, e.Timestamp, false)
c.attachTlsUprobes(r.tracer, e.Pid)
} else {
klog.Infoln("TCP connection from unknown container", e)
}
case ebpftracer.EventTypeConnectionError:
if c := r.getOrCreateContainer(e.Pid); c != nil {
c.onConnectionOpen(e.Pid, e.Fd, e.SrcAddr, e.DstAddr, 0, true)
} else {
klog.Infoln("TCP connection error from unknown container", e)
}
case ebpftracer.EventTypeConnectionClose:
srcDst := AddrPair{src: e.SrcAddr, dst: e.DstAddr}
Expand Down
6 changes: 4 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/coroot/coroot-node-agent/flags"
"github.com/coroot/coroot-node-agent/logs"
"github.com/coroot/coroot-node-agent/node"
"github.com/coroot/coroot-node-agent/node/metadata"
"github.com/coroot/coroot-node-agent/profiling"
"github.com/coroot/coroot-node-agent/prom"
"github.com/coroot/coroot-node-agent/tracing"
Expand Down Expand Up @@ -148,15 +149,16 @@ func main() {
whitelistNodeExternalNetworks()

machineId := machineID()
tracing.Init(machineId, hostname, version)
md := metadata.GetInstanceMetadata()
tracing.Init(machineId, hostname, version, md)
logs.Init(machineId, hostname, version)

registry := prometheus.NewRegistry()
registerer := prometheus.WrapRegistererWith(prometheus.Labels{"machine_id": machineId}, registry)

registerer.MustRegister(info("node_agent_info", version))

if err := registerer.Register(node.NewCollector(hostname, kv)); err != nil {
if err := registerer.Register(node.NewCollector(hostname, kv, md)); err != nil {
klog.Exitln(err)
}

Expand Down
3 changes: 1 addition & 2 deletions node/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,7 @@ type Collector struct {
instanceMetadata *metadata.CloudMetadata
}

func NewCollector(hostname, kernelVersion string) *Collector {
md := metadata.GetInstanceMetadata()
func NewCollector(hostname, kernelVersion string, md *metadata.CloudMetadata) *Collector {
klog.Infof("instance metadata: %+v", md)
return &Collector{
hostname: hostname,
Expand Down
18 changes: 15 additions & 3 deletions tracing/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/coroot/coroot-node-agent/common"
"github.com/coroot/coroot-node-agent/ebpftracer/l7"
"github.com/coroot/coroot-node-agent/flags"
"github.com/coroot/coroot-node-agent/node/metadata"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
Expand All @@ -25,11 +26,13 @@ const (
)

var (
tracer func(containerId string) trace.Tracer
tracer func(containerId string) trace.Tracer
instanceMetadata *metadata.CloudMetadata
)

func Init(machineId, hostname, version string) {
func Init(machineId, hostname, version string, md *metadata.CloudMetadata) {
endpointUrl := *flags.TracesEndpoint
instanceMetadata = md
if endpointUrl == nil {
klog.Infoln("no OpenTelemetry traces collector endpoint configured")
return
Expand Down Expand Up @@ -107,7 +110,7 @@ func (t *Trace) createSpan(name string, duration time.Duration, error bool, attr
span.End(trace.WithTimestamp(end))
}

func (t *Trace) HttpRequest(method, path string, status l7.Status, duration time.Duration, requestSize uint64, payload string, headers string, response string, host string) {
func (t *Trace) HttpRequest(method, path string, status l7.Status, duration time.Duration, requestSize uint64, payload string, headers string, response string, host string, destWorkload common.Workload) {
if t == nil || method == "" {
return
}
Expand All @@ -123,6 +126,15 @@ func (t *Trace) HttpRequest(method, path string, status l7.Status, duration time
attribute.Key("http.headers").String(headers),
attribute.Key("http.response").String(response),
attribute.Key("http.path").String(path),
attribute.Key("source.cloud.region").String(instanceMetadata.Region),
attribute.Key("source.cloud.provider").String(string(instanceMetadata.Provider)),
attribute.Key("source.cloud.zone").String(instanceMetadata.AvailabilityZone),
attribute.Key("source.cloud.zone_id").String(instanceMetadata.AvailabilityZoneId),
attribute.Key("source.cloud.instance_id").String(instanceMetadata.InstanceId),
attribute.Key("source.cloud.instance_type").String(instanceMetadata.InstanceType),
attribute.Key("source.cloud.region").String(instanceMetadata.Region),
attribute.Key("destination.cloud.region").String(destWorkload.Region),
attribute.Key("destination.cloud.zone").String(destWorkload.Zone),
)
}

Expand Down