Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
06b9314
fix: fix for garbage data and host mapping
mayankpande88 Feb 19, 2025
30d70a0
Merge pull request #106 from nudgebee/fix-host-mapping
blue4209211 Feb 19, 2025
ad0ce68
fix: add trace in alert label
RamanKharchee Mar 4, 2025
a867d9c
fix: made ParseHTTPRequest http request one time
RamanKharchee Mar 4, 2025
eccd213
Merge pull request #107 from nudgebee/trace-label
blue4209211 Mar 4, 2025
44c1a51
build(deps): bump golang.org/x/net from 0.33.0 to 0.36.0
dependabot[bot] Mar 13, 2025
f003248
Merge pull request #108 from nudgebee/dependabot/go_modules/golang.or…
blue4209211 Mar 13, 2025
5ebe0d1
chore: fix updated go.mod (#109)
mayankpande88 Mar 13, 2025
d186b6a
build(deps): bump github.com/docker/docker (#111)
dependabot[bot] Mar 17, 2025
2daa325
build(deps): bump github.com/Azure/azure-sdk-for-go/sdk/azidentity (#63)
dependabot[bot] Mar 17, 2025
7f0d96f
build(deps): bump github.com/containerd/containerd from 1.6.26 to 1.6.38
dependabot[bot] Mar 18, 2025
daa349f
build(deps): bump github.com/golang-jwt/jwt/v5 from 5.2.1 to 5.2.2
dependabot[bot] Mar 21, 2025
6972000
Merge pull request #113 from nudgebee/dependabot/go_modules/github.co…
blue4209211 Mar 22, 2025
4d41d2f
Merge pull request #112 from nudgebee/dependabot/go_modules/github.co…
blue4209211 Mar 22, 2025
4e976f9
fix: fix for invalid src workload
mayankpande88 Mar 26, 2025
2ccb845
fix: fix for src_workload_kind labels
mayankpande88 Mar 26, 2025
cae8e32
Merge pull request #114 from nudgebee/fix-service-map-metrics
blue4209211 Mar 27, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion common/ip_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -636,7 +636,8 @@ func (resolver *K8sIPResolver) getControllerOfOwner(originalOwner *metav1.OwnerR
if !ok {
return nil, errors.New("type confusion in replicasets map")
}
return metav1.GetControllerOf(&replicaSet), nil
owner := metav1.GetControllerOf(&replicaSet)
return owner, nil
case "DaemonSet":
daemonSetVal, ok := resolver.snapshot.DaemonSets.Load(originalOwner.UID)
if !ok {
Expand Down Expand Up @@ -750,3 +751,18 @@ func (resolver *K8sIPResolver) resolvePodDescriptor(pod *v1.Pod) Workload {
}
return result
}

func (resolver *K8sIPResolver) ResolvePodOwner(podName string, podNamespace string) Workload {
pods, err := resolver.clientset.CoreV1().Pods(podNamespace).Get(context.Background(), podName, metav1.GetOptions{})
if err != nil {
return Workload{
Name: podName,
Namespace: podNamespace,
Kind: "Pod",
Region: "",
Zone: "",
Instance: "",
}
}
Comment thread
mayankpande88 marked this conversation as resolved.
return resolver.resolvePodDescriptor(pods)
}
46 changes: 33 additions & 13 deletions containers/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,10 @@ func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, pid
// %s -> pod name

split := strings.Split(string(id), "/")
namespace := split[1]
podName := split[2]

namespace := split[2]
podName := split[3]
Comment thread
mayankpande88 marked this conversation as resolved.
src_workload := registry.ip_resolver.ResolvePodOwner(podName, namespace)
klog.Infof("Pod %s/%s is owned by %s/%s/%s", namespace, podName, src_workload.Name, src_workload.Namespace, src_workload.Kind)
c := &Container{
id: id,
cgroup: cg,
Expand Down Expand Up @@ -206,7 +207,7 @@ func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, pid
done: make(chan struct{}),
ip_resolver: registry.ip_resolver,
registry: registry,
srcWorkload: common.Workload{Name: podName, Namespace: namespace, Kind: "Pod"},
srcWorkload: src_workload,
}
c.runLogParser("")

Expand Down Expand Up @@ -737,23 +738,46 @@ func (c *Container) onL7Request(pid uint32, fd uint64, timestamp uint64, r *l7.R
if timestamp != 0 && conn.Timestamp != timestamp {
return nil
}
stats := c.l7Stats.get(r.Protocol, conn.DestinationKey, r, conn.srcWorkload, conn.DestinationKey.GetDestinationWorkload(), conn.DestinationKey.GetActualDestinationWorkload())
if conn.dstWorkload.Namespace == "external" && (r.Protocol == l7.ProtocolHTTP || r.Protocol == l7.ProtocolHTTP2) {
if host, ok := iqfqdn[conn.DestinationKey.ActualDestination().IP()]; ok {
log.Printf("Setting external host %s", host)
conn.dstWorkload.Name = host
} else {
host, error := l7.ParseHostFromHttpRequest(string(r.Payload))
if error == nil {
conn.dstWorkload.Name = host
}
}
}

// Parse HTTP request once if needed
var req *http.Request
var err error
var headers http.Header = http.Header{}
if r.Protocol == l7.ProtocolHTTP {
req, err = l7.ParseHTTPRequest(r.Payload)
if err == nil && req != nil && req.Header != nil {
headers = req.Header
}
}

trace := c.tracer.NewTrace(conn.DestinationKey.ActualDestinationIfKnown(), conn.srcWorkload, conn.DestinationKey.GetDestinationWorkload(), conn.DestinationKey.GetActualDestinationWorkload())
traceId := ""
if headers != nil {
traceId = trace.ExtractTraceId(headers)
}
stats := c.l7Stats.get(r.Protocol, conn.DestinationKey, r, conn.srcWorkload, conn.DestinationKey.GetDestinationWorkload(), conn.DestinationKey.GetActualDestinationWorkload(), traceId)
switch r.Protocol {
case l7.ProtocolHTTP:
stats.observe(r.Status.Http(), "", r.Duration)
payload := ""
method := ""
uri := ""
response := ""
host := ""
headers := http.Header{}
req, err := l7.ParseHTTPRequest(r.Payload)
host := conn.dstWorkload.Name
if err != nil {
log.Printf("Failed to parse payload %s, %q", err, string(r.Payload))
method, uri = l7.ParseHttp(r.Payload)
host, _ = l7.ParseHostFromHttpRequest(string(r.Payload))
} else {
if req != nil && req.Body != nil {
body, _ := io.ReadAll(req.Body)
Expand All @@ -768,10 +792,6 @@ func (c *Container) onL7Request(pid uint32, fd uint64, timestamp uint64, r *l7.R
klog.Warningf("Non-utf8 characters in uri %q", req.URL.Path)
uri = string([]rune(req.URL.Path))
}
host = req.Host
if req.Header != nil {
headers = req.Header
}
}
if r.Response != nil {
response = base64.StdEncoding.EncodeToString(r.Response)
Expand Down
15 changes: 5 additions & 10 deletions containers/l7.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (m *L7Metrics) observe(status, method string, duration time.Duration) {

// L7Stats indexes L7Metrics first by protocol and then by destination key.
type L7Stats map[l7.Protocol]map[common.DestinationKey]*L7Metrics // protocol -> dst:actual_dst -> metrics

func (s L7Stats) get(protocol l7.Protocol, key common.DestinationKey, r *l7.RequestData, srcWorkload common.Workload, dstWorkload common.Workload, actualDstWorkload common.Workload) *L7Metrics {
func (s L7Stats) get(protocol l7.Protocol, key common.DestinationKey, r *l7.RequestData, srcWorkload common.Workload, dstWorkload common.Workload, actualDstWorkload common.Workload, traceId string) *L7Metrics {
if protocol == l7.ProtocolHTTP2 {
protocol = l7.ProtocolHTTP
}
Expand All @@ -56,13 +56,16 @@ func (s L7Stats) get(protocol l7.Protocol, key common.DestinationKey, r *l7.Requ
"destination_workload_kind": dstWorkload.Kind,
"destination_workload_name": dstWorkload.Name,
"destination_workload_namespace": dstWorkload.Namespace,
"src_kind": srcWorkload.Kind,
"src_workload_kind": srcWorkload.Kind,
"src_workload_name": srcWorkload.Name,
"src_workload_namespace": srcWorkload.Namespace,
"actual_destination_workload_kind": actualDstWorkload.Kind,
"actual_destination_workload_name": actualDstWorkload.Name,
"actual_destination_workload_namespace": actualDstWorkload.Namespace,
}
if traceId != "" {
constLabels["trace_id"] = traceId
}
labels := []string{"status"}
switch protocol {
case l7.ProtocolRabbitmq, l7.ProtocolNats:
Expand All @@ -75,14 +78,6 @@ func (s L7Stats) get(protocol l7.Protocol, key common.DestinationKey, r *l7.Requ
log.Printf("Failed to parse path %s", path)
}
constLabels["method"] = method
if dstWorkload.Namespace == "external" {
host, err := l7.ParseHostFromHttpRequest(string(r.Payload))
if host != "" {
constLabels["destination_workload_name"] = host
} else {
log.Printf("Failed to parse host %s , %v", string(r.Payload), err)
}
}
hOpts := L7Latency[protocol]
m.Latency = prometheus.NewHistogram(
prometheus.HistogramOpts{Name: hOpts.Name, Help: hOpts.Help, ConstLabels: constLabels},
Expand Down
1 change: 1 addition & 0 deletions containers/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type IPResolver interface {
CacheDNS(string, string) common.Workload
StartWatching() error
StopWatching()
ResolvePodOwner(string, string) common.Workload
}

type Registry struct {
Expand Down
45 changes: 30 additions & 15 deletions ebpftracer/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,11 +388,32 @@ func runEventsReader(name string, r *perf.Reader, ch chan<- Event, typ perfMapTy
v := &l7Event{}
data := rec.RawSample
reader := bytes.NewBuffer(data)

// Ensure binary.Read does not fail before proceeding
if err := binary.Read(reader, binary.LittleEndian, v); err != nil {
klog.Warningln("failed to read msg:", err)
continue
}

payload := reader.Bytes()
expectedSize := int(v.PayloadSize) + int(v.ResponseSize)

// If the actual payload is smaller than expected, we log a warning and adjust
if len(payload) < expectedSize {
klog.Warningf("Payload too small (got %d bytes, expected %d), adjusting sizes", len(payload), expectedSize)
}

// Compute safe slicing limits
payloadEnd := min(int(v.PayloadSize), len(payload))
responseEnd := min(payloadEnd+int(v.ResponseSize), len(payload))

// Always copy to prevent garbage data from reused buffers
payloadData := make([]byte, payloadEnd)
copy(payloadData, payload[:payloadEnd])

responseData := make([]byte, responseEnd-payloadEnd)
copy(responseData, payload[payloadEnd:responseEnd])

req := &l7.RequestData{
Protocol: l7.Protocol(v.Protocol),
Status: l7.Status(v.Status),
Expand All @@ -401,23 +422,17 @@ func runEventsReader(name string, r *perf.Reader, ch chan<- Event, typ perfMapTy
StatementId: v.StatementId,
PayloadSize: v.PayloadSize,
ResponseSize: v.ResponseSize,
Payload: payloadData,
Response: responseData,
}
switch {
case v.PayloadSize == 0:
case v.PayloadSize > MaxPayloadSize && v.ResponseSize > MaxPayloadSize:
req.Payload = payload[:MaxPayloadSize]
req.Response = payload[MaxPayloadSize:]
case v.PayloadSize > MaxPayloadSize:
req.Payload = payload[:MaxPayloadSize]
req.Response = payload[MaxPayloadSize : MaxPayloadSize+v.ResponseSize]
case v.ResponseSize > MaxPayloadSize:
req.Payload = payload[:v.PayloadSize]
req.Response = payload[MaxPayloadSize:]
default:
req.Payload = payload[:v.PayloadSize]
req.Response = payload[MaxPayloadSize : MaxPayloadSize+v.ResponseSize]

event = Event{
Type: EventTypeL7Request,
Pid: v.Pid,
Fd: v.Fd,
Timestamp: v.ConnectionTimestamp,
L7Request: req,
}
event = Event{Type: EventTypeL7Request, Pid: v.Pid, Fd: v.Fd, Timestamp: v.ConnectionTimestamp, L7Request: req}
case perfMapTypeFileEvents:
v := &fileEvent{}
if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
Expand Down
37 changes: 20 additions & 17 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
module github.com/coroot/coroot-node-agent

go 1.23
go 1.23.0

toolchain go1.24.1

require (
cloud.google.com/go/compute/metadata v0.3.0
Expand All @@ -9,9 +11,9 @@ require (
github.com/cilium/cilium v1.13.7
github.com/cilium/ebpf v0.16.0
github.com/containerd/cgroups v1.0.4
github.com/containerd/containerd v1.6.26
github.com/containerd/containerd v1.6.38
github.com/coreos/go-systemd/v22 v22.5.0
github.com/docker/docker v25.0.3+incompatible
github.com/docker/docker v25.0.6+incompatible
github.com/florianl/go-conntrack v0.3.0
github.com/go-kit/log v0.2.1
github.com/godbus/dbus/v5 v5.0.6
Expand All @@ -38,8 +40,8 @@ require (
go.opentelemetry.io/otel/trace v1.24.0
golang.org/x/arch v0.4.0
golang.org/x/exp v0.0.0-20240119083558-1b970713d09a
golang.org/x/net v0.33.0
golang.org/x/sys v0.28.0
golang.org/x/net v0.36.0
golang.org/x/sys v0.30.0
golang.org/x/time v0.5.0
gopkg.in/alecthomas/kingpin.v2 v2.2.6
gopkg.in/yaml.v2 v2.4.0
Expand All @@ -51,12 +53,12 @@ require (
)

require (
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.10.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.5.1 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.5.2 // indirect
github.com/AzureAD/microsoft-authentication-library-for-go v1.2.1 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.11.1 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.6.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.8.0 // indirect
github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2 // indirect
github.com/Microsoft/go-winio v0.6.1 // indirect
github.com/Microsoft/hcsshim v0.9.10 // indirect
github.com/Microsoft/hcsshim v0.9.12 // indirect
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
github.com/alecthomas/units v0.0.0-20231202071711-9a357b53e9c9 // indirect
github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 // indirect
Expand All @@ -69,6 +71,7 @@ require (
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/cilium/workerpool v1.1.3 // indirect
github.com/containerd/continuity v0.3.0 // indirect
github.com/containerd/errdefs v0.1.0 // indirect
github.com/containerd/fifo v1.0.0 // indirect
github.com/containerd/log v0.1.0 // indirect
github.com/containerd/ttrpc v1.1.2 // indirect
Expand Down Expand Up @@ -101,7 +104,7 @@ require (
github.com/go-openapi/validate v0.23.0 // indirect
github.com/gogo/googleapis v1.4.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang-jwt/jwt/v5 v5.2.0 // indirect
github.com/golang-jwt/jwt/v5 v5.2.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/golang/snappy v0.0.4 // indirect
Expand Down Expand Up @@ -133,15 +136,15 @@ require (
github.com/moby/locker v1.0.1 // indirect
github.com/moby/sys/mountinfo v0.6.2 // indirect
github.com/moby/sys/signal v0.6.0 // indirect
github.com/moby/sys/user v0.1.0 // indirect
github.com/moby/term v0.5.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect
github.com/oklog/ulid v1.3.1 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.1.0-rc2.0.20221005185240-3a7f492d3f1b // indirect
github.com/opencontainers/runc v1.1.14 // indirect
github.com/opencontainers/image-spec v1.1.0 // indirect
github.com/opencontainers/selinux v1.10.1 // indirect
github.com/opentracing/opentracing-go v1.2.1-0.20220228012449-10b1cf09e00b // indirect
github.com/pelletier/go-toml/v2 v2.0.8 // indirect
Expand Down Expand Up @@ -179,12 +182,12 @@ require (
go.uber.org/multierr v1.11.0 // indirect
go4.org/intern v0.0.0-20211027215823-ae77deb06f29 // indirect
go4.org/unsafe/assume-no-moving-gc v0.0.0-20230525183740-e7c30c78aeb2 // indirect
golang.org/x/crypto v0.31.0 // indirect
golang.org/x/crypto v0.35.0 // indirect
golang.org/x/mod v0.17.0 // indirect
golang.org/x/oauth2 v0.21.0 // indirect
golang.org/x/sync v0.10.0 // indirect
golang.org/x/term v0.27.0 // indirect
golang.org/x/text v0.21.0 // indirect
golang.org/x/sync v0.11.0 // indirect
golang.org/x/term v0.29.0 // indirect
golang.org/x/text v0.22.0 // indirect
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240304212257-790db918fca8 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237 // indirect
Expand Down
Loading