trace tcp/udp message bytes for in-kernel-aggregation #12

Merged: 14 commits, Apr 2, 2021
153 changes: 103 additions & 50 deletions bpf/conntracer.bpf.c
@@ -20,56 +20,69 @@ char LICENSE[] SEC("license") = "Dual BSD/GPL";

struct {
__uint(type, BPF_MAP_TYPE_HASH);
__type(key, struct ipv4_flow_key);
__type(key, struct aggregated_flow_tuple);
__type(value, struct aggregated_flow);
__uint(max_entries, MAX_FLOW_ENTRIES);
__uint(map_flags, BPF_F_NO_PREALLOC);
} flows SEC(".maps");

struct {
__uint(type, BPF_MAP_TYPE_HASH);
__type(key, struct aggregated_flow_tuple);
__type(value, struct aggregated_flow_stat);
__uint(max_entries, MAX_FLOW_ENTRIES);
__uint(map_flags, BPF_F_NO_PREALLOC);
} flow_stats SEC(".maps");

static __always_inline void
insert_tcp_flows(pid_t pid, struct sock *sk, __u16 lport, __u8 direction)
{
insert_tcp_flows(struct aggregated_flow_tuple *tuple, pid_t pid) {
struct aggregated_flow flow = {}, *val;
struct ipv4_flow_key flow_key = {};

BPF_CORE_READ_INTO(&flow.saddr, sk, __sk_common.skc_rcv_saddr);
BPF_CORE_READ_INTO(&flow.daddr, sk, __sk_common.skc_daddr);
flow.lport = lport;
flow.ts_us = bpf_ktime_get_ns() / 1000;
flow.saddr = tuple->saddr;
flow.daddr = tuple->daddr;
flow.lport = tuple->lport;
flow.pid = pid;
flow.direction = direction;
flow.direction = tuple->direction;
bpf_get_current_comm(flow.task, sizeof(flow.task));

flow_key.saddr = flow.saddr;
flow_key.daddr = flow.daddr;
flow_key.lport = flow.lport;
flow_key.direction = flow.direction;
flow_key.l4_proto = IPPROTO_TCP;

val = bpf_map_lookup_elem(&flows, &flow_key);
if (val) {
__u32 *cnt = &(val->stat.connections);
__atomic_add_fetch(cnt, 1, __ATOMIC_RELAXED);
return;
}

flow.stat.connections = 1;
bpf_map_update_elem(&flows, &flow_key, &flow, BPF_ANY);
bpf_map_update_elem(&flows, tuple, &flow, BPF_ANY);
}

static __always_inline void
insert_udp_flows(pid_t pid, struct ipv4_flow_key* flow_key)
insert_udp_flows(pid_t pid, struct aggregated_flow_tuple* tuple)
{
struct aggregated_flow flow = {};

flow.saddr = flow_key->saddr;
flow.daddr = flow_key->daddr;
flow.lport = flow_key->lport;
flow.direction = flow_key->direction;
flow.l4_proto = flow_key->l4_proto;
flow.saddr = tuple->saddr;
flow.daddr = tuple->daddr;
flow.lport = tuple->lport;
flow.direction = tuple->direction;
flow.l4_proto = tuple->l4_proto;
flow.pid = pid;
bpf_get_current_comm(flow.task, sizeof(flow.task));

bpf_map_update_elem(&flows, flow_key, &flow, BPF_ANY);
bpf_map_update_elem(&flows, tuple, &flow, BPF_ANY);
}

static __always_inline void
update_message(struct aggregated_flow_tuple* tuple, size_t sent_bytes, size_t recv_bytes)
{
struct aggregated_flow_stat *val, empty = {};

__builtin_memset(&empty, 0, sizeof(struct aggregated_flow_stat));
bpf_map_update_elem(&flow_stats, tuple, &empty, BPF_NOEXIST);

val = bpf_map_lookup_elem(&flow_stats, tuple);
if (!val) return;
val->ts_us = bpf_ktime_get_ns() / 1000;

if (sent_bytes) {
__atomic_add_fetch(&val->sent_bytes, sent_bytes, __ATOMIC_RELAXED);
}
if (recv_bytes) {
__atomic_add_fetch(&val->recv_bytes, recv_bytes, __ATOMIC_RELAXED);
}
}
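
Because the aggregation happens in the kernel, user space only has to drain per-tuple counters instead of processing per-packet events. A minimal sketch of the consumer side, assuming the struct layout sketched above and a map fd obtained from a libbpf skeleton (the function and naming here are illustrative, not the project's actual loader code):

#include <stdio.h>
#include <bpf/bpf.h>

/* Walk every entry currently in flow_stats. Starting from a key that is
 * not in the map makes bpf_map_get_next_key() return the first key. */
static void drain_flow_stats(int map_fd)
{
    struct aggregated_flow_tuple key = {}, next_key;
    struct aggregated_flow_stat stat;

    while (bpf_map_get_next_key(map_fd, &key, &next_key) == 0) {
        if (bpf_map_lookup_elem(map_fd, &next_key, &stat) == 0)
            printf("lport=%u proto=%u sent=%llu recv=%llu\n",
                   next_key.lport, next_key.l4_proto,
                   (unsigned long long)stat.sent_bytes,
                   (unsigned long long)stat.recv_bytes);
        key = next_key;
    }
}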

SEC("kprobe/tcp_v4_connect")
@@ -102,9 +115,10 @@ int BPF_KRETPROBE(tcp_v4_connect_ret, int ret)

struct sock* sk = *skpp;

BPF_CORE_READ_INTO(&dport, sk, __sk_common.skc_dport);

insert_tcp_flows(pid, sk, dport, FLOW_ACTIVE);
struct aggregated_flow_tuple tuple = {};
read_aggr_flow_tuple_for_tcp(&tuple, sk, FLOW_ACTIVE);
insert_tcp_flows(&tuple, pid);
update_message(&tuple, 0, 0);

log_debug("kretprobe/tcp_v4_connect: dport:%u, tid:%u\n", dport, pid_tgid);
end:
@@ -117,49 +131,88 @@ int BPF_KRETPROBE(inet_csk_accept_ret, struct sock *sk)
{
__u64 pid_tgid = bpf_get_current_pid_tgid();
__u32 pid = pid_tgid >> 32;
__u16 lport = 0;

if (!sk)
return 0;
if (!sk) return 0;

BPF_CORE_READ_INTO(&lport, sk, __sk_common.skc_num);

insert_tcp_flows(pid, sk, lport, FLOW_PASSIVE);
struct aggregated_flow_tuple tuple = {};
read_aggr_flow_tuple_for_tcp(&tuple, sk, FLOW_PASSIVE);
insert_tcp_flows(&tuple, pid);
update_port_binding(tuple.lport);
update_message(&tuple, 0, 0);

log_debug("kretprobe/inet_csk_accept: lport:%u,pid_tgid:%u\n", pid_tgid, lport);
return 0;
}

SEC("kprobe/tcp_sendmsg")
int BPF_KPROBE(tcp_sendmsg, struct sock* sk, struct msghdr *msg, size_t size) {
__u64 pid_tgid = bpf_get_current_pid_tgid();
__u32 pid = pid_tgid >> 32;
log_debug("kprobe/tcp_sendmsg: pid_tgid:%d, size:%d\n", pid_tgid, size);

struct aggregated_flow_tuple tuple = {};
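// The send/recv probes cannot tell client from server sockets here, so they
// pass FLOW_UNKNOWN and let read_aggr_flow_tuple_for_tcp() resolve the
// direction via the tcp_port_binding map.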
read_aggr_flow_tuple_for_tcp(&tuple, sk, FLOW_UNKNOWN);
update_message(&tuple, size, 0);

return 0;
}

SEC("kprobe/tcp_cleanup_rbuf")
int BPF_KPROBE(tcp_cleanup_rbuf, struct sock* sk, int copied) {
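// tcp_cleanup_rbuf runs after the receiver consumes data from the socket
// buffer, so `copied` counts bytes actually delivered to the application;
// probing it instead of tcp_recvmsg avoids counting bytes merely requested.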
if (copied < 0) {
return 0;
}
__u64 pid_tgid = bpf_get_current_pid_tgid();
__u32 pid = pid_tgid >> 32;
log_debug("kprobe/tcp_cleanup_rbuf: pid_tgid:%d, copied:%d\n", pid_tgid, copied);

struct aggregated_flow_tuple tuple = {};
read_aggr_flow_tuple_for_tcp(&tuple, sk, FLOW_UNKNOWN);
update_message(&tuple, 0, copied);

return 0;
}

// The struct sock available in udp_sendmsg may be missing IP addresses on an
// unconnected (listening) socket, so addresses are retrieved from the struct
// flowi4 built by ip_make_skb.
// https://github.com/DataDog/datadog-agent/pull/6307
SEC("kprobe/ip_make_skb")
int BPF_KPROBE(ip_make_skb, struct sock *sk, struct flowi4 *flw4) {
size_t msglen = (size_t)PT_REGS_PARM5(ctx);

__u64 pid_tgid = bpf_get_current_pid_tgid();
__u32 pid = pid_tgid >> 32;
struct ipv4_flow_key flow_key = {};
struct aggregated_flow_tuple tuple = {};

read_flow_for_udp_send(&flow_key, sk, flw4);
insert_udp_flows(pid, &flow_key);
msglen = msglen - sizeof(struct udphdr);

log_debug("kprobe/ip_make_skb: lport:%u, tgid:%u\n",
flow_key.lport, pid_tgid);
read_flow_for_udp_send(&tuple, sk, flw4);
insert_udp_flows(pid, &tuple);
update_message(&tuple, msglen, 0);

log_debug("kprobe/ip_make_skb: lport:%u, msglen:%u, tgid:%u\n",
tuple.lport, msglen, pid_tgid);
return 0;
}

// Likewise, the struct sock available in udp_recvmsg may be missing IP
// addresses on a listening socket, so addresses are retrieved from the
// arguments of skb_consume_udp.
SEC("kprobe/skb_consume_udp")
int BPF_KPROBE(skb_consume_udp, struct sock *sk, struct sk_buff *skb) {
int BPF_KPROBE(skb_consume_udp, struct sock *sk, struct sk_buff *skb, int len) {
if (len < 0) {
return 0;
}

__u64 pid_tgid = bpf_get_current_pid_tgid();
__u32 pid = pid_tgid >> 32;
struct ipv4_flow_key flow_key = {};
struct aggregated_flow_tuple tuple = {};

read_flow_for_udp_recv(&flow_key, sk, skb);
insert_udp_flows(pid, &flow_key);
read_flow_for_udp_recv(&tuple, sk, skb);
insert_udp_flows(pid, &tuple);
update_message(&tuple, 0, len);

log_debug("kprobe/skb_consume_udp: sport:%u, dport:%u, tid:%u\n",
flow_key.lport, pid_tgid);
log_debug("kprobe/skb_consume_udp: lport:%u, len:%u, tid:%u\n",
tuple.lport, len, pid_tgid);
return 0;
}

72 changes: 52 additions & 20 deletions bpf/conntracer_bpf_read.h
@@ -19,28 +19,60 @@ void read_flow_tuple_for_tcp(struct flow_tuple *tuple, struct sock *sk, pid_t pi
tuple->l4_proto = IPPROTO_TCP;
}

static __always_inline void read_flow_for_udp_send(struct ipv4_flow_key *flow_key, struct sock *sk, struct flowi4 *flw4) {
static __always_inline
void read_aggr_flow_tuple_for_tcp(struct aggregated_flow_tuple *tuple, struct sock *sk, flow_direction direction) {
__u16 sport, dport;

BPF_CORE_READ_INTO(&tuple->saddr, sk, __sk_common.skc_rcv_saddr);
BPF_CORE_READ_INTO(&tuple->daddr, sk, __sk_common.skc_daddr);
BPF_CORE_READ_INTO(&sport, sk, __sk_common.skc_num);
BPF_CORE_READ_INTO(&dport, sk, __sk_common.skc_dport);

tuple->l4_proto = IPPROTO_TCP;

struct port_binding_key pb = {};
switch (direction) {
case FLOW_ACTIVE:
tuple->lport = bpf_ntohs(dport);
break;
case FLOW_PASSIVE:
tuple->lport = bpf_ntohs(sport);
break;
case FLOW_UNKNOWN:
pb.port = bpf_ntohs(sport);
__u8 *ok = bpf_map_lookup_elem(&tcp_port_binding, &pb);
direction = ok ? FLOW_PASSIVE : FLOW_ACTIVE;
tuple->lport = ok ? sport : dport;
break;
default:
log_debug("unreachable statement\n");
break;
}
tuple->direction = direction;
}

static __always_inline void read_flow_for_udp_send(struct aggregated_flow_tuple *tuple, struct sock *sk, struct flowi4 *flw4) {
__u16 dport, sport;

BPF_CORE_READ_INTO(&sport, sk, __sk_common.skc_num);
BPF_CORE_READ_INTO(&dport, sk, __sk_common.skc_dport);

__u8 *sstate = bpf_map_lookup_elem(&udp_port_binding, &sport);
if (sstate) {
BPF_CORE_READ_INTO(&flow_key->saddr, flw4, daddr);
BPF_CORE_READ_INTO(&flow_key->daddr, flw4, saddr);
flow_key->direction = FLOW_PASSIVE;
flow_key->lport = bpf_htons(sport);
BPF_CORE_READ_INTO(&tuple->saddr, flw4, daddr);
BPF_CORE_READ_INTO(&tuple->daddr, flw4, saddr);
tuple->direction = FLOW_PASSIVE;
tuple->lport = bpf_htons(sport);
} else {
BPF_CORE_READ_INTO(&flow_key->saddr, flw4, saddr);
BPF_CORE_READ_INTO(&flow_key->daddr, flw4, daddr);
flow_key->direction = FLOW_ACTIVE;
flow_key->lport = dport;
BPF_CORE_READ_INTO(&tuple->saddr, flw4, saddr);
BPF_CORE_READ_INTO(&tuple->daddr, flw4, daddr);
tuple->direction = FLOW_ACTIVE;
tuple->lport = dport;
}
flow_key->l4_proto = IPPROTO_UDP;
tuple->l4_proto = IPPROTO_UDP;
}

static __always_inline void read_flow_for_udp_recv(struct ipv4_flow_key *flow_key, struct sock *sk, struct sk_buff *skb) {
static __always_inline void read_flow_for_udp_recv(struct aggregated_flow_tuple *tuple, struct sock *sk, struct sk_buff *skb) {
struct udphdr *udphdr = (struct udphdr *)(BPF_CORE_READ(skb, head)
+ BPF_CORE_READ(skb,transport_header));
struct iphdr *iphdr = (struct iphdr *)(BPF_CORE_READ(skb, head)
@@ -52,18 +52,84 @@ static __always_inline void read_flow_for_udp_recv(struct ipv4_flow_ke
__u16 dport_key = bpf_htons(dport);
__u8 *sstate = bpf_map_lookup_elem(&udp_port_binding, &dport_key);
if (sstate) {
flow_key->saddr = BPF_CORE_READ(iphdr, saddr);
flow_key->daddr = BPF_CORE_READ(iphdr, daddr);
flow_key->direction = FLOW_PASSIVE;
flow_key->lport = dport;
tuple->saddr = BPF_CORE_READ(iphdr, saddr);
tuple->daddr = BPF_CORE_READ(iphdr, daddr);
tuple->direction = FLOW_PASSIVE;
tuple->lport = dport;
} else {
flow_key->saddr = BPF_CORE_READ(iphdr, daddr);
flow_key->daddr = BPF_CORE_READ(iphdr, saddr);
flow_key->direction = FLOW_ACTIVE;
flow_key->lport = sport;
tuple->saddr = BPF_CORE_READ(iphdr, daddr);
tuple->daddr = BPF_CORE_READ(iphdr, saddr);
tuple->direction = FLOW_ACTIVE;
tuple->lport = sport;
}

flow_key->l4_proto = IPPROTO_UDP;
tuple->l4_proto = IPPROTO_UDP;
}

static __always_inline
4 changes: 2 additions & 2 deletions bpf/conntracer_in_flow_aggr.bpf.c
@@ -21,15 +21,15 @@ struct {
__uint(type, BPF_MAP_TYPE_HASH);
__type(key, struct flow_tuple);
__type(value, struct single_flow);
__uint(max_entries, MAX_FLOW_ENTRIES);
__uint(max_entries, MAX_SINGLE_FLOW_ENTRIES);
__uint(map_flags, BPF_F_NO_PREALLOC);
} flows SEC(".maps");

struct {
__uint(type, BPF_MAP_TYPE_HASH);
__type(key, struct flow_tuple);
__type(value, struct single_flow_stat);
__uint(max_entries, MAX_FLOW_ENTRIES);
__uint(max_entries, MAX_SINGLE_FLOW_ENTRIES);
__uint(map_flags, BPF_F_NO_PREALLOC);
} flow_stats SEC(".maps");

6 changes: 3 additions & 3 deletions bpf/conntracer_streaming.bpf.c
@@ -45,7 +45,7 @@ insert_tcp_flows(pid_t pid, struct sock *sk, __u16 lport, __u8 direction)
}

static __always_inline void
insert_udp_flows(pid_t pid, struct ipv4_flow_key* flow_key)
insert_udp_flows(pid_t pid, struct aggregated_flow_tuple* flow_key)
{
struct single_flow *flow;

@@ -132,7 +132,7 @@ SEC("kprobe/ip_make_skb")
int BPF_KPROBE(ip_make_skb, struct sock *sk, struct flowi4 *flw4) {
__u64 pid_tgid = bpf_get_current_pid_tgid();
__u32 pid = pid_tgid >> 32;
struct ipv4_flow_key flow_key = {};
struct aggregated_flow_tuple flow_key = {};

read_flow_for_udp_send(&flow_key, sk, flw4);
insert_udp_flows(pid, &flow_key);
@@ -148,7 +148,7 @@ SEC("kprobe/skb_consume_udp")
int BPF_KPROBE(skb_consume_udp, struct sock *sk, struct sk_buff *skb) {
__u64 pid_tgid = bpf_get_current_pid_tgid();
__u32 pid = pid_tgid >> 32;
struct ipv4_flow_key flow_key = {};
struct aggregated_flow_tuple flow_key = {};

read_flow_for_udp_recv(&flow_key, sk, skb);
insert_udp_flows(pid, &flow_key);
16 changes: 13 additions & 3 deletions bpf/maps.h
@@ -13,9 +13,19 @@ struct {
__uint(map_flags, BPF_F_NO_PREALLOC);
} tcp_connect_sockets SEC(".maps");

// udp_port_binding is a map for tracking LISNING or CLOSED ports.
// udp_port_binding enables to register entire local ports and
// insert or update the port number and state at the timing when the port state changes.
/* tcp_port_binding is a map for tracking listening TCP ports.
Entries are added in the context of the inet_csk_accept kernel function
(the accept(2) path).
*/
struct {
__uint(type, BPF_MAP_TYPE_HASH);
__uint(max_entries, MAX_PORT_BINDING_ENTRIES);
__type(key, struct port_binding_key);
__type(value, __u8); // protocol state
} tcp_port_binding SEC(".maps");

/* udp_port_binding is a map for tracking UDP ports in the LISTENING or CLOSED state.
It registers all local ports, inserting or updating a port's number and state
whenever the port's state changes.
*/
struct {
__uint(type, BPF_MAP_TYPE_HASH);
__uint(max_entries, MAX_PORT_BINDING_ENTRIES);