Skip to content

Commit

Permalink
Added various runtime statistics:
Browse files Browse the repository at this point in the history
 - MCS, bandwidth and used FEC scheme
 - Radio and UDP flow rates

CLI UI refactoring
  • Loading branch information
svpcom committed May 20, 2024
1 parent d3ca31b commit 9d8e5b3
Show file tree
Hide file tree
Showing 8 changed files with 258 additions and 105 deletions.
90 changes: 75 additions & 15 deletions src/rx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ void Receiver::loop_iter(void)
int8_t noise[RX_ANT_MAX];
uint8_t flags = 0;
bool self_injected = false;
uint8_t mcs_index = 0;
uint8_t bandwidth = 20;

struct ieee80211_radiotap_iterator iterator;
int ret = ieee80211_radiotap_iterator_init(&iterator, (ieee80211_radiotap_header*)pkt, pktlen, NULL);

Expand Down Expand Up @@ -170,6 +173,45 @@ void Receiver::loop_iter(void)
self_injected = true;
break;

case IEEE80211_RADIOTAP_MCS:
{
/* u8,u8,u8 */

uint8_t mcs_have = iterator.this_arg[0];

if (mcs_have & IEEE80211_RADIOTAP_MCS_HAVE_MCS)
{
mcs_index = iterator.this_arg[2] & 0x7f;
}

if ((mcs_have & 1) && (iterator.this_arg[1] & 1))
{
bandwidth = 40;
}
}
break;

case IEEE80211_RADIOTAP_VHT:
{
/* u16 known, u8 flags, u8 bandwidth, u8 mcs_nss[4], u8 coding, u8 group_id, u16 partial_aid */
u8 known = iterator.this_arg[0];

if(known & 0x40)
{
int bwidth = iterator.this_arg[3] & 0x1f;
if(bwidth >= 1 && bwidth <= 3)
{
bandwidth = 40;
}
else if(bwidth >= 4 && bwidth <= 10)
{
bandwidth = 80;
}
}
mcs_index = (iterator.this_arg[4] >> 4) & 0x0f;
}
break;

default:
break;
}
Expand Down Expand Up @@ -201,9 +243,11 @@ void Receiver::loop_iter(void)
pkt += iterator._max_length;
pktlen -= iterator._max_length;

//fprintf(stderr, "CAPTURE: mcs: %u, bw: %u\n", mcs_index, bandwidth);
if (pktlen > (int)sizeof(ieee80211_header))
{
agg->process_packet(pkt + sizeof(ieee80211_header), pktlen - sizeof(ieee80211_header), wlan_idx, antenna, rssi, noise, freq, NULL);
agg->process_packet(pkt + sizeof(ieee80211_header), pktlen - sizeof(ieee80211_header),
wlan_idx, antenna, rssi, noise, freq, mcs_index, bandwidth, NULL);
} else {
fprintf(stderr, "Short packet (ieee header)\n");
continue;
Expand All @@ -213,8 +257,8 @@ void Receiver::loop_iter(void)


Aggregator::Aggregator(const string &client_addr, int client_port, const string &keypair, uint64_t epoch, uint32_t channel_id) : \
count_p_all(0), count_p_dec_err(0), count_p_dec_ok(0), count_p_fec_recovered(0),
count_p_lost(0), count_p_bad(0), count_p_override(0), count_p_outgoing(0),
count_p_all(0), count_b_all(0), count_p_dec_err(0), count_p_dec_ok(0), count_p_fec_recovered(0),
count_p_lost(0), count_p_bad(0), count_p_override(0), count_p_outgoing(0), count_b_outgoing(0),
fec_p(NULL), fec_k(-1), fec_n(-1), seq(0), rx_ring_front(0), rx_ring_alloc(0),
last_known_block((uint64_t)-1), epoch(epoch), channel_id(channel_id)
{
Expand Down Expand Up @@ -307,10 +351,14 @@ Forwarder::Forwarder(const string &client_addr, int client_port)
}


void Forwarder::process_packet(const uint8_t *buf, size_t size, uint8_t wlan_idx, const uint8_t *antenna, const int8_t *rssi, const int8_t *noise, uint16_t freq, sockaddr_in *sockaddr)
void Forwarder::process_packet(const uint8_t *buf, size_t size, uint8_t wlan_idx, const uint8_t *antenna,
const int8_t *rssi, const int8_t *noise, uint16_t freq, uint8_t mcs_index,
uint8_t bandwidth, sockaddr_in *sockaddr)
{
wrxfwd_t fwd_hdr = { .wlan_idx = wlan_idx,
.freq = freq };
.freq = freq,
.mcs_index = mcs_index,
.bandwidth = bandwidth };

memcpy(fwd_hdr.antenna, antenna, RX_ANT_MAX * sizeof(uint8_t));
memcpy(fwd_hdr.rssi, rssi, RX_ANT_MAX * sizeof(int8_t));
Expand Down Expand Up @@ -414,13 +462,14 @@ void Aggregator::dump_stats(FILE *fp)

for(rx_antenna_stat_t::iterator it = antenna_stat.begin(); it != antenna_stat.end(); it++)
{
fprintf(fp, "%" PRIu64 "\tRX_ANT\t%u\t%" PRIx64 "\t%d" ":%d:%d:%d" ":%d:%d:%d\n",
ts, it->first.freq, it->first.antenna_id, it->second.count_all,
fprintf(fp, "%" PRIu64 "\tRX_ANT\t%u:%u:%u\t%" PRIx64 "\t%d" ":%d:%d:%d" ":%d:%d:%d\n",
ts, it->first.freq, it->first.mcs_index, it->first.bandwidth, it->first.antenna_id, it->second.count_all,
it->second.rssi_min, it->second.rssi_sum / it->second.count_all, it->second.rssi_max,
it->second.snr_min, it->second.snr_sum / it->second.count_all, it->second.snr_max);
}

fprintf(fp, "%" PRIu64 "\tPKT\t%u:%u:%u:%u:%u:%u:%u\n", ts, count_p_all, count_p_dec_err, count_p_dec_ok, count_p_fec_recovered, count_p_lost, count_p_bad, count_p_outgoing);
fprintf(fp, "%" PRIu64 "\tPKT\t%u:%u:%u:%u:%u:%u:%u:%u:%u\n", ts, count_p_all, count_b_all, count_p_dec_err,
count_p_dec_ok, count_p_fec_recovered, count_p_lost, count_p_bad, count_p_outgoing, count_b_outgoing);
fflush(fp);

if(count_p_override)
Expand All @@ -437,12 +486,16 @@ void Aggregator::dump_stats(FILE *fp)
}


void Aggregator::log_rssi(const sockaddr_in *sockaddr, uint8_t wlan_idx, const uint8_t *ant, const int8_t *rssi, const int8_t *noise, uint16_t freq)
void Aggregator::log_rssi(const sockaddr_in *sockaddr, uint8_t wlan_idx, const uint8_t *ant, const int8_t *rssi, const int8_t *noise,
uint16_t freq, uint8_t mcs_index, uint8_t bandwidth)
{
for(int i = 0; i < RX_ANT_MAX && ant[i] != 0xff; i++)
{
// antenna_id: addr + port + wlan_idx + ant
rxAntennaKey key = {.freq = freq, .antenna_id = 0};
rxAntennaKey key = {.freq = freq,
.antenna_id = 0,
.mcs_index=mcs_index,
.bandwidth=bandwidth};

if (sockaddr != NULL && sockaddr->sin_family == AF_INET)
{
Expand All @@ -456,10 +509,13 @@ void Aggregator::log_rssi(const sockaddr_in *sockaddr, uint8_t wlan_idx, const u
}


void Aggregator::process_packet(const uint8_t *buf, size_t size, uint8_t wlan_idx, const uint8_t *antenna, const int8_t *rssi, const int8_t *noise, uint16_t freq, sockaddr_in *sockaddr)
void Aggregator::process_packet(const uint8_t *buf, size_t size, uint8_t wlan_idx, const uint8_t *antenna,
const int8_t *rssi, const int8_t *noise, uint16_t freq, uint8_t mcs_index,
uint8_t bandwidth, sockaddr_in *sockaddr)
{
wsession_data_t new_session_data;
count_p_all += 1;
count_b_all += size;

if(size == 0) return;

Expand Down Expand Up @@ -536,7 +592,7 @@ void Aggregator::process_packet(const uint8_t *buf, size_t size, uint8_t wlan_id
}

count_p_dec_ok += 1;
log_rssi(sockaddr, wlan_idx, antenna, rssi, noise, freq);
log_rssi(sockaddr, wlan_idx, antenna, rssi, noise, freq, mcs_index, bandwidth);

if (memcmp(session_key, new_session_data.session_key, sizeof(session_key)) != 0)
{
Expand Down Expand Up @@ -579,7 +635,7 @@ void Aggregator::process_packet(const uint8_t *buf, size_t size, uint8_t wlan_id
}

count_p_dec_ok += 1;
log_rssi(sockaddr, wlan_idx, antenna, rssi, noise, freq);
log_rssi(sockaddr, wlan_idx, antenna, rssi, noise, freq, mcs_index, bandwidth);

assert(decrypted_len <= MAX_FEC_PAYLOAD);

Expand Down Expand Up @@ -715,10 +771,12 @@ void Aggregator::send_packet(int ring_idx, int fragment_idx)
{
fprintf(stderr, "Corrupted packet %u\n", seq);
count_p_bad += 1;
}else if(!(flags & WFB_PACKET_FEC_ONLY))
}
else if(!(flags & WFB_PACKET_FEC_ONLY))
{
send(sockfd, payload, packet_size, MSG_DONTWAIT);
count_p_outgoing += 1;
count_b_outgoing += packet_size;
}
}

Expand Down Expand Up @@ -882,7 +940,9 @@ void network_loop(int srv_port, Aggregator &agg, int log_interval, int rcv_buf_s
fprintf(stderr, "Short packet (rx fwd header)\n");
continue;
}
agg.process_packet(buf, rsize - sizeof(wrxfwd_t), fwd_hdr.wlan_idx, fwd_hdr.antenna, fwd_hdr.rssi, fwd_hdr.noise, fwd_hdr.freq, &sockaddr);
agg.process_packet(buf, rsize - sizeof(wrxfwd_t), fwd_hdr.wlan_idx, fwd_hdr.antenna,
fwd_hdr.rssi, fwd_hdr.noise, fwd_hdr.freq,
fwd_hdr.mcs_index, fwd_hdr.bandwidth, &sockaddr);
}
if(errno != EWOULDBLOCK) throw runtime_error(string_format("Error receiving packet: %s", strerror(errno)));
}
Expand Down
44 changes: 36 additions & 8 deletions src/rx.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ typedef enum {
class BaseAggregator
{
public:
virtual void process_packet(const uint8_t *buf, size_t size, uint8_t wlan_idx, const uint8_t *antenna, const int8_t *rssi, const int8_t *noise, uint16_t freq, sockaddr_in *sockaddr) = 0;
virtual void process_packet(const uint8_t *buf, size_t size, uint8_t wlan_idx, const uint8_t *antenna,
const int8_t *rssi, const int8_t *noise, uint16_t freq, uint8_t mcs_index,
uint8_t bandwidth, sockaddr_in *sockaddr) = 0;

virtual void dump_stats(FILE *fp) = 0;
protected:
int open_udp_socket_for_tx(const std::string &client_addr, int client_port)
Expand Down Expand Up @@ -67,7 +70,9 @@ class Forwarder : public BaseAggregator
public:
Forwarder(const std::string &client_addr, int client_port);
~Forwarder();
virtual void process_packet(const uint8_t *buf, size_t size, uint8_t wlan_idx, const uint8_t *antenna, const int8_t *rssi, const int8_t *noise, uint16_t freq, sockaddr_in *sockaddr);
virtual void process_packet(const uint8_t *buf, size_t size, uint8_t wlan_idx, const uint8_t *antenna,
const int8_t *rssi, const int8_t *noise, uint16_t freq, uint8_t mcs_index,
uint8_t bandwidth,sockaddr_in *sockaddr);
virtual void dump_stats(FILE *) {}
private:
int sockfd;
Expand Down Expand Up @@ -129,21 +134,37 @@ struct rxAntennaKey
{
uint16_t freq;
uint64_t antenna_id;
uint8_t mcs_index;
uint8_t bandwidth;

bool operator==(const rxAntennaKey &other) const
{
return (freq == other.freq && antenna_id == other.antenna_id);
return (freq == other.freq && \
antenna_id == other.antenna_id && \
mcs_index == other.mcs_index && \
bandwidth == other.bandwidth);
}
};


template <typename T>
void hash_combine(std::size_t& seed, const T& v)
{
seed ^= std::hash<T>{}(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
}


template<>
struct std::hash<rxAntennaKey>
{
std::size_t operator()(const rxAntennaKey& k) const noexcept
{
std::size_t h1 = std::hash<uint16_t>{}(k.freq);
std::size_t h2 = std::hash<uint64_t>{}(k.antenna_id);
return h1 ^ (h2 << 1); // combine hashes
std::size_t h = 0;
hash_combine(h, k.freq);
hash_combine(h, k.antenna_id);
hash_combine(h, k.mcs_index);
hash_combine(h, k.bandwidth);
return h;
}
};

Expand All @@ -154,39 +175,46 @@ class Aggregator : public BaseAggregator
public:
Aggregator(const std::string &client_addr, int client_port, const std::string &keypair, uint64_t epoch, uint32_t channel_id);
~Aggregator();
virtual void process_packet(const uint8_t *buf, size_t size, uint8_t wlan_idx, const uint8_t *antenna, const int8_t *rssi, const int8_t *noise, uint16_t freq, sockaddr_in *sockaddr);
virtual void process_packet(const uint8_t *buf, size_t size, uint8_t wlan_idx, const uint8_t *antenna,
const int8_t *rssi, const int8_t *noise, uint16_t freq, uint8_t mcs_index,
uint8_t bandwidth, sockaddr_in *sockaddr);
virtual void dump_stats(FILE *fp);

// Make stats public for android userspace receiver
void clear_stats(void)
{
antenna_stat.clear();
count_p_all = 0;
count_b_all = 0;
count_p_dec_err = 0;
count_p_dec_ok = 0;
count_p_fec_recovered = 0;
count_p_lost = 0;
count_p_bad = 0;
count_p_override = 0;
count_p_outgoing = 0;
count_b_outgoing = 0;
}

rx_antenna_stat_t antenna_stat;
uint32_t count_p_all;
uint32_t count_b_all;
uint32_t count_p_dec_err;
uint32_t count_p_dec_ok;
uint32_t count_p_fec_recovered;
uint32_t count_p_lost;
uint32_t count_p_bad;
uint32_t count_p_override;
uint32_t count_p_outgoing;
uint32_t count_b_outgoing;

private:
void init_fec(int k, int n);
void deinit_fec(void);
void send_packet(int ring_idx, int fragment_idx);
void apply_fec(int ring_idx);
void log_rssi(const sockaddr_in *sockaddr, uint8_t wlan_idx, const uint8_t *ant, const int8_t *rssi, const int8_t *noise, uint16_t freq);
void log_rssi(const sockaddr_in *sockaddr, uint8_t wlan_idx, const uint8_t *ant, const int8_t *rssi,
const int8_t *noise, uint16_t freq, uint8_t mcs_index, uint8_t bandwidth);
int get_block_ring_idx(uint64_t block_idx);
int rx_ring_push(void);
fec_t* fec_p;
Expand Down
28 changes: 17 additions & 11 deletions src/tx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ void RawSocketTransmitter::inject_packet(const uint8_t *buf, size_t size)
}

uint64_t key = (uint64_t)(current_output) << 8 | (uint64_t)0xff;
antenna_stat[key].log_latency(get_time_us() - start_us, rc >= 0);
antenna_stat[key].log_latency(get_time_us() - start_us, rc >= 0, size);
}
else
{
Expand All @@ -252,25 +252,26 @@ void RawSocketTransmitter::inject_packet(const uint8_t *buf, size_t size)
}

uint64_t key = (uint64_t)(i) << 8 | (uint64_t)0xff;
antenna_stat[key].log_latency(get_time_us() - start_us, rc >= 0);
antenna_stat[key].log_latency(get_time_us() - start_us, rc >= 0, size);
}
}

}

void RawSocketTransmitter::dump_stats(FILE *fp, uint64_t ts, uint32_t &injected, uint32_t &dropped)
void RawSocketTransmitter::dump_stats(FILE *fp, uint64_t ts, uint32_t &injected_packets, uint32_t &dropped_packets, uint32_t &injected_bytes)
{
for(tx_antenna_stat_t::iterator it = antenna_stat.begin(); it != antenna_stat.end(); it++)
{
fprintf(fp, "%" PRIu64 "\tTX_ANT\t%" PRIx64 "\t%u:%u:%" PRIu64 ":%" PRIu64 ":%" PRIu64 "\n",
ts, it->first,
it->second.count_injected, it->second.count_dropped,
it->second.count_p_injected, it->second.count_p_dropped,
it->second.latency_min,
it->second.latency_sum / (it->second.count_injected + it->second.count_dropped),
it->second.latency_sum / (it->second.count_p_injected + it->second.count_p_dropped),
it->second.latency_max);

injected += it->second.count_injected;
dropped += it->second.count_dropped;
injected_packets += it->second.count_p_injected;
dropped_packets += it->second.count_p_dropped;
injected_bytes += it->second.count_b_injected;
}
antenna_stat.clear();
}
Expand Down Expand Up @@ -393,7 +394,9 @@ void data_source(shared_ptr<Transmitter> &t, vector<int> &rx_fd, int fec_timeout
uint64_t fec_close_ts = fec_timeout > 0 ? get_time_ms() + fec_timeout : 0;
uint32_t count_p_fec_timeouts = 0; // empty packets sent to close fec block due to timeout
uint32_t count_p_incoming = 0; // incoming udp packets (received + dropped due to rxq overflow)
uint32_t count_p_injected = 0; // successfully injected (include additional fec packets)
uint32_t count_b_incoming = 0; // incoming udp bytes (received + dropped due to rxq overflow)
uint32_t count_p_injected = 0; // successfully injected packets (include additional fec packets)
uint32_t count_b_injected = 0; // successfully injected bytes (include additional fec packets)
uint32_t count_p_dropped = 0; // dropped due to rxq overflows or injection timeout
uint32_t count_p_truncated = 0; // injected large packets that were truncated
int start_fd_idx = 0;
Expand All @@ -420,10 +423,10 @@ void data_source(shared_ptr<Transmitter> &t, vector<int> &rx_fd, int fec_timeout

if (cur_ts >= log_send_ts) // log timeout expired
{
t->dump_stats(stdout, cur_ts, count_p_injected, count_p_dropped);
t->dump_stats(stdout, cur_ts, count_p_injected, count_p_dropped, count_b_injected);

fprintf(stdout, "%" PRIu64 "\tPKT\t%u:%u:%u:%u:%u\n",
cur_ts, count_p_fec_timeouts, count_p_incoming, count_p_injected, count_p_dropped, count_p_truncated);
fprintf(stdout, "%" PRIu64 "\tPKT\t%u:%u:%u:%u:%u:%u:%u\n",
cur_ts, count_p_fec_timeouts, count_p_incoming, count_b_incoming, count_p_injected, count_b_injected, count_p_dropped, count_p_truncated);
fflush(stdout);

if(count_p_dropped)
Expand All @@ -438,7 +441,9 @@ void data_source(shared_ptr<Transmitter> &t, vector<int> &rx_fd, int fec_timeout

count_p_fec_timeouts = 0;
count_p_incoming = 0;
count_b_incoming = 0;
count_p_injected = 0;
count_b_injected = 0;
count_p_dropped = 0;
count_p_truncated = 0;

Expand Down Expand Up @@ -502,6 +507,7 @@ void data_source(shared_ptr<Transmitter> &t, vector<int> &rx_fd, int fec_timeout
}

count_p_incoming += 1;
count_b_incoming += rsize;

if (rsize > (ssize_t)MAX_PAYLOAD_SIZE)
{
Expand Down

0 comments on commit 9d8e5b3

Please sign in to comment.