Commit 3ca7ee9

Merge pull request #12 from yuuki/in-kernel-aggr-bytes

trace tcp/udp message bytes for in-kernel-aggregation

yuuki committed Apr 2, 2021
2 parents f2ed38f + 2264496 commit 3ca7ee9
Showing 14 changed files with 5,191 additions and 4,715 deletions.
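
Note: the hunks below use struct aggregated_flow_tuple, struct aggregated_flow,
and struct aggregated_flow_stat, whose definitions live in a header that is not
part of this excerpt. A plausible reconstruction, inferred only from how the
fields are used in this diff (field types, ordering, and the TASK_COMM_LEN size
are assumptions):

struct aggregated_flow_tuple {
	__u32 saddr;
	__u32 daddr;
	__u16 lport;     /* aggregation port, see read_aggr_flow_tuple_for_tcp */
	__u8  direction; /* FLOW_ACTIVE, FLOW_PASSIVE, or FLOW_UNKNOWN */
	__u8  l4_proto;  /* IPPROTO_TCP or IPPROTO_UDP */
};

struct aggregated_flow_stat {
	__u64 ts_us;     /* last update, microseconds since boot */
	__u64 sent_bytes;
	__u64 recv_bytes;
	__u32 connections;
};

struct aggregated_flow {
	__u32 saddr;
	__u32 daddr;
	__u16 lport;
	__u32 pid;
	__u8  direction;
	__u8  l4_proto;
	char  task[16];  /* TASK_COMM_LEN */
	struct aggregated_flow_stat stat;
};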
153 changes: 103 additions & 50 deletions bpf/conntracer.bpf.c
@@ -20,56 +20,69 @@ char LICENSE[] SEC("license") = "Dual BSD/GPL";

struct {
__uint(type, BPF_MAP_TYPE_HASH);
__type(key, struct ipv4_flow_key);
__type(key, struct aggregated_flow_tuple);
__type(value, struct aggregated_flow);
__uint(max_entries, MAX_FLOW_ENTRIES);
__uint(map_flags, BPF_F_NO_PREALLOC);
} flows SEC(".maps");

struct {
__uint(type, BPF_MAP_TYPE_HASH);
__type(key, struct aggregated_flow_tuple);
__type(value, struct aggregated_flow_stat);
__uint(max_entries, MAX_FLOW_ENTRIES);
__uint(map_flags, BPF_F_NO_PREALLOC);
} flow_stats SEC(".maps");

static __always_inline void
insert_tcp_flows(pid_t pid, struct sock *sk, __u16 lport, __u8 direction)
{
insert_tcp_flows(struct aggregated_flow_tuple *tuple, pid_t pid) {
struct aggregated_flow flow = {}, *val;
struct ipv4_flow_key flow_key = {};

BPF_CORE_READ_INTO(&flow.saddr, sk, __sk_common.skc_rcv_saddr);
BPF_CORE_READ_INTO(&flow.daddr, sk, __sk_common.skc_daddr);
flow.lport = lport;
flow.ts_us = bpf_ktime_get_ns() / 1000;
flow.saddr = tuple->saddr;
flow.daddr = tuple->daddr;
flow.lport = tuple->lport;
flow.pid = pid;
flow.direction = direction;
flow.direction = tuple->direction;
bpf_get_current_comm(flow.task, sizeof(flow.task));

flow_key.saddr = flow.saddr;
flow_key.daddr = flow.daddr;
flow_key.lport = flow.lport;
flow_key.direction = flow.direction;
flow_key.l4_proto = IPPROTO_TCP;

val = bpf_map_lookup_elem(&flows, &flow_key);
if (val) {
__u32 *cnt = &(val->stat.connections);
__atomic_add_fetch(cnt, 1, __ATOMIC_RELAXED);
return;
}

flow.stat.connections = 1;
bpf_map_update_elem(&flows, &flow_key, &flow, BPF_ANY);
bpf_map_update_elem(&flows, tuple, &flow, BPF_ANY);
}

static __always_inline void
insert_udp_flows(pid_t pid, struct ipv4_flow_key* flow_key)
insert_udp_flows(pid_t pid, struct aggregated_flow_tuple* tuple)
{
struct aggregated_flow flow = {};

flow.saddr = flow_key->saddr;
flow.daddr = flow_key->daddr;
flow.lport = flow_key->lport;
flow.direction = flow_key->direction;
flow.l4_proto = flow_key->l4_proto;
flow.saddr = tuple->saddr;
flow.daddr = tuple->daddr;
flow.lport = tuple->lport;
flow.direction = tuple->direction;
flow.l4_proto = tuple->l4_proto;
flow.pid = pid;
bpf_get_current_comm(flow.task, sizeof(flow.task));

bpf_map_update_elem(&flows, flow_key, &flow, BPF_ANY);
bpf_map_update_elem(&flows, tuple, &flow, BPF_ANY);
}

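/* update_message accumulates per-tuple byte counters in the flow_stats map.
 * The BPF_NOEXIST update followed by a lookup is a create-or-fetch idiom:
 * the update inserts the zeroed entry only when the key is absent, so probes
 * firing concurrently on different CPUs agree on a single entry, whose
 * counters are then bumped with atomic adds. */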
static __always_inline void
update_message(struct aggregated_flow_tuple* tuple, size_t sent_bytes, size_t recv_bytes)
{
struct aggregated_flow_stat *val, empty = {};

__builtin_memset(&empty, 0, sizeof(struct aggregated_flow_stat));
bpf_map_update_elem(&flow_stats, tuple, &empty, BPF_NOEXIST);

val = bpf_map_lookup_elem(&flow_stats, tuple);
if (!val) return;
val->ts_us = bpf_ktime_get_ns() / 1000;

if (sent_bytes) {
__atomic_add_fetch(&val->sent_bytes, sent_bytes, __ATOMIC_RELAXED);
}
if (recv_bytes) {
__atomic_add_fetch(&val->recv_bytes, recv_bytes, __ATOMIC_RELAXED);
}
}

SEC("kprobe/tcp_v4_connect")
@@ -102,9 +115,10 @@ int BPF_KRETPROBE(tcp_v4_connect_ret, int ret)

struct sock* sk = *skpp;

BPF_CORE_READ_INTO(&dport, sk, __sk_common.skc_dport);

insert_tcp_flows(pid, sk, dport, FLOW_ACTIVE);
struct aggregated_flow_tuple tuple = {};
read_aggr_flow_tuple_for_tcp(&tuple, sk, FLOW_ACTIVE);
insert_tcp_flows(&tuple, pid);
update_message(&tuple, 0, 0);

log_debug("kretprobe/tcp_v4_connect: dport:%u, tid:%u\n", dport, pid_tgid);
end:
@@ -117,49 +131,88 @@ int BPF_KRETPROBE(inet_csk_accept_ret, struct sock *sk)
{
__u64 pid_tgid = bpf_get_current_pid_tgid();
__u32 pid = pid_tgid >> 32;
__u16 lport = 0;

if (!sk)
return 0;
if (!sk) return 0;

BPF_CORE_READ_INTO(&lport, sk, __sk_common.skc_num);

insert_tcp_flows(pid, sk, lport, FLOW_PASSIVE);
struct aggregated_flow_tuple tuple = {};
read_aggr_flow_tuple_for_tcp(&tuple, sk, FLOW_PASSIVE);
insert_tcp_flows(&tuple, pid);
update_port_binding(tuple.lport);
update_message(&tuple, 0, 0);

log_debug("kretprobe/inet_csk_accept: lport:%u,pid_tgid:%u\n", pid_tgid, lport);
return 0;
}

SEC("kprobe/tcp_sendmsg")
int BPF_KPROBE(tcp_sendmsg, struct sock* sk, struct msghdr *msg, size_t size) {
__u64 pid_tgid = bpf_get_current_pid_tgid();
__u32 pid = pid_tgid >> 32;
log_debug("kprobe/tcp_sendmsg: pid_tgid:%d, size:%d\n", pid_tgid, size);

struct aggregated_flow_tuple tuple = {};
read_aggr_flow_tuple_for_tcp(&tuple, sk, FLOW_UNKNOWN);
update_message(&tuple, size, 0);

return 0;
}

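/* tcp_cleanup_rbuf runs when the application releases data from the receive
 * queue, so `copied` reflects bytes actually consumed by userspace; probing
 * it instead of tcp_recvmsg avoids counting failed or partial reads. */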
SEC("kprobe/tcp_cleanup_rbuf")
int BPF_KPROBE(tcp_cleanup_rbuf, struct sock* sk, int copied) {
if (copied < 0) {
return 0;
}
__u64 pid_tgid = bpf_get_current_pid_tgid();
__u32 pid = pid_tgid >> 32;
log_debug("kprobe/tcp_cleanup_rbuf: pid_tgid:%d, copied:%d\n", pid_tgid, copied);

struct aggregated_flow_tuple tuple = {};
read_aggr_flow_tuple_for_tcp(&tuple, sk, FLOW_UNKNOWN);
update_message(&tuple, 0, copied);

return 0;
}

// The struct sock seen by udp_sendmsg may be missing IP addresses on a
// listening socket, so the addresses are read from the struct flowi4 passed
// to ip_make_skb.
// https://github.com/DataDog/datadog-agent/pull/6307
SEC("kprobe/ip_make_skb")
int BPF_KPROBE(ip_make_skb, struct sock *sk, struct flowi4 *flw4) {
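/* ip_make_skb's fifth parameter is the datagram length including the UDP
 * header; BPF_KPROBE binds only the first two arguments above, so the length
 * is read from the raw registers. */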
size_t msglen = (size_t)PT_REGS_PARM5(ctx);

__u64 pid_tgid = bpf_get_current_pid_tgid();
__u32 pid = pid_tgid >> 32;
struct ipv4_flow_key flow_key = {};
struct aggregated_flow_tuple tuple = {};

read_flow_for_udp_send(&flow_key, sk, flw4);
insert_udp_flows(pid, &flow_key);
msglen = msglen - sizeof(struct udphdr);

log_debug("kprobe/ip_make_skb: lport:%u, tgid:%u\n",
flow_key.lport, pid_tgid);
read_flow_for_udp_send(&tuple, sk, flw4);
insert_udp_flows(pid, &tuple);
update_message(&tuple, msglen, 0);

log_debug("kprobe/ip_make_skb: lport:%u, msglen:%u, tgid:%u\n",
tuple.lport, msglen, pid_tgid);
return 0;
}

// The struct sock seen by udp_recvmsg may be missing IP addresses on a
// listening socket, so the addresses are read from the arguments of
// skb_consume_udp.
SEC("kprobe/skb_consume_udp")
int BPF_KPROBE(skb_consume_udp, struct sock *sk, struct sk_buff *skb) {
int BPF_KPROBE(skb_consume_udp, struct sock *sk, struct sk_buff *skb, int len) {
if (len < 0) {
return 0;
}

__u64 pid_tgid = bpf_get_current_pid_tgid();
__u32 pid = pid_tgid >> 32;
struct ipv4_flow_key flow_key = {};
struct aggregated_flow_tuple tuple = {};

read_flow_for_udp_recv(&flow_key, sk, skb);
insert_udp_flows(pid, &flow_key);
read_flow_for_udp_recv(&tuple, sk, skb);
insert_udp_flows(pid, &tuple);
update_message(&tuple, 0, len);

log_debug("kprobe/skb_consume_udp: sport:%u, dport:%u, tid:%u\n",
flow_key.lport, pid_tgid);
log_debug("kprobe/skb_consume_udp: lport:%u, len:%u, tid:%u\n",
tuple.lport, len, pid_tgid);
return 0;
}
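
/* For context, a minimal userspace sketch (not part of this commit) showing
 * how the flow_stats map could be read with libbpf. The struct layouts match
 * the reconstruction near the top of this excerpt, and obtaining the map fd
 * (e.g. via bpf_object__find_map_fd_by_name) is an assumption about the
 * loader. */
#include <stdio.h>
#include <bpf/bpf.h>

static void dump_flow_stats(int map_fd)
{
	struct aggregated_flow_tuple key, *prev = NULL;
	struct aggregated_flow_stat stat;

	/* Passing NULL as the previous key starts the walk at the first entry. */
	while (bpf_map_get_next_key(map_fd, prev, &key) == 0) {
		if (bpf_map_lookup_elem(map_fd, &key, &stat) == 0)
			printf("lport=%u sent=%llu recv=%llu\n",
			       (unsigned)key.lport,
			       (unsigned long long)stat.sent_bytes,
			       (unsigned long long)stat.recv_bytes);
		prev = &key;
	}
}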

72 changes: 52 additions & 20 deletions bpf/conntracer_bpf_read.h
@@ -19,28 +19,60 @@ void read_flow_tuple_for_tcp(struct flow_tuple *tuple, struct sock *sk, pid_t pi
tuple->l4_proto = IPPROTO_TCP;
}

static __always_inline void read_flow_for_udp_send(struct ipv4_flow_key *flow_key, struct sock *sk, struct flowi4 *flw4) {
static __always_inline
void read_aggr_flow_tuple_for_tcp(struct aggregated_flow_tuple *tuple, struct sock *sk, flow_direction direction) {
__u16 sport, dport;

BPF_CORE_READ_INTO(&tuple->saddr, sk, __sk_common.skc_rcv_saddr);
BPF_CORE_READ_INTO(&tuple->daddr, sk, __sk_common.skc_daddr);
BPF_CORE_READ_INTO(&sport, sk, __sk_common.skc_num);
BPF_CORE_READ_INTO(&dport, sk, __sk_common.skc_dport);

tuple->l4_proto = IPPROTO_TCP;

struct port_binding_key pb = {};
switch (direction) {
case FLOW_ACTIVE:
tuple->lport = bpf_ntohs(dport);
break;
case FLOW_PASSIVE:
tuple->lport = bpf_ntohs(sport);
break;
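/* Probes on the send/recv paths do not know the flow direction; infer it
 * from whether the local port is registered as a listening TCP port. */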
case FLOW_UNKNOWN:
pb.port = bpf_ntohs(sport);
__u8 *ok = bpf_map_lookup_elem(&tcp_port_binding, &pb);
direction = ok ? FLOW_PASSIVE : FLOW_ACTIVE;
tuple->lport = ok ? sport : dport;
break;
default:
log_debug("unreachable statement\n");
break;
}
tuple->direction = direction;
}

static __always_inline void read_flow_for_udp_send(struct aggregated_flow_tuple *tuple, struct sock *sk, struct flowi4 *flw4) {
__u16 dport, sport;

BPF_CORE_READ_INTO(&sport, sk, __sk_common.skc_num);
BPF_CORE_READ_INTO(&dport, sk, __sk_common.skc_dport);

__u8 *sstate = bpf_map_lookup_elem(&udp_port_binding, &sport);
if (sstate) {
BPF_CORE_READ_INTO(&flow_key->saddr, flw4, daddr);
BPF_CORE_READ_INTO(&flow_key->daddr, flw4, saddr);
flow_key->direction = FLOW_PASSIVE;
flow_key->lport = bpf_htons(sport);
BPF_CORE_READ_INTO(&tuple->saddr, flw4, daddr);
BPF_CORE_READ_INTO(&tuple->daddr, flw4, saddr);
tuple->direction = FLOW_PASSIVE;
tuple->lport = bpf_htons(sport);
} else {
BPF_CORE_READ_INTO(&flow_key->saddr, flw4, saddr);
BPF_CORE_READ_INTO(&flow_key->daddr, flw4, daddr);
flow_key->direction = FLOW_ACTIVE;
flow_key->lport = dport;
BPF_CORE_READ_INTO(&tuple->saddr, flw4, saddr);
BPF_CORE_READ_INTO(&tuple->daddr, flw4, daddr);
tuple->direction = FLOW_ACTIVE;
tuple->lport = dport;
}
flow_key->l4_proto = IPPROTO_UDP;
tuple->l4_proto = IPPROTO_UDP;
}

static __always_inline void read_flow_for_udp_recv(struct ipv4_flow_key *flow_key, struct sock *sk, struct sk_buff *skb) {
static __always_inline void read_flow_for_udp_recv(struct aggregated_flow_tuple *tuple, struct sock *sk, struct sk_buff *skb) {
struct udphdr *udphdr = (struct udphdr *)(BPF_CORE_READ(skb, head)
+ BPF_CORE_READ(skb,transport_header));
struct iphdr *iphdr = (struct iphdr *)(BPF_CORE_READ(skb, head)
@@ -52,18 +84,18 @@ static __always_inline void read_flow_for_udp_recv(struct ipv4_flow_key *flow_ke
__u16 dport_key = bpf_htons(dport);
__u8 *sstate = bpf_map_lookup_elem(&udp_port_binding, &dport_key);
if (sstate) {
flow_key->saddr = BPF_CORE_READ(iphdr, saddr);
flow_key->daddr = BPF_CORE_READ(iphdr, daddr);
flow_key->direction = FLOW_PASSIVE;
flow_key->lport = dport;
tuple->saddr = BPF_CORE_READ(iphdr, saddr);
tuple->daddr = BPF_CORE_READ(iphdr, daddr);
tuple->direction = FLOW_PASSIVE;
tuple->lport = dport;
} else {
flow_key->saddr = BPF_CORE_READ(iphdr, daddr);
flow_key->daddr = BPF_CORE_READ(iphdr, saddr);
flow_key->direction = FLOW_ACTIVE;
flow_key->lport = sport;
tuple->saddr = BPF_CORE_READ(iphdr, daddr);
tuple->daddr = BPF_CORE_READ(iphdr, saddr);
tuple->direction = FLOW_ACTIVE;
tuple->lport = sport;
}

flow_key->l4_proto = IPPROTO_UDP;
tuple->l4_proto = IPPROTO_UDP;
}

static __always_inline
4 changes: 2 additions & 2 deletions bpf/conntracer_in_flow_aggr.bpf.c
@@ -21,15 +21,15 @@ struct {
__uint(type, BPF_MAP_TYPE_HASH);
__type(key, struct flow_tuple);
__type(value, struct single_flow);
__uint(max_entries, MAX_FLOW_ENTRIES);
__uint(max_entries, MAX_SINGLE_FLOW_ENTRIES);
__uint(map_flags, BPF_F_NO_PREALLOC);
} flows SEC(".maps");

struct {
__uint(type, BPF_MAP_TYPE_HASH);
__type(key, struct flow_tuple);
__type(value, struct single_flow_stat);
__uint(max_entries, MAX_FLOW_ENTRIES);
__uint(max_entries, MAX_SINGLE_FLOW_ENTRIES);
__uint(map_flags, BPF_F_NO_PREALLOC);
} flow_stats SEC(".maps");

6 changes: 3 additions & 3 deletions bpf/conntracer_streaming.bpf.c
@@ -45,7 +45,7 @@ insert_tcp_flows(pid_t pid, struct sock *sk, __u16 lport, __u8 direction)
}

static __always_inline void
insert_udp_flows(pid_t pid, struct ipv4_flow_key* flow_key)
insert_udp_flows(pid_t pid, struct aggregated_flow_tuple* flow_key)
{
struct single_flow *flow;

@@ -132,7 +132,7 @@ SEC("kprobe/ip_make_skb")
int BPF_KPROBE(ip_make_skb, struct sock *sk, struct flowi4 *flw4) {
__u64 pid_tgid = bpf_get_current_pid_tgid();
__u32 pid = pid_tgid >> 32;
struct ipv4_flow_key flow_key = {};
struct aggregated_flow_tuple flow_key = {};

read_flow_for_udp_send(&flow_key, sk, flw4);
insert_udp_flows(pid, &flow_key);
@@ -148,7 +148,7 @@ SEC("kprobe/skb_consume_udp")
int BPF_KPROBE(skb_consume_udp, struct sock *sk, struct sk_buff *skb) {
__u64 pid_tgid = bpf_get_current_pid_tgid();
__u32 pid = pid_tgid >> 32;
struct ipv4_flow_key flow_key = {};
struct aggregated_flow_tuple flow_key = {};

read_flow_for_udp_recv(&flow_key, sk, skb);
insert_udp_flows(pid, &flow_key);
16 changes: 13 additions & 3 deletions bpf/maps.h
@@ -13,9 +13,19 @@ struct {
__uint(map_flags, BPF_F_NO_PREALLOC);
} tcp_connect_sockets SEC(".maps");

// udp_port_binding is a map for tracking LISNING or CLOSED ports.
// udp_port_binding enables to register entire local ports and
// insert or update the port number and state at the timing when the port state changes.
/* tcp_port_binding is a map for tracking listening TCP ports.
Entries are added to the map from the inet_csk_accept kretprobe.
*/
struct {
__uint(type, BPF_MAP_TYPE_HASH);
__uint(max_entries, MAX_PORT_BINDING_ENTRIES);
__type(key, struct port_binding_key);
__type(value, __u8); // protocol state
} tcp_port_binding SEC(".maps");
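
/* Illustrative only: how a tcp_port_binding entry would be written. In this
 * commit the actual insertion happens in update_port_binding (body not shown
 * in this excerpt), called from the inet_csk_accept kretprobe; the helper
 * name and state value below are assumptions. */
static __always_inline void mark_tcp_port_listening(__u16 lport)
{
	struct port_binding_key pb = {};
	__u8 state = 1; /* assumed "listening" marker */

	pb.port = lport;
	bpf_map_update_elem(&tcp_port_binding, &pb, &state, BPF_ANY);
}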

/* udp_port_binding is a map for tracking UDP LISTENING or CLOSED ports.
It registers all local ports, inserting or updating a port's number and state
whenever the port's state changes.
*/
struct {
__uint(type, BPF_MAP_TYPE_HASH);
__uint(max_entries, MAX_PORT_BINDING_ENTRIES);
