Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
ede1edc
fix: added logger for pattern matching
mayankpande88 Apr 1, 2024
79d6eb6
Merge pull request #26 from nudgebee/mayankpande88-patch-2
RamanKharchee Apr 1, 2024
43a7595
fix: added request body in tracing data (#27)
mayankpande88 Apr 3, 2024
a8cd8cf
feat: added addition span attributes (#28)
mayankpande88 Apr 3, 2024
a9603bc
feat: added external host resolver from request header of host (#29)
mayankpande88 Apr 4, 2024
e5c4a6a
fix: fix for host header parsing
mayankpande88 Apr 4, 2024
1cfb2db
Merge pull request #30 from nudgebee/mayankpande88-patch-2
blue4209211 Apr 5, 2024
d40dce1
fix: fix for host parsing
mayankpande88 Apr 5, 2024
de4a0ce
Merge pull request #31 from nudgebee/mayankpande88-patch-2
RamanKharchee Apr 5, 2024
815e370
fix: fix for request body parsing
mayankpande88 Apr 5, 2024
8f8cf71
fix: fix for request parsing and filter control plane related tracing
mayankpande88 Apr 8, 2024
34739a6
fix: fix for flag
mayankpande88 Apr 8, 2024
a9953b5
fix: fixes for payload capture
mayankpande88 Apr 9, 2024
4503428
fix: fix for response capture
mayankpande88 Apr 10, 2024
14561c6
fix: added flag for sanitize and encoding for headers
mayankpande88 Apr 10, 2024
7116acf
Merge pull request #32 from nudgebee/mayankpande88-patch-2
RamanKharchee Apr 11, 2024
01dac72
fix: fix for headers as json and ignore control plane dest
mayankpande88 Apr 11, 2024
958a5e3
Merge pull request #33 from nudgebee/mayankpande88-patch-2
blue4209211 Apr 11, 2024
f46880f
fix: fix for failure (#34)
mayankpande88 Apr 11, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions common/ip_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,15 @@ func (resolver *K8sIPResolver) ResolveIP(ip string) Workload {
}
}

// CacheDNS stores dns as the name resolved for ip in the resolver's
// dnsResolvedIps cache and returns a Workload that describes the name as an
// external destination (both Namespace and Kind are set to "external").
func (resolver *K8sIPResolver) CacheDNS(ip string, dns string) Workload {
	external := Workload{
		Kind:      "external",
		Namespace: "external",
		Name:      dns,
	}
	resolver.dnsResolvedIps.Add(ip, dns)
	return external
}

func (resolver *K8sIPResolver) StartWatching() error {
// register watchers
podsWatcher, err := resolver.clientset.CoreV1().Pods("").Watch(context.Background(), metav1.ListOptions{})
Expand Down
112 changes: 98 additions & 14 deletions containers/container.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
package containers

import (
"io"
"log"
"os"
"strings"
"sync"
"time"

"encoding/base64"

"github.com/coroot/coroot-node-agent/cgroup"
"github.com/coroot/coroot-node-agent/common"
"github.com/coroot/coroot-node-agent/ebpftracer"
Expand All @@ -24,8 +28,9 @@ import (
)

var (
gcInterval = 10 * time.Minute
pingTimeout = 300 * time.Millisecond
gcInterval = 10 * time.Minute
pingTimeout = 300 * time.Millisecond
payloadThreshold = 1024 * 1024
)

type ContainerID string
Expand Down Expand Up @@ -75,7 +80,7 @@ type ActiveConnection struct {
ActualDest netaddr.IPPort
srcWorkload common.Workload
dstWorkload common.Workload
actialDestWorkload common.Workload
actualDestWorkload common.Workload
Pid uint32
Fd uint64
Timestamp uint64
Expand Down Expand Up @@ -137,6 +142,7 @@ type Container struct {

done chan struct{}
ip_resolver IPResolver
hostIpsMap sync.Map
}

func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, hostConntrack *Conntrack, pid uint32, ip_resolver IPResolver) (*Container, error) {
Expand Down Expand Up @@ -173,6 +179,7 @@ func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, host

done: make(chan struct{}),
ip_resolver: ip_resolver,
hostIpsMap: sync.Map{},
}

for _, n := range md.networks {
Expand Down Expand Up @@ -301,6 +308,12 @@ func (c *Container) Collect(ch chan<- prometheus.Metric) {
workload_src = c.ip_resolver.ResolveIP(d.src.IP().String())
workload_dest = c.ip_resolver.ResolveIP(d.dst.IP().String())
}

if d.dstWorkload.Kind == "external" {
if host, ok := c.hostIpsMap.Load(d.dst); ok {
workload_dest = common.Workload{Name: host.(string), Namespace: "external", Kind: "external"}
}
}
ch <- counter(metrics.NetConnectsSuccessful, float64(count), d.src.String(), d.dst.String(), workload_src.Name, workload_src.Namespace, workload_src.Kind, workload_dest.Name, workload_dest.Namespace, workload_dest.Kind, actualDestWorkload.Name, actualDestWorkload.Namespace, actualDestWorkload.Kind)
}
for dst, count := range c.connectsFailed {
Expand All @@ -315,6 +328,11 @@ func (c *Container) Collect(ch chan<- prometheus.Metric) {
workload_src = c.ip_resolver.ResolveIP(d.src.IP().String())
workload_dest = c.ip_resolver.ResolveIP(d.dst.IP().String())
}
if d.dstWorkload.Kind == "external" {
if host, ok := c.hostIpsMap.Load(d.dst); ok {
workload_dest = common.Workload{Name: host.(string), Namespace: "external", Kind: "external"}
}
}
ch <- counter(metrics.NetRetransmits, float64(count), d.src.String(), d.dst.String(), workload_src.Name, workload_src.Namespace, workload_src.Kind, workload_dest.Name, workload_dest.Namespace, workload_dest.Kind)
}

Expand All @@ -326,6 +344,11 @@ func (c *Container) Collect(ch chan<- prometheus.Metric) {
workload_src := c.ip_resolver.ResolveIP(addrPair.dst.IP().String())
workload_dest := c.ip_resolver.ResolveIP(conn.ActualDest.IP().String())
actualDestWorkload := c.ip_resolver.ResolveActualIP(conn.ActualDest.IP().String())
if workload_dest.Kind == "external" {
if host, ok := c.hostIpsMap.Load(conn.ActualDest.IP().String()); ok {
workload_dest = common.Workload{Name: host.(string), Namespace: "external", Kind: "external"}
}
}
connections[AddrPair{src: addrPair.dst, dst: conn.ActualDest, srcWorkload: workload_src, dstWorkload: workload_dest, actualDestWorkload: actualDestWorkload}]++
}
for d, count := range connections {
Expand Down Expand Up @@ -502,6 +525,20 @@ func (c *Container) onListenClose(pid uint32, addr netaddr.IPPort) {
}
}

// ignoreControlPlane reports whether name matches one of the keywords in the
// comma-separated list held by *flags.IgnoreControlPlane, meaning the
// workload should be excluded from connection tracking.
//
// Matching is a case-insensitive substring test: both name and each keyword
// are lower-cased, and surrounding whitespace on a keyword is trimmed.
//
// Blank entries (an empty flag value, a trailing comma, or a whitespace-only
// item) are skipped. strings.Split never returns an empty slice, and
// strings.Contains(s, "") is always true, so without this guard an empty flag
// would cause every workload to be ignored.
func ignoreControlPlane(name string) bool {
	// Lower-case the candidate once instead of once per keyword.
	lowered := strings.ToLower(name)
	for _, keyword := range strings.Split(*flags.IgnoreControlPlane, ",") {
		keyword = strings.ToLower(strings.TrimSpace(keyword))
		if keyword == "" {
			continue
		}
		if strings.Contains(lowered, keyword) {
			klog.Warningln("Ignoring control plane ", name)
			return true
		}
	}
	return false
}

func (c *Container) onConnectionOpen(pid uint32, fd uint64, src, dst netaddr.IPPort, timestamp uint64, failed bool) {
if common.PortFilter.ShouldBeSkipped(dst.Port()) {
return
Expand All @@ -513,7 +550,18 @@ func (c *Container) onConnectionOpen(pid uint32, fd uint64, src, dst netaddr.IPP
if dst.IP().IsLoopback() && !p.isHostNs() {
return
}
srcWorkload := c.ip_resolver.ResolveIP(src.IP().String())
if ignoreControlPlane(srcWorkload.Name) {
klog.Warningf("Ignoring src workload %s, %s \n", src.IP().String(), srcWorkload.Name)
return
}
actualDst, err := c.getActualDestination(p, src, dst)
dstWorkload := c.ip_resolver.ResolveIP(dst.IP().String())

if ignoreControlPlane(dstWorkload.Name) {
klog.Warningf("Ignoring src workload %s, %s \n", dst.IP().String(), dstWorkload.Name)
return
}
if err != nil {
if !common.IsNotExist(err) {
klog.Warningf("cannot open NetNs for pid %d: %s", pid, err)
Expand All @@ -534,11 +582,9 @@ func (c *Container) onConnectionOpen(pid uint32, fd uint64, src, dst netaddr.IPP
if failed {
c.connectsFailed[dst]++
} else {
srcWorkload := c.ip_resolver.ResolveIP(src.IP().String())
dstWorkload := c.ip_resolver.ResolveIP(dst.IP().String())
actialDestWorkload := c.ip_resolver.ResolveActualIP(actualDst.IP().String())
actualDestWorkload := c.ip_resolver.ResolveActualIP(actualDst.IP().String())
c.connectsSuccessful[AddrPair{src: dst, dst: *actualDst, srcWorkload: srcWorkload,
dstWorkload: dstWorkload, actualDestWorkload: actialDestWorkload}]++
dstWorkload: dstWorkload, actualDestWorkload: actualDestWorkload}]++
connection := &ActiveConnection{
Dest: dst,
ActualDest: *actualDst,
Expand All @@ -547,9 +593,9 @@ func (c *Container) onConnectionOpen(pid uint32, fd uint64, src, dst netaddr.IPP
Timestamp: timestamp,
srcWorkload: srcWorkload,
dstWorkload: dstWorkload,
actialDestWorkload: actialDestWorkload,
actualDestWorkload: actualDestWorkload,
}
c.connectionsActive[AddrPair{src: src, dst: dst, srcWorkload: srcWorkload, dstWorkload: dstWorkload, actualDestWorkload: actialDestWorkload}] = connection
c.connectionsActive[AddrPair{src: src, dst: dst, srcWorkload: srcWorkload, dstWorkload: dstWorkload, actualDestWorkload: actualDestWorkload}] = connection
c.connectionsByPidFd[PidFd{Pid: pid, Fd: fd}] = connection
}
c.connectLastAttempt[dst] = time.Now()
Expand Down Expand Up @@ -608,21 +654,59 @@ func (c *Container) onL7Request(pid uint32, fd uint64, timestamp uint64, r *l7.R
return
}

stats := c.l7Stats.get(r.Protocol, conn.Dest, conn.ActualDest, r, conn.srcWorkload, conn.dstWorkload, conn.actialDestWorkload)
trace := tracing.NewTrace(string(c.id), conn.ActualDest)
if conn.dstWorkload.Namespace == "external" && (r.Protocol == l7.ProtocolHTTP || r.Protocol == l7.ProtocolHTTP2) {
host, error := l7.ParseHostFromHttpRequest(string(r.Payload))
if error == nil {
conn.dstWorkload.Name = host
c.hostIpsMap.Store(conn.ActualDest.IP().String(), host)
}
}
stats := c.l7Stats.get(r.Protocol, conn.Dest, conn.ActualDest, r, conn.srcWorkload, conn.dstWorkload, conn.actualDestWorkload)
trace := tracing.NewTrace(string(c.id), conn.ActualDest, conn.srcWorkload, conn.dstWorkload, conn.actualDestWorkload)
switch r.Protocol {
case l7.ProtocolHTTP:
stats.observe(r.Status.Http(), "", r.Duration)
method, path := l7.ParseHttp(r.Payload)
trace.HttpRequest(method, path, r.Status, r.Duration)
payload := ""
headers := ""
method := ""
uri := ""
response := ""
host := ""
req, err := l7.ParseHTTPRequest(r.Payload)
if err != nil {
log.Printf("Failed to parse payload %s, %q", err, string(r.Payload))
method, uri = l7.ParseHttp(r.Payload)
host, _ = l7.ParseHostFromHttpRequest(string(r.Payload))
} else {
if req != nil && req.Body != nil {
body, _ := io.ReadAll(req.Body)
base64String := base64.StdEncoding.EncodeToString(body)
payload = string(base64String)
}
if req != nil && req.Header != nil {
headersStr := l7.ConvertHeadersToString(req.Header)
headers = base64.StdEncoding.EncodeToString([]byte(headersStr))
}
method = req.Method
uri = req.URL.Path
host = req.Host
}
if r.Response != nil {
response = base64.StdEncoding.EncodeToString(r.Response)
}
trace.HttpRequest(method, uri, r.Status, r.Duration, r.PayloadSize, payload, headers, response, host)
case l7.ProtocolHTTP2:
if conn.http2Parser == nil {
conn.http2Parser = l7.NewHttp2Parser()
}
requests := conn.http2Parser.Parse(r.Method, r.Payload, uint64(r.Duration))
for _, req := range requests {
stats.observe(req.Status.Http(), "", req.Duration)
trace.Http2Request(req.Method, req.Path, req.Scheme, req.Status, req.Duration)
payload := ""
if int(r.PayloadSize) < payloadThreshold {
payload = string(r.Payload)
}
trace.Http2Request(req.Method, req.Path, req.Scheme, req.Status, req.Duration, string(payload))
}
case l7.ProtocolPostgres:
if r.Method != l7.MethodStatementClose {
Expand Down
14 changes: 9 additions & 5 deletions containers/l7.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,17 @@ func (s L7Stats) get(protocol l7.Protocol, destination, actualDestination netadd
case l7.ProtocolRabbitmq, l7.ProtocolNats:
labels = append(labels, "method")
case l7.ProtocolHTTP:
method, path, payload := l7.ParseHttpAndRest(r.Payload)
if r.Status.Http() == "400" || r.Status.Http() == "500" {
log.Printf("Captured failed request actual body %s, converted body %s, status %s", string(r.Payload), payload, r.Status.Http())
constLabels["payload"] = payload
}
method, path := l7.ParseHttp(r.Payload)
constLabels["path"] = path
constLabels["method"] = method
if dstWorkload.Namespace == "external" {
host, err := l7.ParseHostFromHttpRequest(string(r.Payload))
if host != "" {
constLabels["destination_workload_name"] = host
} else {
log.Printf("Failed to parse host %s , %q", host, err)
}
}
hOpts := L7Latency[protocol]
m.Latency = prometheus.NewHistogram(
prometheus.HistogramOpts{Name: hOpts.Name, Help: hOpts.Help, ConstLabels: constLabels},
Expand Down
3 changes: 1 addition & 2 deletions containers/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type ProcessInfo struct {
// IPResolver turns IP address strings into common.Workload values and exposes
// start/stop control over the resolver's watching.
type IPResolver interface {
// ResolveIP returns the workload for the given IP address string.
ResolveIP(string) common.Workload
// ResolveActualIP returns the workload for an IP address string; callers in
// this package pass the actual (post-translation) destination IP.
ResolveActualIP(string) common.Workload
// CacheDNS takes an IP and a DNS name (in that order) and returns a workload.
// The K8sIPResolver implementation caches the pair and returns an "external"
// workload named after the DNS name.
CacheDNS(string, string) common.Workload
// StartWatching begins watching and returns an error on failure. The
// K8sIPResolver implementation registers Kubernetes watchers here.
StartWatching() error
// StopWatching presumably stops what StartWatching started — confirm against
// the implementation.
StopWatching()
}
Expand Down Expand Up @@ -287,7 +288,6 @@ func (r *Registry) getOrCreateContainer(pid uint32) *Container {
return nil
}
id := calcId(cg, md)
klog.Infof("calculated container id %d -> %s -> %s", pid, cg.Id, id)
if id == "" {
if cg.Id == "/init.scope" && pid != 1 {
klog.InfoS("ignoring without persisting", "cg", cg.Id, "pid", pid)
Expand Down Expand Up @@ -318,7 +318,6 @@ func (r *Registry) getOrCreateContainer(pid uint32) *Container {
return nil
}

klog.InfoS("detected a new container", "pid", pid, "cg", cg.Id, "id", id)
if err := prometheus.WrapRegistererWith(prometheus.Labels{"container_id": string(id)}, r.reg).Register(c); err != nil {
klog.Warningln("failed to register container:", err)
return nil
Expand Down
16 changes: 8 additions & 8 deletions ebpftracer/ebpf.go

Large diffs are not rendered by default.

8 changes: 5 additions & 3 deletions ebpftracer/ebpf/l7/l7.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
#define METHOD_HTTP2_CLIENT_FRAMES 5
#define METHOD_HTTP2_SERVER_FRAMES 6

#define MAX_PAYLOAD_SIZE 1024 // must be power of 2
#define MAX_PAYLOAD_SIZE 1024 * 5 // must be power of 2
#define TRUNCATE_PAYLOAD_SIZE(size) ({ \
size = MIN(size, MAX_PAYLOAD_SIZE-1); \
asm volatile ("%0 &= %1" : "+r"(size) : "i"(MAX_PAYLOAD_SIZE-1)); \
Expand Down Expand Up @@ -63,7 +63,9 @@ struct l7_event {
__u16 padding;
__u32 statement_id;
__u64 payload_size;
__u64 response_size;
char payload[MAX_PAYLOAD_SIZE];
char response[MAX_PAYLOAD_SIZE];
};

struct {
Expand Down Expand Up @@ -388,7 +390,8 @@ int trace_exit_read(void *ctx, __u64 id, __u32 pid, __u16 is_tls, long int ret)
e->method = METHOD_UNKNOWN;
e->statement_id = 0;
e->payload_size = 0;

e->response_size = ret;
COPY_PAYLOAD(e->response, ret, payload);
if (is_rabbitmq_consume(payload, ret)) {
e->protocol = PROTOCOL_RABBITMQ;
e->method = METHOD_CONSUME;
Expand Down Expand Up @@ -427,7 +430,6 @@ int trace_exit_read(void *ctx, __u64 id, __u32 pid, __u16 is_tls, long int ret)
e->protocol = req->protocol;
e->payload_size = req->payload_size;
COPY_PAYLOAD(e->payload, req->payload_size, req->payload);

bpf_map_delete_elem(&active_l7_requests, &k);
if (e->protocol == PROTOCOL_HTTP) {
response = is_http_response(payload, &e->status);
Expand Down
Loading