change rtt logic to use kprobe and get rtt from tcp socket
Signed-off-by: Mohamed Mahmoud <mmahmoud@redhat.com>
msherif1234 committed Jan 3, 2024
1 parent 554f400 commit fd0b425
Showing 11 changed files with 343 additions and 204 deletions.
1 change: 0 additions & 1 deletion bpf/configs.h
@@ -5,7 +5,6 @@
 // Constant definitions, to be overridden by the invoker
 volatile const u32 sampling = 0;
 volatile const u8 trace_messages = 0;
-volatile const u8 enable_rtt = 0;
 volatile const u16 pca_port = 0;
 volatile const u8 pca_proto = 0;
 volatile const u8 enable_dns_tracking = 0;
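These `volatile const` values are compile-time constants that the userspace agent overrides when it loads the BPF object; with RTT collection moved to a kprobe, there is no `enable_rtt` constant left to rewrite. As a rough illustration only (not this repository's actual loader code, and with an assumed object path and values), a cilium/ebpf based loader could set the remaining constants like this:

```go
// Sketch only: overriding the volatile consts above from a Go loader built on
// cilium/ebpf. Object path and chosen values are illustrative, not the agent's.
package main

import (
	"log"

	"github.com/cilium/ebpf"
)

func main() {
	spec, err := ebpf.LoadCollectionSpec("bpf_bpfel.o") // hypothetical object path
	if err != nil {
		log.Fatal(err)
	}
	// Keys must match the `volatile const` names in configs.h. Note there is
	// no longer an enable_rtt constant to rewrite after this change.
	err = spec.RewriteConstants(map[string]interface{}{
		"sampling":            uint32(1),
		"trace_messages":      uint8(0),
		"enable_dns_tracking": uint8(1),
	})
	if err != nil {
		log.Fatal(err)
	}
	coll, err := ebpf.NewCollection(spec)
	if err != nil {
		log.Fatal(err)
	}
	defer coll.Close()
}
```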
11 changes: 0 additions & 11 deletions bpf/flows.c
@@ -66,12 +66,6 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
     id.if_index = skb->ifindex;
     id.direction = direction;
 
-    // We calculate the RTT before looking up aggregated_flows map because we want
-    // to keep the critical section between map lookup and update consume minimum time.
-    if (enable_rtt) {
-        // This is currently not to be enabled by default.
-        calculate_flow_rtt(&pkt, direction, data_end);
-    }
     int dns_errno = 0;
     if (enable_dns_tracking) {
         dns_errno = track_dns_packet(skb, &pkt);
@@ -90,10 +84,6 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
         }
         aggregate_flow->flags |= pkt.flags;
         aggregate_flow->dscp = pkt.dscp;
-        // Does not matter the gate. Will be zero if not enabled.
-        if (pkt.rtt > aggregate_flow->flow_rtt) {
-            aggregate_flow->flow_rtt = pkt.rtt;
-        }
         aggregate_flow->dns_record.id = pkt.dns_id;
         aggregate_flow->dns_record.flags = pkt.dns_flags;
         aggregate_flow->dns_record.latency = pkt.dns_latency;
@@ -115,7 +105,6 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
             .start_mono_time_ts = pkt.current_ts,
             .end_mono_time_ts = pkt.current_ts,
             .flags = pkt.flags,
-            .flow_rtt = pkt.rtt,
             .dscp = pkt.dscp,
             .dns_record.id = pkt.dns_id,
             .dns_record.flags = pkt.dns_flags,
12 changes: 0 additions & 12 deletions bpf/maps_definition.h
@@ -18,18 +18,6 @@ struct {
     __uint(map_flags, BPF_F_NO_PREALLOC);
 } aggregated_flows SEC(".maps");
 
-// Common hashmap to keep track of all flow sequences.
-// Key is flow_seq_id which is standard 4 tuple and a sequence id
-// sequence id is specific to the type of transport protocol
-// Value is u64 which represents the occurrence timestamp of the packet.
-struct {
-    __uint(type, BPF_MAP_TYPE_HASH);
-    __uint(max_entries, 1 << 20); // Will take around 64MB of space.
-    __type(key, flow_seq_id);
-    __type(value, u64);
-    __uint(map_flags, BPF_F_NO_PREALLOC);
-} flow_sequences SEC(".maps");
-
 //PerfEvent Array for Packet Payloads
 struct {
     __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
232 changes: 118 additions & 114 deletions bpf/rtt_tracker.h
@@ -1,143 +1,147 @@
 /*
-    A simple RTT tracker implemented to be used at the ebpf layer inside the flow_monitor hookpoint.
-    This tracker currently tracks RTT for TCP flows by looking at the TCP start sequence and estimates
-    RTT by perform (timestamp of receiveing ack packet - timestamp of sending syn packet)
+    A simple RTT tracker implemented to be using eBPF kprobe hook.
 */
 
 #ifndef __RTT_TRACKER_H__
 #define __RTT_TRACKER_H__
 
+#include <bpf_core_read.h>
+#include <bpf_tracing.h>
 #include "utils.h"
 #include "maps_definition.h"
 
-const u64 MIN_RTT = 50000; //50 micro seconds
+static __always_inline void rtt_fill_in_l2(struct sk_buff *skb, flow_id *id) {
+    u16 skb_mac_header = BPF_CORE_READ(skb, mac_header);
+    u8 *skb_head = BPF_CORE_READ(skb, head);
+    struct ethhdr eth;
 
-static __always_inline void fill_flow_seq_id(flow_seq_id *seq_id, pkt_info *pkt, u32 seq, bool reverse) {
-    flow_id *id = pkt->id;
-    if (reverse) {
-        __builtin_memcpy(seq_id->src_ip, id->dst_ip, IP_MAX_LEN);
-        __builtin_memcpy(seq_id->dst_ip, id->src_ip, IP_MAX_LEN);
-        seq_id->src_port = id->dst_port;
-        seq_id->dst_port = id->src_port;
-    } else {
-        __builtin_memcpy(seq_id->src_ip, id->src_ip, IP_MAX_LEN);
-        __builtin_memcpy(seq_id->dst_ip, id->dst_ip, IP_MAX_LEN);
-        seq_id->src_port = id->src_port;
-        seq_id->dst_port = id->dst_port;
-    }
-    seq_id->transport_protocol = id->transport_protocol;
-    seq_id->seq_id = seq;
-    seq_id->if_index = id->if_index;
-}
+    __builtin_memset(&eth, 0, sizeof(eth));
 
-static __always_inline void reverse_flow_id_struct(flow_id *src, flow_id *dst) {
-    // Fields which remain same
-    dst->eth_protocol = src->eth_protocol;
-    dst->transport_protocol = src->transport_protocol;
-    dst->if_index = src->if_index;
-
-    // Fields which should be reversed
-    dst->direction = (src->direction == INGRESS) ? EGRESS : INGRESS;
-    __builtin_memcpy(dst->src_mac, src->dst_mac, ETH_ALEN);
-    __builtin_memcpy(dst->dst_mac, src->src_mac, ETH_ALEN);
-    __builtin_memcpy(dst->src_ip, src->dst_ip, IP_MAX_LEN);
-    __builtin_memcpy(dst->dst_ip, src->src_ip, IP_MAX_LEN);
-    dst->src_port = src->dst_port;
-    dst->dst_port = src->src_port;
-    /* ICMP type can be ignore for now. We only deal with TCP packets for now.*/
+    bpf_probe_read(&eth, sizeof(eth), (struct ethhdr *)(skb_head + skb_mac_header));
+    __builtin_memcpy(id->dst_mac, eth.h_dest, ETH_ALEN);
+    __builtin_memcpy(id->src_mac, eth.h_source, ETH_ALEN);
+    id->eth_protocol = bpf_ntohs(eth.h_proto);
 }
 
-static __always_inline void update_reverse_flow_rtt(pkt_info *pkt, u32 seq) {
-    flow_id rev_flow_id;
-    __builtin_memset(&rev_flow_id, 0, sizeof(rev_flow_id));
-    reverse_flow_id_struct(pkt->id, &rev_flow_id);
-
-    flow_metrics *reverse_flow = (flow_metrics *)bpf_map_lookup_elem(&aggregated_flows, &rev_flow_id);
-    if (reverse_flow != NULL) {
-        if (pkt->rtt > reverse_flow->flow_rtt) {
-            reverse_flow->flow_rtt = pkt->rtt;
-            long ret = bpf_map_update_elem(&aggregated_flows, &rev_flow_id, reverse_flow, BPF_EXIST);
-            if (trace_messages && ret != 0) {
-                bpf_printk("error updating rtt value in flow %d\n", ret);
-            }
-        }
-    }
-}
-
-static __always_inline void __calculate_tcp_rtt(pkt_info *pkt, struct tcphdr *tcp, flow_seq_id *seq_id) {
-    // Stored sequence should be ack_seq - 1
-    u32 seq = bpf_ntohl(tcp->ack_seq) - 1;
-    // check reversed flow
-    fill_flow_seq_id(seq_id, pkt, seq, true);
-
-    u64 *prev_ts = (u64 *)bpf_map_lookup_elem(&flow_sequences, seq_id);
-    if (prev_ts != NULL) {
-        u64 rtt = pkt->current_ts - *prev_ts;
-        /**
-         * FIXME: Because of SAMPLING the way it is done if we miss one of SYN/SYN+ACK/ACK
-         * then we can get RTT values which are the process response time rather than actual RTT.
-         * This check below clears them out but needs to be modified with a better solution or change
-         * the algorithm for calculating RTT so it doesn't interact with SAMPLING like this.
-         */
-        if (rtt < MIN_RTT) {
-            return;
+static __always_inline void rtt_fill_in_l3(struct sk_buff *skb, flow_id *id, u16 family, u8 *dscp) {
+    u16 skb_network_header = BPF_CORE_READ(skb, network_header);
+    u8 *skb_head = BPF_CORE_READ(skb, head);
+
+    switch (family) {
+    case AF_INET: {
+        struct iphdr ip;
+        __builtin_memset(&ip, 0, sizeof(ip));
+        bpf_probe_read(&ip, sizeof(ip), (struct iphdr *)(skb_head + skb_network_header));
+        __builtin_memcpy(id->src_ip, ip4in6, sizeof(ip4in6));
+        __builtin_memcpy(id->dst_ip, ip4in6, sizeof(ip4in6));
+        __builtin_memcpy(id->src_ip + sizeof(ip4in6), &ip.saddr, sizeof(ip.saddr));
+        __builtin_memcpy(id->dst_ip + sizeof(ip4in6), &ip.daddr, sizeof(ip.daddr));
+        *dscp = ipv4_get_dscp(&ip);
+        break;
     }
-        pkt->rtt = rtt;
-        // Delete the flow from flow sequence map so if it
-        // restarts we have a new RTT calculation.
-        long ret = bpf_map_delete_elem(&flow_sequences, seq_id);
-        if (trace_messages && ret != 0) {
-            bpf_printk("error evicting flow sequence: %d", ret);
+    case AF_INET6: {
+        struct ipv6hdr ip;
+        __builtin_memset(&ip, 0, sizeof(ip));
+        bpf_probe_read(&ip, sizeof(ip), (struct ipv6hdr *)(skb_head + skb_network_header));
+        __builtin_memcpy(id->src_ip, ip.saddr.in6_u.u6_addr8, IP_MAX_LEN);
+        __builtin_memcpy(id->dst_ip, ip.daddr.in6_u.u6_addr8, IP_MAX_LEN);
+        *dscp = ipv6_get_dscp(&ip);
+        break;
     }
-        // This is an ACK packet with valid sequence id so a SYN must
-        // have been sent. We can safely update the reverse flow RTT here.
-        update_reverse_flow_rtt(pkt, seq);
+    default:
+        return;
     }
     return;
 }
 
-static __always_inline void __store_tcp_ts(pkt_info *pkt, struct tcphdr *tcp, flow_seq_id *seq_id) {
-    // store timestamp of syn packets.
-    u32 seq = bpf_ntohl(tcp->seq);
-    fill_flow_seq_id(seq_id, pkt, seq, false);
-    long ret = bpf_map_update_elem(&flow_sequences, seq_id, &pkt->current_ts, BPF_NOEXIST);
-    if (trace_messages && ret != 0) {
-        bpf_printk("err saving flow sequence record %d", ret);
-    }
-    return;
+static __always_inline void rtt_fill_in_tcp(struct sk_buff *skb, flow_id *id, u16 *flags) {
+    u16 skb_transport_header = BPF_CORE_READ(skb, transport_header);
+    u8 *skb_head = BPF_CORE_READ(skb, head);
+    struct tcphdr tcp;
+    u16 sport, dport;
+
+    __builtin_memset(&tcp, 0, sizeof(tcp));
+
+    bpf_probe_read(&tcp, sizeof(tcp), (struct tcphdr *)(skb_head + skb_transport_header));
+    sport = bpf_ntohs(tcp.source);
+    dport = bpf_ntohs(tcp.dest);
+    id->src_port = sport;
+    id->dst_port = dport;
+    set_flags(&tcp, flags);
+    id->transport_protocol = IPPROTO_TCP;
 }
 
-static __always_inline void calculate_flow_rtt_tcp(pkt_info *pkt, u8 direction, void *data_end, flow_seq_id *seq_id) {
-    struct tcphdr *tcp = (struct tcphdr *) pkt->l4_hdr;
-    if ( !tcp || ((void *)tcp + sizeof(*tcp) > data_end) ) {
-        return;
-    }
+static __always_inline int calculate_flow_rtt_tcp(struct sock *sk, struct sk_buff *skb) {
+    struct tcp_sock *ts;
+    u16 family, flags;
+    u64 rtt, len;
+    int ret = 0;
+    flow_id id;
+    u8 dscp;
 
-    /* We calculate RTT for both SYN/SYN+ACK and SYN+ACK/ACK and take the maximum of both.*/
-    if (tcp->syn && tcp->ack) { // SYN ACK Packet
-        __calculate_tcp_rtt(pkt, tcp, seq_id);
-        __store_tcp_ts(pkt, tcp, seq_id);
+    if (skb == NULL) {
+        return 0;
     }
-    else if (tcp->ack) {
-        __calculate_tcp_rtt(pkt, tcp, seq_id);
-    }
-    else if (tcp->syn) {
-        __store_tcp_ts(pkt, tcp, seq_id);
+
+    __builtin_memset(&id, 0, sizeof(id));
+
+    id.if_index = BPF_CORE_READ(skb, skb_iif);
+    len = BPF_CORE_READ(skb, len);
+
+    // read L2 info
+    rtt_fill_in_l2(skb, &id);
+
+    family = BPF_CORE_READ(sk, __sk_common.skc_family);
+
+    // read L3 info
+    rtt_fill_in_l3(skb, &id, family, &dscp);
+
+    // read TCP info
+    rtt_fill_in_tcp(skb, &id, &flags);
+
+    // read TCP socket rtt and store it in nanoseconds
+    ts = (struct tcp_sock *)(sk);
+    rtt = BPF_CORE_READ(ts, srtt_us) >> 3;
+    rtt *= 1000u;
+    u64 current_ts = bpf_ktime_get_ns();
+    flow_metrics *aggregate_flow = bpf_map_lookup_elem(&aggregated_flows, &id);
+    if (aggregate_flow != NULL) {
+        aggregate_flow->end_mono_time_ts = current_ts;
+        aggregate_flow->flow_rtt = rtt;
+        aggregate_flow->dscp = dscp;
+        aggregate_flow->flags |= flags;
+        aggregate_flow->packets ++;
+        aggregate_flow->bytes += len;
+        ret = bpf_map_update_elem(&aggregated_flows, &id, aggregate_flow, BPF_ANY);
+        if (trace_messages && ret != 0) {
+            bpf_printk("error rtt track updating flow %d\n", ret);
+        }
+    } else {
+        flow_metrics new_flow = {
+            .packets = 1,
+            .bytes = len,
+            .start_mono_time_ts = current_ts,
+            .end_mono_time_ts = current_ts,
+            .flags = flags,
+            .flow_rtt = rtt,
+            .dscp = dscp,
+        };
+        ret = bpf_map_update_elem(&aggregated_flows, &id, &new_flow, BPF_ANY);
+        if (trace_messages && ret != 0) {
+            bpf_printk("error creating new rtt flow %d\n", ret);
+        }
     }
+    return ret;
 }
 
-static __always_inline void calculate_flow_rtt(pkt_info *pkt, u8 direction, void *data_end) {
-    flow_seq_id seq_id;
-    __builtin_memset(&seq_id, 0, sizeof(flow_seq_id));
+SEC("kprobe/tcp_rcv_established")
+int tcp_rcv_kprobe(struct pt_regs *regs) {
+    struct sock *sk;
+    struct sk_buff *skb;
 
-    switch (pkt->id->transport_protocol)
-    {
-    case IPPROTO_TCP:
-        calculate_flow_rtt_tcp(pkt, direction, data_end, &seq_id);
-        break;
-    default:
-        break;
-    }
+    sk = (struct sock *)PT_REGS_PARM1_CORE(regs);
+    skb = (struct sk_buff *)PT_REGS_PARM2_CORE(regs);
+
+    return calculate_flow_rtt_tcp(sk, skb);
 }
-
 #endif /* __RTT_TRACKER_H__ */
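The RTT value itself now comes from the kernel's own estimator rather than from timing SYN/ACK pairs in the datapath: `tcp_sock.srtt_us` holds the smoothed RTT in microseconds scaled by 8, so the probe shifts right by 3 and multiplies by 1000 to export nanoseconds. A small worked example of that conversion (illustrative Go, not part of the agent):

```go
// Worked example of the srtt_us -> nanoseconds conversion used above.
// In the kernel, tcp_sock.srtt_us stores the smoothed RTT in microseconds
// scaled by 8, so (srtt_us >> 3) * 1000 yields nanoseconds.
package main

import "fmt"

func srttToNanos(srttUs uint32) uint64 {
	us := uint64(srttUs >> 3) // undo the x8 scaling -> microseconds
	return us * 1000          // microseconds -> nanoseconds
}

func main() {
	fmt.Println(srttToNanos(2000)) // srtt_us=2000 -> 250 µs -> 250000 ns
}
```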
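Because the program is now declared under `SEC("kprobe/tcp_rcv_established")`, it is attached as a kprobe at load time instead of being invoked from the TC `flow_monitor` hook. A minimal sketch of such an attachment with cilium/ebpf; the kernel symbol and program name come from this diff, but the loader code and object path are assumptions, not the agent's actual implementation:

```go
// Sketch only: attaching the tcp_rcv_kprobe program with cilium/ebpf.
// The kernel symbol and program name come from the diff; the loader code
// and object path are illustrative.
package main

import (
	"log"

	"github.com/cilium/ebpf"
	"github.com/cilium/ebpf/link"
)

func main() {
	coll, err := ebpf.LoadCollection("bpf_bpfel.o") // hypothetical object path
	if err != nil {
		log.Fatal(err)
	}
	defer coll.Close()

	prog := coll.Programs["tcp_rcv_kprobe"] // SEC("kprobe/tcp_rcv_established")
	kp, err := link.Kprobe("tcp_rcv_established", prog, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer kp.Close()

	// From here the probe updates aggregated_flows on established-TCP receives;
	// userspace keeps reading that map as before.
	select {}
}
```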
1 change: 0 additions & 1 deletion bpf/types.h
@@ -158,7 +158,6 @@ typedef struct pkt_info_t {
     u64 current_ts; // ts recorded when pkt came.
     u16 flags; // TCP specific
     void *l4_hdr; // Stores the actual l4 header
-    u64 rtt; // rtt calculated from the flow if possible. else zero
     u8 dscp; // IPv4/6 DSCP value
     u16 dns_id;
     u16 dns_flags;
16 changes: 3 additions & 13 deletions pkg/ebpf/bpf_bpfeb.go

Some generated files are not rendered by default.

Binary file modified pkg/ebpf/bpf_bpfeb.o
Binary file removed pkg/ebpf/bpf_bpfel.o
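The touched pkg/ebpf/bpf_bpfeb.go bindings and the bpf_bpfe{b,l}.o objects follow the naming convention of cilium/ebpf's bpf2go generator, which compiles the C sources above into per-endianness objects and matching Go bindings. A hedged sketch of such a generate directive; the identifier, source path, and compiler flags are assumptions and the repository's actual setup may differ:

```go
// Sketch only: the kind of go:generate directive that produces bpf_bpfe{l,b}.go
// and the matching .o objects with cilium/ebpf's bpf2go.
package ebpf

//go:generate go run github.com/cilium/ebpf/cmd/bpf2go -cc clang bpf ../../bpf/flows.c -- -I../../bpf/headers
```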
