Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 8 additions & 11 deletions containers/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"github.com/coroot/coroot-node-agent/flags"
"github.com/coroot/coroot-node-agent/logs"
"github.com/coroot/coroot-node-agent/node"
"github.com/coroot/coroot-node-agent/node/metadata"
"github.com/coroot/coroot-node-agent/pinger"
"github.com/coroot/coroot-node-agent/proc"
"github.com/coroot/coroot-node-agent/tracing"
Expand Down Expand Up @@ -141,13 +140,12 @@ type Container struct {

lock sync.RWMutex

done chan struct{}
ip_resolver IPResolver
hostIpsMap sync.Map
instanceMetadata *metadata.CloudMetadata
done chan struct{}
ip_resolver IPResolver
hostIpsMap sync.Map
}

func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, hostConntrack *Conntrack, pid uint32, ip_resolver IPResolver, instanceMetadata *metadata.CloudMetadata) (*Container, error) {
func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, hostConntrack *Conntrack, pid uint32, ip_resolver IPResolver) (*Container, error) {
netNs, err := proc.GetNetNs(pid)
if err != nil {
return nil, err
Expand Down Expand Up @@ -179,10 +177,9 @@ func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, host

hostConntrack: hostConntrack,

done: make(chan struct{}),
ip_resolver: ip_resolver,
hostIpsMap: sync.Map{},
instanceMetadata: instanceMetadata,
done: make(chan struct{}),
ip_resolver: ip_resolver,
hostIpsMap: sync.Map{},
}

for _, n := range md.networks {
Expand Down Expand Up @@ -694,7 +691,7 @@ func (c *Container) onL7Request(pid uint32, fd uint64, timestamp uint64, r *l7.R
if r.Response != nil {
response = base64.StdEncoding.EncodeToString(r.Response)
}
trace.HttpRequest(method, uri, r.Status, r.Duration, r.PayloadSize, payload, headers, response, host, *c.instanceMetadata, conn.actualDestWorkload)
trace.HttpRequest(method, uri, r.Status, r.Duration, r.PayloadSize, payload, headers, response, host, conn.actualDestWorkload, conn.srcWorkload)
case l7.ProtocolHTTP2:
if conn.http2Parser == nil {
conn.http2Parser = l7.NewHttp2Parser()
Expand Down
17 changes: 7 additions & 10 deletions containers/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/coroot/coroot-node-agent/common"
"github.com/coroot/coroot-node-agent/ebpftracer"
"github.com/coroot/coroot-node-agent/flags"
"github.com/coroot/coroot-node-agent/node/metadata"
"github.com/coroot/coroot-node-agent/proc"
"github.com/prometheus/client_golang/prometheus"
"github.com/vishvananda/netns"
Expand Down Expand Up @@ -52,12 +51,11 @@ type Registry struct {
containersByCgroupId map[string]*Container
containersByPid map[uint32]*Container

processInfoCh chan<- ProcessInfo
ip_resolver IPResolver
instanceMetadata *metadata.CloudMetadata
processInfoCh chan<- ProcessInfo
ip_resolver IPResolver
}

func NewRegistry(reg prometheus.Registerer, kernelVersion string, processInfoCh chan<- ProcessInfo, ip_resolver *common.K8sIPResolver, md *metadata.CloudMetadata) (*Registry, error) {
func NewRegistry(reg prometheus.Registerer, kernelVersion string, processInfoCh chan<- ProcessInfo, ip_resolver *common.K8sIPResolver) (*Registry, error) {
ns, err := proc.GetSelfNetNs()
if err != nil {
return nil, err
Expand Down Expand Up @@ -111,9 +109,8 @@ func NewRegistry(reg prometheus.Registerer, kernelVersion string, processInfoCh

processInfoCh: processInfoCh,

tracer: ebpftracer.NewTracer(kernelVersion, *flags.DisableL7Tracing),
ip_resolver: ip_resolver,
instanceMetadata: md,
tracer: ebpftracer.NewTracer(kernelVersion, *flags.DisableL7Tracing),
ip_resolver: ip_resolver,
}

go r.handleEvents(r.events)
Expand Down Expand Up @@ -309,7 +306,7 @@ func (r *Registry) getOrCreateContainer(pid uint32) *Container {
r.containersByCgroupId[cg.Id] = c
return c
}
c, err := NewContainer(id, cg, md, r.hostConntrack, pid, r.ip_resolver, r.instanceMetadata)
c, err := NewContainer(id, cg, md, r.hostConntrack, pid, r.ip_resolver)
if err != nil {
klog.Warningf("failed to create container pid=%d cg=%s id=%s: %s", pid, cg.Id, id, err)
return nil
Expand Down Expand Up @@ -348,7 +345,7 @@ func calcId(cg *cgroup.Cgroup, md *ContainerMetadata) ContainerID {
name = "sandbox"
}
if name == "" || name == "POD" { // skip pause containers
return ""
name = "sandbox"
}
return ContainerID(fmt.Sprintf("/k8s/%s/%s/%s", namespace, pod, name))
}
Expand Down
6 changes: 2 additions & 4 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/coroot/coroot-node-agent/flags"
"github.com/coroot/coroot-node-agent/logs"
"github.com/coroot/coroot-node-agent/node"
"github.com/coroot/coroot-node-agent/node/metadata"
"github.com/coroot/coroot-node-agent/profiling"
"github.com/coroot/coroot-node-agent/prom"
"github.com/coroot/coroot-node-agent/tracing"
Expand Down Expand Up @@ -156,14 +155,13 @@ func main() {
registerer := prometheus.WrapRegistererWith(prometheus.Labels{"machine_id": machineId}, registry)

registerer.MustRegister(info("node_agent_info", version))
md := metadata.GetInstanceMetadata()
if err := registerer.Register(node.NewCollector(hostname, kv, md)); err != nil {
if err := registerer.Register(node.NewCollector(hostname, kv)); err != nil {
klog.Exitln(err)
}

processInfoCh := profiling.Init(machineId, hostname)

cr, err := containers.NewRegistry(registerer, kv, processInfoCh, resolver, md)
cr, err := containers.NewRegistry(registerer, kv, processInfoCh, resolver)
if err != nil {
klog.Exitln(err)
}
Expand Down
3 changes: 2 additions & 1 deletion node/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,8 @@ type Collector struct {
instanceMetadata *metadata.CloudMetadata
}

func NewCollector(hostname, kernelVersion string, md *metadata.CloudMetadata) *Collector {
func NewCollector(hostname, kernelVersion string) *Collector {
md := metadata.GetInstanceMetadata()
klog.Infof("instance metadata: %+v", md)
return &Collector{
hostname: hostname,
Expand Down
12 changes: 3 additions & 9 deletions tracing/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/coroot/coroot-node-agent/common"
"github.com/coroot/coroot-node-agent/ebpftracer/l7"
"github.com/coroot/coroot-node-agent/flags"
"github.com/coroot/coroot-node-agent/node/metadata"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
Expand Down Expand Up @@ -108,7 +107,7 @@ func (t *Trace) createSpan(name string, duration time.Duration, error bool, attr
span.End(trace.WithTimestamp(end))
}

func (t *Trace) HttpRequest(method, path string, status l7.Status, duration time.Duration, requestSize uint64, payload string, headers string, response string, host string, instanceMetadata metadata.CloudMetadata, destWorkload common.Workload) {
func (t *Trace) HttpRequest(method, path string, status l7.Status, duration time.Duration, requestSize uint64, payload string, headers string, response string, host string, srcWorkload common.Workload, destWorkload common.Workload) {
if t == nil || method == "" {
return
}
Expand All @@ -124,13 +123,8 @@ func (t *Trace) HttpRequest(method, path string, status l7.Status, duration time
attribute.Key("http.headers").String(headers),
attribute.Key("http.response").String(response),
attribute.Key("http.path").String(path),
attribute.Key("source.cloud.region").String(instanceMetadata.Region),
attribute.Key("source.cloud.provider").String(string(instanceMetadata.Provider)),
attribute.Key("source.cloud.zone").String(instanceMetadata.AvailabilityZone),
attribute.Key("source.cloud.zone_id").String(instanceMetadata.AvailabilityZoneId),
attribute.Key("source.cloud.instance_id").String(instanceMetadata.InstanceId),
attribute.Key("source.cloud.instance_type").String(instanceMetadata.InstanceType),
attribute.Key("source.cloud.region").String(instanceMetadata.Region),
attribute.Key("source.cloud.region").String(srcWorkload.Region),
attribute.Key("source.cloud.zone").String(srcWorkload.Zone),
attribute.Key("destination.cloud.region").String(destWorkload.Region),
attribute.Key("destination.cloud.zone").String(destWorkload.Zone),
)
Expand Down