Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions common/ip_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ type clusterSnapshot struct {
PodDescriptors sync.Map // map[types.UID]Workload
}

type InstanceMeta struct {
Name string
Region string
Zone string
}

type K8sIPResolver struct {
clientset kubernetes.Interface
snapshot clusterSnapshot
Expand Down Expand Up @@ -416,6 +422,17 @@ func (resolver *K8sIPResolver) handlePodWatchEvent(podEvent *watch.Event) {
}
for _, podIp := range pod.Status.PodIPs {
resolver.storeWorkloadsIP(podIp.IP, &entry)
nodeInfo, ok := resolver.nodeInfoMap.Load(pod.Spec.NodeName)
region, zone := "", ""
if ok {
meta, ok := nodeInfo.(InstanceMeta)
if ok {
region = meta.Region
zone = meta.Zone
} else {
log.Println("Failed to fetch node info ", ok)
}
}
podWorkload := Workload{
Name: pod.Name,
Namespace: pod.Namespace,
Expand Down Expand Up @@ -449,6 +466,17 @@ func (resolver *K8sIPResolver) handlePodWatchEvent(podEvent *watch.Event) {
}
for _, podIp := range pod.Status.PodIPs {
resolver.storeWorkloadsIP(podIp.IP, &entry)
nodeInfo, ok := resolver.nodeInfoMap.Load(pod.Spec.NodeName)
region, zone := "", ""
if ok {
meta, ok := nodeInfo.(InstanceMeta)
if ok {
region = meta.Region
zone = meta.Zone
}
} else {
log.Println("Failed to fetch node info ", ok)
}
podWorkload := Workload{
Name: pod.Name,
Namespace: pod.Namespace,
Expand All @@ -457,6 +485,7 @@ func (resolver *K8sIPResolver) handlePodWatchEvent(podEvent *watch.Event) {
Zone: zone,
Instance: pod.Spec.NodeName,
}

resolver.storePodsIP(podIp.IP, &podWorkload)
}
case watch.Deleted:
Expand Down Expand Up @@ -494,6 +523,8 @@ func (resolver *K8sIPResolver) handleNodeWatchEvent(nodeEvent *watch.Event) {
Instance: node.Name,
})
}
nodeMetadata := InstanceMeta{Zone: node.Annotations["topology.kubernetes.io/zone"], Region: node.Annotations["topology.kubernetes.io/region"]}
resolver.nodeInfoMap.Store(node.Name, nodeMetadata)
case watch.Deleted:
if val, ok := nodeEvent.Object.(*v1.Node); ok {
resolver.snapshot.Nodes.Delete(val.UID)
Expand Down
13 changes: 7 additions & 6 deletions containers/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,6 @@ type Container struct {

done chan struct{}
ip_resolver IPResolver
hostIpsMap sync.Map
}

func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, hostConntrack *Conntrack, pid uint32, registry *Registry) (*Container, error) {
Expand Down Expand Up @@ -329,8 +328,9 @@ func (c *Container) Collect(ch chan<- prometheus.Metric) {
}

if d.dstWorkload.Kind == "external" {
if host, ok := c.hostIpsMap.Load(d.dst); ok {
workload_dest = common.Workload{Name: host.(string), Namespace: "external", Kind: "external"}
host := c.ip_resolver.ResolveHost(d.dst.IP().String())
if host != "" {
workload_dest = common.Workload{Name: host, Namespace: "external", Kind: "external"}
}
}
ch <- counter(metrics.NetConnectionsSuccessful, float64(stats.Count), d.src.String(), d.dst.String(), workload_src.Name, workload_src.Namespace, workload_src.Kind, workload_dest.Name, workload_dest.Namespace, workload_dest.Kind, actualDestWorkload.Name, actualDestWorkload.Namespace, actualDestWorkload.Kind)
Expand All @@ -355,8 +355,9 @@ func (c *Container) Collect(ch chan<- prometheus.Metric) {
workload_dest := c.ip_resolver.ResolveIP(conn.ActualDest.IP().String())
actualDestWorkload := c.ip_resolver.ResolveActualIP(conn.ActualDest.IP().String())
if workload_dest.Kind == "external" {
if host, ok := c.hostIpsMap.Load(conn.ActualDest.IP().String()); ok {
workload_dest = common.Workload{Name: host.(string), Namespace: "external", Kind: "external"}
host := c.ip_resolver.ResolveHost(conn.ActualDest.IP().String())
if host != "" {
workload_dest = common.Workload{Name: host, Namespace: "external", Kind: "external"}
}
}
connections[AddrPair{src: addrPair.dst, dst: conn.ActualDest, srcWorkload: workload_src, dstWorkload: workload_dest, actualDestWorkload: actualDestWorkload}]++
Expand Down Expand Up @@ -815,7 +816,7 @@ func (c *Container) onL7Request(pid uint32, fd uint64, timestamp uint64, r *l7.R
if r.Response != nil {
response = base64.StdEncoding.EncodeToString(r.Response)
}
trace.HttpRequest(method, uri, r.Status, r.Duration, r.PayloadSize, payload, headers, response, host)
trace.HttpRequest(method, uri, r.Status, r.Duration, r.PayloadSize, payload, headers, response, host, conn.actualDestWorkload)
case l7.ProtocolHTTP2:
if conn.http2Parser == nil {
conn.http2Parser = l7.NewHttp2Parser()
Expand Down
3 changes: 2 additions & 1 deletion containers/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ type ProcessInfo struct {
type IPResolver interface {
ResolveIP(string) common.Workload
ResolveActualIP(string) common.Workload
CacheDNS(string, string) common.Workload
CacheDNS(string, string)
ResolveHost(string) string
StartWatching() error
StopWatching()
}
Expand Down
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func main() {

registerer.MustRegister(info("node_agent_info", version))

if err := registerer.Register(node.NewCollector(hostname, kv)); err != nil {
if err := registerer.Register(node.NewCollector(hostname, kv, md)); err != nil {
klog.Exitln(err)
}

Expand Down
3 changes: 1 addition & 2 deletions node/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,7 @@ type Collector struct {
instanceMetadata *metadata.CloudMetadata
}

func NewCollector(hostname, kernelVersion string) *Collector {
md := metadata.GetInstanceMetadata()
func NewCollector(hostname, kernelVersion string, md *metadata.CloudMetadata) *Collector {
klog.Infof("instance metadata: %+v", md)
return &Collector{
hostname: hostname,
Expand Down
6 changes: 4 additions & 2 deletions tracing/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,14 @@ const (
)

var (
tracer func(containerId string) trace.Tracer
tracer func(containerId string) trace.Tracer
instanceMetadata *metadata.CloudMetadata
)

func Init(machineId, hostname, version string) {
md := metadata.GetInstanceMetadata()
endpointUrl := *flags.TracesEndpoint
instanceMetadata = md
if endpointUrl == nil {
klog.Infoln("no OpenTelemetry traces collector endpoint configured")
return
Expand Down Expand Up @@ -144,7 +146,7 @@ func (t *Trace) createSpan(name string, duration time.Duration, error bool, attr
span.End(trace.WithTimestamp(end))
}

func (t *Trace) HttpRequest(method, path string, status l7.Status, duration time.Duration, requestSize uint64, payload string, headers string, response string, host string) {
func (t *Trace) HttpRequest(method, path string, status l7.Status, duration time.Duration, requestSize uint64, payload string, headers string, response string, host string, destWorkload common.Workload) {
if t == nil || method == "" {
return
}
Expand Down