Skip to content

Commit

Permalink
optimize pod scanning (#46)
Browse files Browse the repository at this point in the history
* pod scanning optimizations

* pod scanning optimizations
  • Loading branch information
iluxa committed Mar 5, 2024
1 parent 13e2472 commit 282ba9c
Show file tree
Hide file tree
Showing 15 changed files with 464 additions and 256 deletions.
8 changes: 4 additions & 4 deletions bpf/fd_to_address_tracepoints.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ SEC("tracepoint/syscalls/sys_enter_accept4")
void sys_enter_accept4(struct sys_enter_accept4_ctx* ctx) {
__u64 id = bpf_get_current_pid_tgid();

if (!should_target(id >> 32)) {
if (!should_watch(id >> 32)) {
return;
}

Expand All @@ -57,7 +57,7 @@ SEC("tracepoint/syscalls/sys_exit_accept4")
void sys_exit_accept4(struct sys_exit_accept4_ctx* ctx) {
__u64 id = bpf_get_current_pid_tgid();

if (!should_target(id >> 32)) {
if (!should_watch(id >> 32)) {
return;
}

Expand Down Expand Up @@ -124,7 +124,7 @@ SEC("tracepoint/syscalls/sys_enter_connect")
void sys_enter_connect(struct sys_enter_connect_ctx* ctx) {
__u64 id = bpf_get_current_pid_tgid();

if (!should_target(id >> 32)) {
if (!should_watch(id >> 32)) {
return;
}

Expand All @@ -151,7 +151,7 @@ SEC("tracepoint/syscalls/sys_exit_connect")
void sys_exit_connect(struct sys_exit_connect_ctx* ctx) {
__u64 id = bpf_get_current_pid_tgid();

if (!should_target(id >> 32)) {
if (!should_watch(id >> 32)) {
return;
}

Expand Down
4 changes: 2 additions & 2 deletions bpf/fd_tracepoints.c
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ SEC("tracepoint/syscalls/sys_enter_read")
void sys_enter_read(struct sys_enter_read_write_ctx* ctx) {
__u64 id = bpf_get_current_pid_tgid();

if (!should_target(id >> 32)) {
if (!should_watch(id >> 32)) {
return;
}

Expand All @@ -78,7 +78,7 @@ SEC("tracepoint/syscalls/sys_enter_write")
void sys_enter_write(struct sys_enter_read_write_ctx* ctx) {
__u64 id = bpf_get_current_pid_tgid();

if (!should_target(id >> 32)) {
if (!should_watch(id >> 32)) {
return;
}

Expand Down
4 changes: 2 additions & 2 deletions bpf/include/maps.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ struct goid_offsets {
};

struct pid_info {
__u64 go_tcp_conn_offset;
__s64 sys_fd_offset;
__u64 is_interface;
};
Expand Down Expand Up @@ -105,7 +104,8 @@ struct {
BPF_MAP(_name, BPF_MAP_TYPE_LRU_HASH, _key_type, _value_type, MAX_ENTRIES_LRU_HASH)

// Generic
BPF_HASH(pids_map, __u32, __u32);
BPF_HASH(target_pids_map, __u32, __u32);
BPF_HASH(watch_pids_map, __u32, __u32);
BPF_HASH(pids_info, struct pid_offset, struct pid_info);
BPF_LRU_HASH(connection_context, __u64, conn_flags);
BPF_PERF_OUTPUT(chunks_buffer);
Expand Down
15 changes: 12 additions & 3 deletions bpf/include/pids.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,26 @@ Copyright (C) Kubeshark
#ifndef __PIDS__
#define __PIDS__

int should_target(__u32 pid) {
__u32* shouldTarget = bpf_map_lookup_elem(&pids_map, &pid);
int _pid_in_map(struct bpf_map_def* pmap, __u32 pid) {
__u32* shouldTarget = bpf_map_lookup_elem(pmap, &pid);

if (shouldTarget != NULL && *shouldTarget == 1) {
return 1;
}

__u32 globalPid = 0;
__u32* shouldTargetGlobally = bpf_map_lookup_elem(&pids_map, &globalPid);
__u32* shouldTargetGlobally = bpf_map_lookup_elem(pmap, &globalPid);

return shouldTargetGlobally != NULL && *shouldTargetGlobally == 1;
}


int should_target(__u32 pid) {
return _pid_in_map(&target_pids_map, pid);
}

int should_watch(__u32 pid) {
return _pid_in_map(&watch_pids_map, pid);
}

#endif /* __PIDS__ */
2 changes: 1 addition & 1 deletion bpf/tcp_kprobes.c
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ static __always_inline void tcp_kprobe(struct pt_regs* ctx, struct bpf_map_def*

__u64 id = bpf_get_current_pid_tgid();

if (!should_target(id >> 32)) {
if (!should_watch(id >> 32)) {
return;
}

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ require (
golang.org/x/exp v0.0.0-20230224173230-c95f2b4c22f2 // indirect
golang.org/x/net v0.8.0 // indirect
golang.org/x/oauth2 v0.0.0-20220223155221-ee480838109b // indirect
golang.org/x/sys v0.14.1-0.20231108175955-e4099bfacb8c // indirect
golang.org/x/sys v0.15.0 // indirect
golang.org/x/term v0.6.0 // indirect
golang.org/x/text v0.8.0 // indirect
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -330,8 +330,8 @@ golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.14.1-0.20231108175955-e4099bfacb8c h1:3kC/TjQ+xzIblQv39bCOyRk8fbEeJcDHwbyxPUU2BpA=
golang.org/x/sys v0.14.1-0.20231108175955-e4099bfacb8c/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc=
golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.6.0 h1:clScbb1cHjoCkyRbWwBEUZ5H/tIFu5TAXIqaZD0Gcjw=
golang.org/x/term v0.6.0/go.mod h1:m6U89DPEgQRMq3DNkDClhWw02AUbt2daBVO4cn4Hv9U=
Expand Down
38 changes: 16 additions & 22 deletions go_offsets.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,13 @@ const (
const PtrSize int = 8

type goOffsets struct {
GoWriteOffset *goExtendedOffset
GoReadOffset *goExtendedOffset
GoVersion string
Abi goAbi
GoidOffset uint64
GStructOffset uint64
GoTcpConnOffset uint64
NetConnOffsets map[string]*netConnOffset
GoWriteOffset *goExtendedOffset
GoReadOffset *goExtendedOffset
GoVersion string
Abi goAbi
GoidOffset uint64
GStructOffset uint64
NetConnOffsets map[string]*netConnOffset
}

type goExtendedOffset struct {
Expand All @@ -53,7 +52,6 @@ const (
goVersionSymbol = "runtime.buildVersion.str" // symbol does not exist in Go (<=1.16)
goWriteSymbol = "crypto/tls.(*Conn).Write"
goReadSymbol = "crypto/tls.(*Conn).Read"
goTcpConnSymbol = "go.itab.*net.TCPConn,net.Conn"
)

func findGoOffsets(fpath string) (goOffsets, error) {
Expand All @@ -64,7 +62,7 @@ func findGoOffsets(fpath string) (goOffsets, error) {
goReadSymbol: nil,
}

goidOffset, gStructOffset, goTcpConnOffset, netConnOffsets, err := getOffsets(fpath, offsets)
goidOffset, gStructOffset, netConnOffsets, err := getOffsets(fpath, offsets)
if err != nil {
return goOffsets{}, err
}
Expand Down Expand Up @@ -97,14 +95,13 @@ func findGoOffsets(fpath string) (goOffsets, error) {
}

return goOffsets{
GoWriteOffset: writeOffset,
GoReadOffset: readOffset,
GoVersion: goVersion,
Abi: abi,
GoidOffset: goidOffset,
GStructOffset: gStructOffset,
GoTcpConnOffset: goTcpConnOffset,
NetConnOffsets: netConnOffsets,
GoWriteOffset: writeOffset,
GoReadOffset: readOffset,
GoVersion: goVersion,
Abi: abi,
GoidOffset: goidOffset,
GStructOffset: gStructOffset,
NetConnOffsets: netConnOffsets,
}, nil
}

Expand Down Expand Up @@ -268,7 +265,7 @@ func getGoidOffset(elfFile *elf.File, netConnOffsets map[string]*netConnOffset)

var regexpNetConn = regexp.MustCompile(`go:itab\.\*([^,]+),net.Conn`)

func getOffsets(fpath string, offsets map[string]*goExtendedOffset) (goidOffset uint64, gStructOffset uint64, goTcpConnOffset uint64, netConnOffsets map[string]*netConnOffset, err error) {
func getOffsets(fpath string, offsets map[string]*goExtendedOffset) (goidOffset uint64, gStructOffset uint64, netConnOffsets map[string]*netConnOffset, err error) {
var engine gapstone.Engine
switch runtime.GOARCH {
case "amd64":
Expand Down Expand Up @@ -331,9 +328,6 @@ func getOffsets(fpath string, offsets map[string]*goExtendedOffset) (goidOffset
if len(matches) == 2 {
netConnOffsets[matches[1]] = &netConnOffset{symbolOffset: sym.Value, socketSysFdOffset: -1}
}
if sym.Name == goTcpConnSymbol {
goTcpConnOffset = sym.Value
}
if _, ok := offsets[sym.Name]; !ok {
continue
}
Expand Down
11 changes: 7 additions & 4 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/kubeshark/tracer/pkg/kubernetes"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/rest"
)

Expand Down Expand Up @@ -46,10 +47,15 @@ func main() {
func run() {
log.Info().Msg("Starting tracer...")

tracer = &Tracer{
procfs: *procfs,
watchingPods: make(map[types.UID]*podWatcher),
}

_, err := rest.InClusterConfig()
clusterMode := err == nil
errOut := make(chan error, 100)
watcher := kubernetes.NewFromInCluster(errOut, updateTargets)
watcher := kubernetes.NewFromInCluster(errOut, tracer.updateTargets)
ctx := context.Background()

if clusterMode {
Expand All @@ -76,9 +82,6 @@ func run() {
}

func createTracer() (err error) {
tracer = &Tracer{
procfs: *procfs,
}
chunksBufferSize := os.Getpagesize() * 10000
logBufferSize := os.Getpagesize()

Expand Down
3 changes: 1 addition & 2 deletions pkg/kubernetes/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@ package kubernetes

import (
"github.com/rs/zerolog/log"
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)

func NewFromInCluster(errOut chan error, callback func(pods []v1.Pod) error) *Watcher {
func NewFromInCluster(errOut chan error, callback callbackPodsChanged) *Watcher {
config, err := rest.InClusterConfig()
if err != nil {
log.Warn().Err(err).Send()
Expand Down
46 changes: 37 additions & 9 deletions pkg/kubernetes/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,26 @@ import (

var targetedPods []v1.Pod

var watchedPods []v1.Pod

func GetTargetedPods() []v1.Pod {
return targetedPods
}

func GetWatchedPods() []v1.Pod {
return watchedPods
}

func SetTargetedPods(pods []v1.Pod) {
targetedPods = pods
}

func SetWatchedPods(pods []v1.Pod) {
watchedPods = pods
}

type callbackPodsChanged func(addedWatchedPods []v1.Pod, removedWatchedPods []v1.Pod, addedTargetedPods []v1.Pod, removedTargetedPods []v1.Pod) error

func excludeSelfPods(pods []v1.Pod) []v1.Pod {
kubesharkLabels := map[string]string{"app.kubernetes.io/name": "kubeshark"}

Expand All @@ -43,7 +55,7 @@ func getPodArrayDiff(oldPods []v1.Pod, newPods []v1.Pod) (added []v1.Pod, remove
return added, removed
}

//returns pods present in pods1 array and missing in pods2 array
// returns pods present in pods1 array and missing in pods2 array
func getMissingPods(pods1 []v1.Pod, pods2 []v1.Pod) []v1.Pod {
missingPods := make([]v1.Pod, 0)
for _, pod1 := range pods1 {
Expand Down Expand Up @@ -104,31 +116,47 @@ func listAllRunningPodsMatchingRegex(ctx context.Context, clientSet *kubernetes.
return matchingPods, nil
}

var regexAllPods = regexp.MustCompile(`.*`)

func updateCurrentlyTargetedPods(
ctx context.Context,
clientSet *kubernetes.Clientset,
regex *regexp.Regexp,
namespaces []string,
callback func(pods []v1.Pod) error,
callback callbackPodsChanged,
) (err error) {

var allPods []v1.Pod
if allPods, err = listAllRunningPodsMatchingRegex(ctx, clientSet, regexAllPods, namespaces); err != nil {
return
}
podsToWatch := excludeSelfPods(allPods)

var matchingPods []v1.Pod
if matchingPods, err = listAllRunningPodsMatchingRegex(ctx, clientSet, regex, namespaces); err != nil {
return
}

podsToTarget := excludeSelfPods(matchingPods)
addedPods, removedPods := getPodArrayDiff(GetTargetedPods(), podsToTarget)
for _, addedPod := range addedPods {
addedTargetedPods, removedTargetedPods := getPodArrayDiff(GetTargetedPods(), podsToTarget)
addedWatchedPods, removedWatchedPods := getPodArrayDiff(GetWatchedPods(), podsToWatch)

for _, addedPod := range addedWatchedPods {
log.Info().Msg(fmt.Sprintf("Watched pod: %s", fmt.Sprintf(Green, addedPod.Name)))
}
for _, removedPod := range removedWatchedPods {
log.Info().Msg(fmt.Sprintf("Unwatchted pod: %s", fmt.Sprintf(Red, removedPod.Name)))
}
for _, addedPod := range addedTargetedPods {
log.Info().Msg(fmt.Sprintf("Targeted pod: %s", fmt.Sprintf(Green, addedPod.Name)))
}
for _, removedPod := range removedPods {
for _, removedPod := range removedTargetedPods {
log.Info().Msg(fmt.Sprintf("Untargeted pod: %s", fmt.Sprintf(Red, removedPod.Name)))
}

if len(addedPods) > 0 || len(removedPods) > 0 {
SetTargetedPods(podsToTarget)
err = callback(podsToTarget)
}
SetTargetedPods(podsToTarget)
SetWatchedPods(podsToWatch)
err = callback(addedWatchedPods, removedWatchedPods, addedTargetedPods, removedTargetedPods)

return
}
Loading

0 comments on commit 282ba9c

Please sign in to comment.