Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
192 changes: 121 additions & 71 deletions ebpftracer/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,14 @@ import (
"k8s.io/klog/v2"
)

const MaxPayloadSize = 8192 // Must match MAX_PAYLOAD_SIZE in eBPF
const (
MaxPayloadSize = 8192 // Must match MAX_PAYLOAD_SIZE in eBPF

l7EventHeaderSize = 96 // 56 bytes base + 40 bytes socket tuple fields
tcpEventSize = 104
fileEventSize = 32
procEventSize = 12
)

type EventType uint32
type EventReason uint32
Expand Down Expand Up @@ -468,78 +475,95 @@ func runEventsReader(name string, r *perf.Reader, ch chan<- Event, typ perfMapTy

switch typ {
case perfMapTypeL7Events:
v := &l7Event{}
data := rec.RawSample

if err := binary.Read(bytes.NewBuffer(data), binary.LittleEndian, v); err != nil {
klog.Warningln("failed to read l7 event:", err)
if len(data) < l7EventHeaderSize {
klog.Warningln("invalid l7 event size:", len(data))
continue
}
payloadSize := binary.LittleEndian.Uint64(data[40:48])
responseSize := binary.LittleEndian.Uint64(data[48:56])
payloadLen := int(payloadSize)
if payloadLen > MaxPayloadSize {
payloadLen = MaxPayloadSize
}
responseLen := int(responseSize)
if responseLen > MaxPayloadSize {
responseLen = MaxPayloadSize
}

// Extract payload data directly from the struct arrays
payloadSize := min(int(v.PayloadSize), len(v.Payload))
responseSize := min(int(v.ResponseSize), len(v.Response))

// Copy the actual data (preventing garbage from unused buffer space)
payloadData := make([]byte, payloadSize)
copy(payloadData, v.Payload[:payloadSize])
payloadData := make([]byte, payloadLen)
if l7EventHeaderSize+payloadLen <= len(data) {
copy(payloadData, data[l7EventHeaderSize:l7EventHeaderSize+payloadLen])
}

responseData := make([]byte, responseSize)
copy(responseData, v.Response[:responseSize])
responseData := make([]byte, responseLen)
respOffset := l7EventHeaderSize + MaxPayloadSize
if respOffset+responseLen <= len(data) {
copy(responseData, data[respOffset:respOffset+responseLen])
}

req := &l7.RequestData{
Protocol: l7.Protocol(v.Protocol),
Status: l7.Status(v.Status),
Duration: time.Duration(v.Duration),
Method: l7.Method(v.Method),
StatementId: v.StatementId,
PayloadSize: v.PayloadSize,
ResponseSize: v.ResponseSize,
Protocol: l7.Protocol(data[32]),
Method: l7.Method(data[33]),
Status: l7.Status(int32(binary.LittleEndian.Uint32(data[20:24]))),
Duration: time.Duration(binary.LittleEndian.Uint64(data[24:32])),
StatementId: binary.LittleEndian.Uint32(data[36:40]),
PayloadSize: payloadSize,
ResponseSize: responseSize,
Payload: payloadData,
Response: responseData,
}

event = Event{
Type: EventTypeL7Request,
Pid: v.Pid,
Fd: v.Fd,
Timestamp: v.ConnectionTimestamp,
Pid: binary.LittleEndian.Uint32(data[16:20]),
Fd: binary.LittleEndian.Uint64(data[0:8]),
Timestamp: binary.LittleEndian.Uint64(data[8:16]),
L7Request: req,
}
case perfMapTypeFileEvents:
v := &fileEvent{}
if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
klog.Warningln("failed to read file event:", err)
if len(rec.RawSample) < fileEventSize {
klog.Warningln("invalid file event size:", len(rec.RawSample))
continue
}
event = Event{Type: v.Type, Pid: v.Pid, Fd: v.Fd, Mnt: v.Mnt, Log: v.Log > 0}
event = Event{
Type: EventType(binary.LittleEndian.Uint32(rec.RawSample[0:4])),
Pid: binary.LittleEndian.Uint32(rec.RawSample[4:8]),
Fd: binary.LittleEndian.Uint64(rec.RawSample[8:16]),
Mnt: binary.LittleEndian.Uint64(rec.RawSample[16:24]),
Log: binary.LittleEndian.Uint64(rec.RawSample[24:32]) > 0,
}
case perfMapTypeProcEvents:
v := &procEvent{}
if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
klog.Warningln("failed to read proc event:", err)
if len(rec.RawSample) < procEventSize {
klog.Warningln("invalid proc event size:", len(rec.RawSample))
continue
}
event = Event{Type: v.Type, Reason: EventReason(v.Reason), Pid: v.Pid}
event = Event{
Type: EventType(binary.LittleEndian.Uint32(rec.RawSample[0:4])),
Pid: binary.LittleEndian.Uint32(rec.RawSample[4:8]),
Reason: EventReason(binary.LittleEndian.Uint32(rec.RawSample[8:12])),
}
case perfMapTypeTCPEvents:
v := &tcpEvent{}
if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
klog.Warningln("failed to read tcp event:", err)
if len(rec.RawSample) < tcpEventSize {
klog.Warningln("invalid tcp event size:", len(rec.RawSample))
continue
}
data := rec.RawSample
typ := EventType(binary.LittleEndian.Uint32(data[24:28]))
event = Event{
Type: v.Type,
Pid: v.Pid,
SrcAddr: ipPort(v.SAddr, v.SPort),
DstAddr: ipPort(v.DAddr, v.DPort),
ActualDstAddr: ipPort(v.AAddr, v.Aport),
Fd: v.Fd,
Timestamp: v.Timestamp,
Duration: time.Duration(v.Duration),
Type: typ,
Pid: binary.LittleEndian.Uint32(data[28:32]),
SrcAddr: ipPort(data[54:70], binary.LittleEndian.Uint16(data[48:50])),
DstAddr: ipPort(data[70:86], binary.LittleEndian.Uint16(data[50:52])),
ActualDstAddr: ipPort(data[86:102], binary.LittleEndian.Uint16(data[52:54])),
Fd: binary.LittleEndian.Uint64(data[0:8]),
Timestamp: binary.LittleEndian.Uint64(data[8:16]),
Duration: time.Duration(binary.LittleEndian.Uint64(data[16:24])),
}
if v.Type == EventTypeConnectionClose {
if typ == EventTypeConnectionClose {
event.TrafficStats = &TrafficStats{
BytesSent: v.BytesSent,
BytesReceived: v.BytesReceived,
BytesSent: binary.LittleEndian.Uint64(data[32:40]),
BytesReceived: binary.LittleEndian.Uint64(data[40:48]),
}
}
default:
Expand All @@ -565,43 +589,69 @@ func runRingbufEventsReader(name string, r *ringbuf.Reader, ch chan<- Event) {
continue
}

v := &l7Event{}
if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
klog.Warningln("failed to read l7 event from ring buffer:", err)
data := rec.RawSample
if len(data) < l7EventHeaderSize {
klog.Warningln("invalid l7 event from ring buffer, size:", len(data))
continue
}

// Extract payload data directly from the struct arrays
payloadSize := min(int(v.PayloadSize), len(v.Payload))
responseSize := min(int(v.ResponseSize), len(v.Response))
payloadSize := binary.LittleEndian.Uint64(data[40:48])
responseSize := binary.LittleEndian.Uint64(data[48:56])

payloadLen := int(payloadSize)
if payloadLen > MaxPayloadSize {
payloadLen = MaxPayloadSize
}
responseLen := int(responseSize)
if responseLen > MaxPayloadSize {
responseLen = MaxPayloadSize
}

// Copy the actual data (preventing garbage from unused buffer space)
payloadData := make([]byte, payloadSize)
copy(payloadData, v.Payload[:payloadSize])
payloadData := make([]byte, payloadLen)
if l7EventHeaderSize+payloadLen <= len(data) {
copy(payloadData, data[l7EventHeaderSize:l7EventHeaderSize+payloadLen])
}

responseData := make([]byte, responseSize)
copy(responseData, v.Response[:responseSize])
responseData := make([]byte, responseLen)
respOffset := l7EventHeaderSize + MaxPayloadSize
if respOffset+responseLen <= len(data) {
copy(responseData, data[respOffset:respOffset+responseLen])
}

req := &l7.RequestData{
Protocol: l7.Protocol(v.Protocol),
Status: l7.Status(v.Status),
Duration: time.Duration(v.Duration),
Method: l7.Method(v.Method),
StatementId: v.StatementId,
PayloadSize: v.PayloadSize,
ResponseSize: v.ResponseSize,
Protocol: l7.Protocol(data[32]),
Method: l7.Method(data[33]),
Status: l7.Status(int32(binary.LittleEndian.Uint32(data[20:24]))),
Duration: time.Duration(binary.LittleEndian.Uint64(data[24:32])),
StatementId: binary.LittleEndian.Uint32(data[36:40]),
PayloadSize: payloadSize,
ResponseSize: responseSize,
Payload: payloadData,
Response: responseData,
}

// Extract socket info from L7 event (enables processing without TCP connection tracking)
socketInfo := GetSocketInfoFromL7Event(v)
// Extract socket info from raw bytes
var socketInfo *SocketInfo
if data[94] != 0 { // SocketInfoValid at offset 94
var saddr, daddr [16]byte
copy(saddr[:], data[56:72])
copy(daddr[:], data[72:88])
addrFamily := binary.LittleEndian.Uint16(data[92:94])
socketInfo = &SocketInfo{
SrcIP: extractIPFromSocketInfo(saddr, addrFamily),
DstIP: extractIPFromSocketInfo(daddr, addrFamily),
SrcPort: binary.LittleEndian.Uint16(data[88:90]),
DstPort: binary.LittleEndian.Uint16(data[90:92]),
Family: addrFamily,
Valid: true,
}
}

event := Event{
Type: EventTypeL7Request,
Pid: v.Pid,
Fd: v.Fd,
Timestamp: v.ConnectionTimestamp,
Pid: binary.LittleEndian.Uint32(data[16:20]),
Fd: binary.LittleEndian.Uint64(data[0:8]),
Timestamp: binary.LittleEndian.Uint64(data[8:16]),
L7Request: req,
SocketInfo: socketInfo,
}
Expand All @@ -610,8 +660,8 @@ func runRingbufEventsReader(name string, r *ringbuf.Reader, ch chan<- Event) {
}
}

func ipPort(ip [16]byte, port uint16) netaddr.IPPort {
i, _ := netaddr.FromStdIP(ip[:])
func ipPort(ip []byte, port uint16) netaddr.IPPort {
i, _ := netaddr.FromStdIP(ip)
return netaddr.IPPortFrom(i, port)
}

Expand Down