From 9d8e5b399062888b35ced6c3667287bf1cec1755 Mon Sep 17 00:00:00 2001 From: Vasily Evseenko Date: Mon, 20 May 2024 23:41:00 +0300 Subject: [PATCH] Added various runtime statistics: - MCS, bandwidth and used FEC scheme - Radio and UDP flow rates CLI UI refactoring --- src/rx.cpp | 90 ++++++++++++++++++++++++++------ src/rx.hpp | 44 +++++++++++++--- src/tx.cpp | 28 ++++++---- src/tx.hpp | 32 ++++++++---- src/wifibroadcast.hpp | 2 + wfb_ng/cli.py | 114 ++++++++++++++++++++++++++++------------- wfb_ng/conf/master.cfg | 4 -- wfb_ng/server.py | 49 ++++++++++-------- 8 files changed, 258 insertions(+), 105 deletions(-) diff --git a/src/rx.cpp b/src/rx.cpp index 0581897a..31111216 100644 --- a/src/rx.cpp +++ b/src/rx.cpp @@ -117,6 +117,9 @@ void Receiver::loop_iter(void) int8_t noise[RX_ANT_MAX]; uint8_t flags = 0; bool self_injected = false; + uint8_t mcs_index = 0; + uint8_t bandwidth = 20; + struct ieee80211_radiotap_iterator iterator; int ret = ieee80211_radiotap_iterator_init(&iterator, (ieee80211_radiotap_header*)pkt, pktlen, NULL); @@ -170,6 +173,45 @@ void Receiver::loop_iter(void) self_injected = true; break; + case IEEE80211_RADIOTAP_MCS: + { + /* u8,u8,u8 */ + + uint8_t mcs_have = iterator.this_arg[0]; + + if (mcs_have & IEEE80211_RADIOTAP_MCS_HAVE_MCS) + { + mcs_index = iterator.this_arg[2] & 0x7f; + } + + if ((mcs_have & 1) && (iterator.this_arg[1] & 1)) + { + bandwidth = 40; + } + } + break; + + case IEEE80211_RADIOTAP_VHT: + { + /* u16 known, u8 flags, u8 bandwidth, u8 mcs_nss[4], u8 coding, u8 group_id, u16 partial_aid */ + u8 known = iterator.this_arg[0]; + + if(known & 0x40) + { + int bwidth = iterator.this_arg[3] & 0x1f; + if(bwidth >= 1 && bwidth <= 3) + { + bandwidth = 40; + } + else if(bwidth >= 4 && bwidth <= 10) + { + bandwidth = 80; + } + } + mcs_index = (iterator.this_arg[4] >> 4) & 0x0f; + } + break; + default: break; } @@ -201,9 +243,11 @@ void Receiver::loop_iter(void) pkt += iterator._max_length; pktlen -= iterator._max_length; + //fprintf(stderr, "CAPTURE: mcs: %u, bw: %u\n", mcs_index, bandwidth); if (pktlen > (int)sizeof(ieee80211_header)) { - agg->process_packet(pkt + sizeof(ieee80211_header), pktlen - sizeof(ieee80211_header), wlan_idx, antenna, rssi, noise, freq, NULL); + agg->process_packet(pkt + sizeof(ieee80211_header), pktlen - sizeof(ieee80211_header), + wlan_idx, antenna, rssi, noise, freq, mcs_index, bandwidth, NULL); } else { fprintf(stderr, "Short packet (ieee header)\n"); continue; @@ -213,8 +257,8 @@ void Receiver::loop_iter(void) Aggregator::Aggregator(const string &client_addr, int client_port, const string &keypair, uint64_t epoch, uint32_t channel_id) : \ - count_p_all(0), count_p_dec_err(0), count_p_dec_ok(0), count_p_fec_recovered(0), - count_p_lost(0), count_p_bad(0), count_p_override(0), count_p_outgoing(0), + count_p_all(0), count_b_all(0), count_p_dec_err(0), count_p_dec_ok(0), count_p_fec_recovered(0), + count_p_lost(0), count_p_bad(0), count_p_override(0), count_p_outgoing(0), count_b_outgoing(0), fec_p(NULL), fec_k(-1), fec_n(-1), seq(0), rx_ring_front(0), rx_ring_alloc(0), last_known_block((uint64_t)-1), epoch(epoch), channel_id(channel_id) { @@ -307,10 +351,14 @@ Forwarder::Forwarder(const string &client_addr, int client_port) } -void Forwarder::process_packet(const uint8_t *buf, size_t size, uint8_t wlan_idx, const uint8_t *antenna, const int8_t *rssi, const int8_t *noise, uint16_t freq, sockaddr_in *sockaddr) +void Forwarder::process_packet(const uint8_t *buf, size_t size, uint8_t wlan_idx, const uint8_t *antenna, + const int8_t *rssi, const int8_t *noise, uint16_t freq, uint8_t mcs_index, + uint8_t bandwidth, sockaddr_in *sockaddr) { wrxfwd_t fwd_hdr = { .wlan_idx = wlan_idx, - .freq = freq }; + .freq = freq, + .mcs_index = mcs_index, + .bandwidth = bandwidth }; memcpy(fwd_hdr.antenna, antenna, RX_ANT_MAX * sizeof(uint8_t)); memcpy(fwd_hdr.rssi, rssi, RX_ANT_MAX * sizeof(int8_t)); @@ -414,13 +462,14 @@ void Aggregator::dump_stats(FILE *fp) for(rx_antenna_stat_t::iterator it = antenna_stat.begin(); it != antenna_stat.end(); it++) { - fprintf(fp, "%" PRIu64 "\tRX_ANT\t%u\t%" PRIx64 "\t%d" ":%d:%d:%d" ":%d:%d:%d\n", - ts, it->first.freq, it->first.antenna_id, it->second.count_all, + fprintf(fp, "%" PRIu64 "\tRX_ANT\t%u:%u:%u\t%" PRIx64 "\t%d" ":%d:%d:%d" ":%d:%d:%d\n", + ts, it->first.freq, it->first.mcs_index, it->first.bandwidth, it->first.antenna_id, it->second.count_all, it->second.rssi_min, it->second.rssi_sum / it->second.count_all, it->second.rssi_max, it->second.snr_min, it->second.snr_sum / it->second.count_all, it->second.snr_max); } - fprintf(fp, "%" PRIu64 "\tPKT\t%u:%u:%u:%u:%u:%u:%u\n", ts, count_p_all, count_p_dec_err, count_p_dec_ok, count_p_fec_recovered, count_p_lost, count_p_bad, count_p_outgoing); + fprintf(fp, "%" PRIu64 "\tPKT\t%u:%u:%u:%u:%u:%u:%u:%u:%u\n", ts, count_p_all, count_b_all, count_p_dec_err, + count_p_dec_ok, count_p_fec_recovered, count_p_lost, count_p_bad, count_p_outgoing, count_b_outgoing); fflush(fp); if(count_p_override) @@ -437,12 +486,16 @@ void Aggregator::dump_stats(FILE *fp) } -void Aggregator::log_rssi(const sockaddr_in *sockaddr, uint8_t wlan_idx, const uint8_t *ant, const int8_t *rssi, const int8_t *noise, uint16_t freq) +void Aggregator::log_rssi(const sockaddr_in *sockaddr, uint8_t wlan_idx, const uint8_t *ant, const int8_t *rssi, const int8_t *noise, + uint16_t freq, uint8_t mcs_index, uint8_t bandwidth) { for(int i = 0; i < RX_ANT_MAX && ant[i] != 0xff; i++) { // antenna_id: addr + port + wlan_idx + ant - rxAntennaKey key = {.freq = freq, .antenna_id = 0}; + rxAntennaKey key = {.freq = freq, + .antenna_id = 0, + .mcs_index=mcs_index, + .bandwidth=bandwidth}; if (sockaddr != NULL && sockaddr->sin_family == AF_INET) { @@ -456,10 +509,13 @@ void Aggregator::log_rssi(const sockaddr_in *sockaddr, uint8_t wlan_idx, const u } -void Aggregator::process_packet(const uint8_t *buf, size_t size, uint8_t wlan_idx, const uint8_t *antenna, const int8_t *rssi, const int8_t *noise, uint16_t freq, sockaddr_in *sockaddr) +void Aggregator::process_packet(const uint8_t *buf, size_t size, uint8_t wlan_idx, const uint8_t *antenna, + const int8_t *rssi, const int8_t *noise, uint16_t freq, uint8_t mcs_index, + uint8_t bandwidth, sockaddr_in *sockaddr) { wsession_data_t new_session_data; count_p_all += 1; + count_b_all += size; if(size == 0) return; @@ -536,7 +592,7 @@ void Aggregator::process_packet(const uint8_t *buf, size_t size, uint8_t wlan_id } count_p_dec_ok += 1; - log_rssi(sockaddr, wlan_idx, antenna, rssi, noise, freq); + log_rssi(sockaddr, wlan_idx, antenna, rssi, noise, freq, mcs_index, bandwidth); if (memcmp(session_key, new_session_data.session_key, sizeof(session_key)) != 0) { @@ -579,7 +635,7 @@ void Aggregator::process_packet(const uint8_t *buf, size_t size, uint8_t wlan_id } count_p_dec_ok += 1; - log_rssi(sockaddr, wlan_idx, antenna, rssi, noise, freq); + log_rssi(sockaddr, wlan_idx, antenna, rssi, noise, freq, mcs_index, bandwidth); assert(decrypted_len <= MAX_FEC_PAYLOAD); @@ -715,10 +771,12 @@ void Aggregator::send_packet(int ring_idx, int fragment_idx) { fprintf(stderr, "Corrupted packet %u\n", seq); count_p_bad += 1; - }else if(!(flags & WFB_PACKET_FEC_ONLY)) + } + else if(!(flags & WFB_PACKET_FEC_ONLY)) { send(sockfd, payload, packet_size, MSG_DONTWAIT); count_p_outgoing += 1; + count_b_outgoing += packet_size; } } @@ -882,7 +940,9 @@ void network_loop(int srv_port, Aggregator &agg, int log_interval, int rcv_buf_s fprintf(stderr, "Short packet (rx fwd header)\n"); continue; } - agg.process_packet(buf, rsize - sizeof(wrxfwd_t), fwd_hdr.wlan_idx, fwd_hdr.antenna, fwd_hdr.rssi, fwd_hdr.noise, fwd_hdr.freq, &sockaddr); + agg.process_packet(buf, rsize - sizeof(wrxfwd_t), fwd_hdr.wlan_idx, fwd_hdr.antenna, + fwd_hdr.rssi, fwd_hdr.noise, fwd_hdr.freq, + fwd_hdr.mcs_index, fwd_hdr.bandwidth, &sockaddr); } if(errno != EWOULDBLOCK) throw runtime_error(string_format("Error receiving packet: %s", strerror(errno))); } diff --git a/src/rx.hpp b/src/rx.hpp index f91f2aed..250e63bf 100644 --- a/src/rx.hpp +++ b/src/rx.hpp @@ -39,7 +39,10 @@ typedef enum { class BaseAggregator { public: - virtual void process_packet(const uint8_t *buf, size_t size, uint8_t wlan_idx, const uint8_t *antenna, const int8_t *rssi, const int8_t *noise, uint16_t freq, sockaddr_in *sockaddr) = 0; + virtual void process_packet(const uint8_t *buf, size_t size, uint8_t wlan_idx, const uint8_t *antenna, + const int8_t *rssi, const int8_t *noise, uint16_t freq, uint8_t mcs_index, + uint8_t bandwidth, sockaddr_in *sockaddr) = 0; + virtual void dump_stats(FILE *fp) = 0; protected: int open_udp_socket_for_tx(const std::string &client_addr, int client_port) @@ -67,7 +70,9 @@ class Forwarder : public BaseAggregator public: Forwarder(const std::string &client_addr, int client_port); ~Forwarder(); - virtual void process_packet(const uint8_t *buf, size_t size, uint8_t wlan_idx, const uint8_t *antenna, const int8_t *rssi, const int8_t *noise, uint16_t freq, sockaddr_in *sockaddr); + virtual void process_packet(const uint8_t *buf, size_t size, uint8_t wlan_idx, const uint8_t *antenna, + const int8_t *rssi, const int8_t *noise, uint16_t freq, uint8_t mcs_index, + uint8_t bandwidth,sockaddr_in *sockaddr); virtual void dump_stats(FILE *) {} private: int sockfd; @@ -129,21 +134,37 @@ struct rxAntennaKey { uint16_t freq; uint64_t antenna_id; + uint8_t mcs_index; + uint8_t bandwidth; bool operator==(const rxAntennaKey &other) const { - return (freq == other.freq && antenna_id == other.antenna_id); + return (freq == other.freq && \ + antenna_id == other.antenna_id && \ + mcs_index == other.mcs_index && \ + bandwidth == other.bandwidth); } }; + +template +void hash_combine(std::size_t& seed, const T& v) +{ + seed ^= std::hash{}(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2); +} + + template<> struct std::hash { std::size_t operator()(const rxAntennaKey& k) const noexcept { - std::size_t h1 = std::hash{}(k.freq); - std::size_t h2 = std::hash{}(k.antenna_id); - return h1 ^ (h2 << 1); // combine hashes + std::size_t h = 0; + hash_combine(h, k.freq); + hash_combine(h, k.antenna_id); + hash_combine(h, k.mcs_index); + hash_combine(h, k.bandwidth); + return h; } }; @@ -154,7 +175,9 @@ class Aggregator : public BaseAggregator public: Aggregator(const std::string &client_addr, int client_port, const std::string &keypair, uint64_t epoch, uint32_t channel_id); ~Aggregator(); - virtual void process_packet(const uint8_t *buf, size_t size, uint8_t wlan_idx, const uint8_t *antenna, const int8_t *rssi, const int8_t *noise, uint16_t freq, sockaddr_in *sockaddr); + virtual void process_packet(const uint8_t *buf, size_t size, uint8_t wlan_idx, const uint8_t *antenna, + const int8_t *rssi, const int8_t *noise, uint16_t freq, uint8_t mcs_index, + uint8_t bandwidth, sockaddr_in *sockaddr); virtual void dump_stats(FILE *fp); // Make stats public for android userspace receiver @@ -162,6 +185,7 @@ class Aggregator : public BaseAggregator { antenna_stat.clear(); count_p_all = 0; + count_b_all = 0; count_p_dec_err = 0; count_p_dec_ok = 0; count_p_fec_recovered = 0; @@ -169,10 +193,12 @@ class Aggregator : public BaseAggregator count_p_bad = 0; count_p_override = 0; count_p_outgoing = 0; + count_b_outgoing = 0; } rx_antenna_stat_t antenna_stat; uint32_t count_p_all; + uint32_t count_b_all; uint32_t count_p_dec_err; uint32_t count_p_dec_ok; uint32_t count_p_fec_recovered; @@ -180,13 +206,15 @@ class Aggregator : public BaseAggregator uint32_t count_p_bad; uint32_t count_p_override; uint32_t count_p_outgoing; + uint32_t count_b_outgoing; private: void init_fec(int k, int n); void deinit_fec(void); void send_packet(int ring_idx, int fragment_idx); void apply_fec(int ring_idx); - void log_rssi(const sockaddr_in *sockaddr, uint8_t wlan_idx, const uint8_t *ant, const int8_t *rssi, const int8_t *noise, uint16_t freq); + void log_rssi(const sockaddr_in *sockaddr, uint8_t wlan_idx, const uint8_t *ant, const int8_t *rssi, + const int8_t *noise, uint16_t freq, uint8_t mcs_index, uint8_t bandwidth); int get_block_ring_idx(uint64_t block_idx); int rx_ring_push(void); fec_t* fec_p; diff --git a/src/tx.cpp b/src/tx.cpp index d71c6f00..c580a619 100644 --- a/src/tx.cpp +++ b/src/tx.cpp @@ -234,7 +234,7 @@ void RawSocketTransmitter::inject_packet(const uint8_t *buf, size_t size) } uint64_t key = (uint64_t)(current_output) << 8 | (uint64_t)0xff; - antenna_stat[key].log_latency(get_time_us() - start_us, rc >= 0); + antenna_stat[key].log_latency(get_time_us() - start_us, rc >= 0, size); } else { @@ -252,25 +252,26 @@ void RawSocketTransmitter::inject_packet(const uint8_t *buf, size_t size) } uint64_t key = (uint64_t)(i) << 8 | (uint64_t)0xff; - antenna_stat[key].log_latency(get_time_us() - start_us, rc >= 0); + antenna_stat[key].log_latency(get_time_us() - start_us, rc >= 0, size); } } } -void RawSocketTransmitter::dump_stats(FILE *fp, uint64_t ts, uint32_t &injected, uint32_t &dropped) +void RawSocketTransmitter::dump_stats(FILE *fp, uint64_t ts, uint32_t &injected_packets, uint32_t &dropped_packets, uint32_t &injected_bytes) { for(tx_antenna_stat_t::iterator it = antenna_stat.begin(); it != antenna_stat.end(); it++) { fprintf(fp, "%" PRIu64 "\tTX_ANT\t%" PRIx64 "\t%u:%u:%" PRIu64 ":%" PRIu64 ":%" PRIu64 "\n", ts, it->first, - it->second.count_injected, it->second.count_dropped, + it->second.count_p_injected, it->second.count_p_dropped, it->second.latency_min, - it->second.latency_sum / (it->second.count_injected + it->second.count_dropped), + it->second.latency_sum / (it->second.count_p_injected + it->second.count_p_dropped), it->second.latency_max); - injected += it->second.count_injected; - dropped += it->second.count_dropped; + injected_packets += it->second.count_p_injected; + dropped_packets += it->second.count_p_dropped; + injected_bytes += it->second.count_b_injected; } antenna_stat.clear(); } @@ -393,7 +394,9 @@ void data_source(shared_ptr &t, vector &rx_fd, int fec_timeout uint64_t fec_close_ts = fec_timeout > 0 ? get_time_ms() + fec_timeout : 0; uint32_t count_p_fec_timeouts = 0; // empty packets sent to close fec block due to timeout uint32_t count_p_incoming = 0; // incoming udp packets (received + dropped due to rxq overflow) - uint32_t count_p_injected = 0; // successfully injected (include additional fec packets) + uint32_t count_b_incoming = 0; // incoming udp bytes (received + dropped due to rxq overflow) + uint32_t count_p_injected = 0; // successfully injected packets (include additional fec packets) + uint32_t count_b_injected = 0; // successfully injected bytes (include additional fec packets) uint32_t count_p_dropped = 0; // dropped due to rxq overflows or injection timeout uint32_t count_p_truncated = 0; // injected large packets that were truncated int start_fd_idx = 0; @@ -420,10 +423,10 @@ void data_source(shared_ptr &t, vector &rx_fd, int fec_timeout if (cur_ts >= log_send_ts) // log timeout expired { - t->dump_stats(stdout, cur_ts, count_p_injected, count_p_dropped); + t->dump_stats(stdout, cur_ts, count_p_injected, count_p_dropped, count_b_injected); - fprintf(stdout, "%" PRIu64 "\tPKT\t%u:%u:%u:%u:%u\n", - cur_ts, count_p_fec_timeouts, count_p_incoming, count_p_injected, count_p_dropped, count_p_truncated); + fprintf(stdout, "%" PRIu64 "\tPKT\t%u:%u:%u:%u:%u:%u:%u\n", + cur_ts, count_p_fec_timeouts, count_p_incoming, count_b_incoming, count_p_injected, count_b_injected, count_p_dropped, count_p_truncated); fflush(stdout); if(count_p_dropped) @@ -438,7 +441,9 @@ void data_source(shared_ptr &t, vector &rx_fd, int fec_timeout count_p_fec_timeouts = 0; count_p_incoming = 0; + count_b_incoming = 0; count_p_injected = 0; + count_b_injected = 0; count_p_dropped = 0; count_p_truncated = 0; @@ -502,6 +507,7 @@ void data_source(shared_ptr &t, vector &rx_fd, int fec_timeout } count_p_incoming += 1; + count_b_incoming += rsize; if (rsize > (ssize_t)MAX_PAYLOAD_SIZE) { diff --git a/src/tx.hpp b/src/tx.hpp index 1c4401d7..ca2b894f 100644 --- a/src/tx.hpp +++ b/src/tx.hpp @@ -38,7 +38,7 @@ class Transmitter bool send_packet(const uint8_t *buf, size_t size, uint8_t flags); void send_session_key(void); virtual void select_output(int idx) = 0; - virtual void dump_stats(FILE *fp, uint64_t ts, uint32_t &injected, uint32_t &dropped) = 0; + virtual void dump_stats(FILE *fp, uint64_t ts, uint32_t &injected_packets, uint32_t &dropped_packets, uint32_t &injected_bytes) = 0; protected: virtual void inject_packet(const uint8_t *buf, size_t size) = 0; @@ -66,26 +66,36 @@ class Transmitter class txAntennaItem { public: - txAntennaItem(void) : count_injected(0), count_dropped(0), latency_sum(0), latency_min(0), latency_max(0) {} + txAntennaItem(void) : count_p_injected(0), count_b_injected(0), count_p_dropped(0), latency_sum(0), latency_min(0), latency_max(0) {} - void log_latency(uint64_t latency, bool succeeded) { - if(count_injected + count_dropped == 0) + void log_latency(uint64_t latency, bool succeeded, uint32_t packet_size) { + if(count_p_injected + count_p_dropped == 0) { latency_min = latency; latency_max = latency; - } else { + } + else + { latency_min = std::min(latency, latency_min); latency_max = std::max(latency, latency_max); } latency_sum += latency; - if (succeeded) count_injected += 1; - else count_dropped += 1; + if (succeeded) + { + count_p_injected += 1; + count_b_injected += packet_size; + } + else + { + count_p_dropped += 1; + } } - uint32_t count_injected; - uint32_t count_dropped; + uint32_t count_p_injected; + uint32_t count_b_injected; + uint32_t count_p_dropped; uint64_t latency_sum; uint64_t latency_min; uint64_t latency_max; @@ -99,7 +109,7 @@ class RawSocketTransmitter : public Transmitter RawSocketTransmitter(int k, int m, const std::string &keypair, uint64_t epoch, uint32_t channel_id, const std::vector &wlans, shared_ptr radiotap_header, size_t radiotap_header_len, uint8_t frame_type); virtual ~RawSocketTransmitter(); virtual void select_output(int idx) { current_output = idx; } - virtual void dump_stats(FILE *fp, uint64_t ts, uint32_t &injected, uint32_t &dropped); + virtual void dump_stats(FILE *fp, uint64_t ts, uint32_t &injected_packets, uint32_t &dropped_packets, uint32_t &injected_bytes); private: virtual void inject_packet(const uint8_t *buf, size_t size); const uint32_t channel_id; @@ -129,7 +139,7 @@ class UdpTransmitter : public Transmitter saddr.sin_port = htons((unsigned short)base_port); } - virtual void dump_stats(FILE *fp, uint64_t ts, uint32_t &injected, uint32_t &dropped) {} + virtual void dump_stats(FILE *fp, uint64_t ts, uint32_t &injected_packets, uint32_t &dropped_packets, uint32_t &injected_bytes) {} virtual ~UdpTransmitter() { diff --git a/src/wifibroadcast.hpp b/src/wifibroadcast.hpp index da4c91d4..d3199e78 100644 --- a/src/wifibroadcast.hpp +++ b/src/wifibroadcast.hpp @@ -197,6 +197,8 @@ typedef struct { int8_t rssi[RX_ANT_MAX]; //RADIOTAP_DBM_ANTSIGNAL, list of rssi for corresponding antenna idx int8_t noise[RX_ANT_MAX]; //RADIOTAP_DBM_ANTNOISE, list of (rssi - snr) for corresponding antenna idx uint16_t freq; //IEEE80211_RADIOTAP_CHANNEL -- channel frequency in MHz + uint8_t mcs_index; + uint8_t bandwidth; } __attribute__ ((packed)) wrxfwd_t; // Network packet headers. All numbers are in network (big endian) format diff --git a/wfb_ng/cli.py b/wfb_ng/cli.py index 801d11c4..fb4648a4 100644 --- a/wfb_ng/cli.py +++ b/wfb_ng/cli.py @@ -35,6 +35,10 @@ from .common import abort_on_crash, exit_status from .conf import settings +_orig_stdout = sys.stdout + +def set_window_title(s): + print("\033]2;%s\007" % (s,), file=_orig_stdout) # Workarond for ncurses bug that show error on output to the last position on the screen @@ -44,6 +48,10 @@ def ignore_curses_err(f, *args, **kwargs): except curses.error: pass +def addcstr(window, s, attrs=0): + h, w = window.getmaxyx() + addstr(window, h // 2, max((w - len(s)) // 2, 0), s, attrs) + def rectangle(win, uly, ulx, lry, lrx): """Draw a rectangle with corners at the provided upper-left @@ -67,6 +75,23 @@ def addstr(window, y, x, s, *attrs): pass + +def human_rate(r): + rate = r * 8 + + if rate > 1024 * 1024: + rate = rate / 1024 / 1024 + mod = 'mbit/s' + else: + rate = rate / 1024 + mod = 'kbit/s' + + if rate < 10: + return '%0.1f %s' % (rate, mod) + else: + return '%3d %s' % (rate, mod) + + class AntennaStat(Int32StringReceiver): MAX_LENGTH = 1024 * 1024 @@ -77,9 +102,12 @@ def stringReceived(self, string): self.draw_rx(attrs) elif attrs['type'] == 'tx': self.draw_tx(attrs) + elif attrs['type'] == 'cli_title': + set_window_title(attrs['cli_title']) def draw_rx(self, attrs): p = attrs['packets'] + session_d = attrs['session'] stats_d = attrs['rx_ant_stats'] tx_ant = attrs.get('tx_ant') rx_id = attrs['id'] @@ -89,10 +117,12 @@ def draw_rx(self, attrs): return window.erase() - addstr(window, 0, 0, '[RX] pkt/s pkt') + addstr(window, 0, 0, ' pkt/s pkt') msg_l = (('recv %4d %d' % tuple(p['all']), 0), + #('recvb %4d %d' % tuple(p['all_bytes']), 0), ('udp %4d %d' % tuple(p['out']), 0), + #('udpb %4d %d' % tuple(p['out_bytes']), 0), ('fec_r %4d %d' % tuple(p['fec_rec']), curses.A_REVERSE if p['fec_rec'][0] else 0), ('lost %4d %d' % tuple(p['lost']), curses.A_REVERSE if p['lost'][0] else 0), ('d_err %4d %d' % tuple(p['dec_err']), curses.A_REVERSE if p['dec_err'][0] else 0), @@ -103,18 +133,27 @@ def draw_rx(self, attrs): if y < ymax: addstr(window, y, 0, msg, attr) + session = '' + if session_d: + session = ', FEC: %(fec_k)d/%(fec_n)d' % (session_d) + + addstr(window, 0, 20, 'Flow: %s -> %s%s' % \ + (human_rate(p['all_bytes'][0]), + human_rate(p['out_bytes'][0]), + session)) + if stats_d: - addstr(window, 0, 24, 'Freq [ANT] pkt/s RSSI [dBm] SNR [dB]') - for y, ((freq, ant_id), v) in enumerate(sorted(stats_d.items()), 1): + addstr(window, 2, 20, 'Freq MCS BW [ANT] pkt/s RSSI [dBm] SNR [dB]') + for y, (((freq, mcs_index, bandwith), ant_id), v) in enumerate(sorted(stats_d.items()), 3): pkt_s, rssi_min, rssi_avg, rssi_max, snr_min, snr_avg, snr_max = v if y < ymax: active_tx = '*' if (ant_id >> 8) == tx_ant else ' ' - addstr(window, y, 24, '%04d %s%04x %4d %3d < %3d < %3d %3d < %3d < %3d' % \ - (freq, active_tx, ant_id, pkt_s, + addstr(window, y, 20, '%04d %3d %2d %s%04x %4d %3d < %3d < %3d %3d < %3d < %3d' % \ + (freq, mcs_index, bandwith, active_tx, ant_id, pkt_s, rssi_min, rssi_avg, rssi_max, snr_min, snr_avg, snr_max)) else: - addstr(window, 0, 25, '[No data]', curses.A_REVERSE) + addstr(window, 2, 20, '[No data]', curses.A_REVERSE) window.refresh() @@ -128,10 +167,12 @@ def draw_tx(self, attrs): return window.erase() - addstr(window, 0, 0, '[TX] pkt/s pkt') + addstr(window, 0, 0, ' pkt/s pkt') msg_l = (('sent %4d %d' % tuple(p['injected']), 0), + #('sentb %4d %d' % tuple(p['injected_bytes']), 0), ('udp %4d %d' % tuple(p['incoming']), 0), + #('udpb %4d %d' % tuple(p['incoming_bytes']), 0), ('fec_t %4d %d' % tuple(p['fec_timeouts']), 0), ('drop %4d %d' % tuple(p['dropped']), curses.A_REVERSE if p['dropped'][0] else 0), ('trunc %4d %d' % tuple(p['truncated']), curses.A_REVERSE if p['truncated'][0] else 0)) @@ -141,15 +182,19 @@ def draw_tx(self, attrs): if y < ymax: addstr(window, y, 0, msg, attr) + addstr(window, 0, 20, 'Flow: %s -> %s' % \ + (human_rate(p['incoming_bytes'][0]), + human_rate(p['injected_bytes'][0]))) + if latency_d: - addstr(window, 0, 25, '[ANT] pkt/s Injection [us]') - for y, (k, v) in enumerate(sorted(latency_d.items()), 1): + addstr(window, 2, 20, '[ANT] pkt/s Injection [us]') + for y, (k, v) in enumerate(sorted(latency_d.items()), 3): k = int(k) # json doesn't support int keys injected, dropped, lat_min, lat_avg, lat_max = v if y < ymax: - addstr(window, y, 25, '%04x: %4d %4d < %4d < %4d' % (k, injected, lat_min, lat_avg, lat_max)) + addstr(window, y, 20, '%04x: %4d %4d < %4d < %4d' % (k, injected, lat_min, lat_avg, lat_max)) else: - addstr(window, 0, 25, '[No data]', curses.A_REVERSE) + addstr(window, 2, 20, '[No data]', curses.A_REVERSE) window.refresh() @@ -172,7 +217,7 @@ def init_windows(self): curses.resize_term(height, width) self.stdscr.clear() - service_list = list((s_name, cfg.show_rx_stats, cfg.show_tx_stats) for s_name, _, cfg in parse_services(self.profile)) + service_list = list((s_name, cfg.stream_rx is not None, cfg.stream_tx is not None) for s_name, _, cfg in parse_services(self.profile)) if not service_list: rectangle(self.stdscr, 0, 0, height - 1, width - 1) @@ -203,47 +248,44 @@ def init_windows(self): hoff_float += h_fixed whl = [] - for ww, xoff, txrx, show_stats in [((width // 2 - 1), 0, 'rx', show_rx_stats), - ((width - width // 2 - 1), width // 2, 'tx', show_tx_stats)]: - if show_stats: - err = round(hoff_float) - (hoff_int + int(h_exp)) - wh = int(h_exp) + err - if wh < h_fixed: - raise Exception('Terminal height is too small') - else: - wh = h_fixed + for ww, xoff, txrx, show_stats in [((width * 4 // 7 - 1), 0, 'rx', show_rx_stats), + ((width - width * 4 // 7 - 1), width * 4 // 7, 'tx', show_tx_stats)]: + if not show_stats: + whl.append(0) + continue + + err = round(hoff_float) - (hoff_int + int(h_exp)) + wh = int(h_exp) + err + if wh < h_fixed: + raise Exception('Terminal height is too small') window = self.stdscr.subpad(wh - 2, ww - 2, hoff_int + 1, xoff + 1) window.idlok(1) window.scrollok(1) rectangle(self.stdscr, hoff_int, xoff, hoff_int + wh - 1, xoff + ww) - addstr(self.stdscr, hoff_int, 3 + xoff, '[%s %s %s]' % (self.profile, name, txrx)) - - if show_stats: - self.windows['%s %s' % (name, txrx)] = window - else: - addstr(window, 0, 0, '[statistics disabled]', curses.A_REVERSE) - window.refresh() + addstr(self.stdscr, hoff_int, 3 + xoff, '[%s: %s %s]' % (txrx.upper(), self.profile, name)) + self.windows['%s %s' % (name, txrx)] = window whl.append(wh) + hoff_int += max(whl) self.stdscr.refresh() def startedConnecting(self, connector): - log.msg('Connecting to %s:%d ...' % (connector.host, connector.port)) + set_window_title('Connecting to %s:%d ...' % (connector.host, connector.port)) for window in self.windows.values(): window.erase() - addstr(window, 0, 0, 'Connecting...') + addcstr(window, 'Connecting...') window.refresh() def buildProtocol(self, addr): - log.msg('Connected to %s' % (addr,)) + set_window_title('Connected to %s' % (addr,)) for window in self.windows.values(): window.erase() - addstr(window, 0, 0, 'Waiting for data...') + addcstr(window, 'Waiting for data...') window.refresh() self.resetDelay() @@ -252,21 +294,21 @@ def buildProtocol(self, addr): return p def clientConnectionLost(self, connector, reason): - log.msg('Connection lost: %s' % (reason.value,)) + set_window_title('Connection lost: %s' % (reason.value,)) for window in self.windows.values(): window.erase() - addstr(window, 0, 0, 'Connection lost: %s' % (reason.value,)) + addcstr(window, '[Connection lost]', curses.A_REVERSE) window.refresh() ReconnectingClientFactory.clientConnectionLost(self, connector, reason) def clientConnectionFailed(self, connector, reason): - log.msg('Connection failed: %s' % (reason.value,)) + set_window_title('Connection failed: %s' % (reason.value,)) for window in self.windows.values(): window.erase() - addstr(window, 0, 0, 'Connection failed: %s' % (reason.value,)) + addcstr(window, '[Connection failed]', curses.A_REVERSE) window.refresh() ReconnectingClientFactory.clientConnectionFailed(self, connector, reason) diff --git a/wfb_ng/conf/master.cfg b/wfb_ng/conf/master.cfg index 379c693e..79bceb92 100644 --- a/wfb_ng/conf/master.cfg +++ b/wfb_ng/conf/master.cfg @@ -112,8 +112,6 @@ link_domain = "test_two_way_udp" stream_rx = None stream_tx = None keypair = None -show_rx_stats = True -show_tx_stats = True mirror = False # Set to true if you want to mirror packet via all cards for redundancy. Not recommended if cards are on one frequency channel. # Radio settings for TX and RX @@ -175,7 +173,6 @@ fec_timeout = 0 # [ms], 0 to disable. If no new packets during timeout, emi [drone_video] -show_rx_stats = False # this is tx only endpoint and has not rx stats peer = 'listen://0.0.0.0:5602' # listen for video stream (drone) @@ -191,7 +188,6 @@ default_route = False [gs_video] -show_tx_stats = False # this is rx only endpoint and has not tx stats peer = 'connect://127.0.0.1:5600' # outgoing connection for video sink (GS) diff --git a/wfb_ng/server.py b/wfb_ng/server.py index 36d4ccc1..5b8f4d5e 100644 --- a/wfb_ng/server.py +++ b/wfb_ng/server.py @@ -61,6 +61,7 @@ class StatisticsProtocol(Int32StringReceiver): MAX_LENGTH = 1024 * 1024 def connectionMade(self): + self.sendString(msgpack.packb(dict(type='cli_title', cli_title=self.factory.cli_title))) self.factory.ui_sessions.append(self) def stringReceived(self, string): @@ -81,7 +82,7 @@ class StatsAndSelectorFactory(Factory): Aggregate RX stats and select TX antenna """ - def __init__(self): + def __init__(self, profile, wlans, link_domain): self.ant_sel_cb_list = [] self.rssi_cb_l = [] @@ -92,6 +93,9 @@ def __init__(self): # tcp sockets for UI self.ui_sessions = [] + # CLI title + self.cli_title = 'WFB-ng_%s @%s %s [%s]' % (settings.common.version, profile, ', '.join(wlans), link_domain) + def add_ant_sel_cb(self, ant_sel_cb): self.ant_sel_cb_list.append(ant_sel_cb) ant_sel_cb(self.tx_sel) @@ -102,9 +106,10 @@ def add_rssi_cb(self, rssi_cb): def _stats_agg_by_freq(self, ant_stats): stats_agg = {} - for (freq, ant_id), (pkt_s, - rssi_min, rssi_avg, rssi_max, - snr_min, snr_avg, snr_max) in ant_stats.items(): + for (((freq, mcs_index, bandwith), ant_id), + (pkt_s, + rssi_min, rssi_avg, rssi_max, + snr_min, snr_avg, snr_max)) in ant_stats.items(): if ant_id not in stats_agg: stats_agg[ant_id] = (pkt_s, @@ -157,7 +162,7 @@ def select_tx_antenna(self, stats_agg): self.tx_sel = max_rssi_ant - def update_rx_stats(self, rx_id, packet_stats, ant_stats): + def update_rx_stats(self, rx_id, packet_stats, ant_stats, session): stats_agg = self._stats_agg_by_freq(ant_stats) card_rssi_l = list(rssi_avg for pkt_s, @@ -193,7 +198,9 @@ def update_rx_stats(self, rx_id, packet_stats, ant_stats): # Send stats to CLI sessions for s in self.ui_sessions: - s.send_stats(dict(type='rx', id=rx_id, tx_ant=self.tx_sel, packets=packet_stats, rx_ant_stats=ant_stats)) + s.send_stats(dict(type='rx', id=rx_id, tx_ant=self.tx_sel, + packets=packet_stats, rx_ant_stats=ant_stats, + session=session)) def update_tx_stats(self, tx_id, packet_stats, ant_latency): if settings.common.debug: @@ -217,6 +224,7 @@ def __init__(self, ant_stat_cb, rx_id): self.rx_id = rx_id self.ant = {} self.count_all = None + self.session = None def lineReceived(self, line): line = line.decode('utf-8').strip() @@ -232,27 +240,27 @@ def lineReceived(self, line): if cmd == 'RX_ANT': if len(cols) != 5: raise BadTelemetry() - self.ant[(int(cols[2]), int(cols[3], 16))] = tuple(int(i) for i in cols[4].split(':')) + self.ant[(tuple(int(i) for i in cols[2].split(':')), int(cols[3], 16))] = tuple(int(i) for i in cols[4].split(':')) elif cmd == 'PKT': if len(cols) != 3: raise BadTelemetry() - p_all, p_dec_err, p_dec_ok, p_fec_rec, p_lost, p_bad, p_outgoing = list(int(i) for i in cols[2].split(':')) + p_all, b_all, p_dec_err, p_dec_ok, p_fec_rec, p_lost, p_bad, p_outgoing, b_outgoing = list(int(i) for i in cols[2].split(':')) if not self.count_all: - self.count_all = (p_all, p_dec_ok, p_fec_rec, p_lost, p_dec_err, p_bad, p_outgoing) + self.count_all = (p_all, b_all, p_dec_ok, p_fec_rec, p_lost, p_dec_err, p_bad, p_outgoing, b_outgoing) else: - self.count_all = tuple((a + b) for a, b in zip((p_all, p_dec_ok, p_fec_rec, p_lost, p_dec_err, p_bad, p_outgoing), + self.count_all = tuple((a + b) for a, b in zip((p_all, b_all, p_dec_ok, p_fec_rec, p_lost, p_dec_err, p_bad, p_outgoing, b_outgoing), self.count_all)) - stats = dict(zip(('all', 'dec_ok', 'fec_rec', 'lost', 'dec_err', 'bad', 'out'), - zip((p_all, p_dec_ok, p_fec_rec, p_lost, p_dec_err, p_bad, p_outgoing), + stats = dict(zip(('all', 'all_bytes', 'dec_ok', 'fec_rec', 'lost', 'dec_err', 'bad', 'out', 'out_bytes'), + zip((p_all, b_all, p_dec_ok, p_fec_rec, p_lost, p_dec_err, p_bad, p_outgoing, b_outgoing), self.count_all))) # Send stats to aggregators if self.ant_stat_cb is not None: - self.ant_stat_cb.update_rx_stats(self.rx_id, stats, dict(self.ant)) + self.ant_stat_cb.update_rx_stats(self.rx_id, stats, dict(self.ant), self.session) self.ant.clear() @@ -261,6 +269,7 @@ def lineReceived(self, line): raise BadTelemetry() epoch, fec_type, fec_k, fec_n = list(int(i) for i in cols[2].split(':')) + self.session = dict(fec_type=fec_types.get(fec_type, 'Unknown'), fec_k=fec_k, fec_n=fec_n) log.msg('New session detected [%s]: FEC=%s K=%d, N=%d, epoch=%d' % (self.rx_id, fec_types.get(fec_type, 'Unknown'), fec_k, fec_n, epoch)) else: @@ -319,16 +328,16 @@ def lineReceived(self, line): if len(cols) != 3: raise BadTelemetry() - p_fec_timeouts, p_incoming, p_injected, p_dropped, p_truncated = list(int(i) for i in cols[2].split(':')) + p_fec_timeouts, p_incoming, b_incoming, p_injected, b_injected, p_dropped, p_truncated = list(int(i) for i in cols[2].split(':')) if not self.count_all: - self.count_all = (p_fec_timeouts, p_incoming, p_injected, p_dropped, p_truncated) + self.count_all = (p_fec_timeouts, p_incoming, b_incoming, p_injected, b_injected, p_dropped, p_truncated) else: - self.count_all = tuple((a + b) for a, b in zip((p_fec_timeouts, p_incoming, p_injected, p_dropped, p_truncated), + self.count_all = tuple((a + b) for a, b in zip((p_fec_timeouts, p_incoming, b_incoming, p_injected, b_injected, p_dropped, p_truncated), self.count_all)) - stats = dict(zip(('fec_timeouts', 'incoming', 'injected', 'dropped', 'truncated'), - zip((p_fec_timeouts, p_incoming, p_injected, p_dropped, p_truncated), + stats = dict(zip(('fec_timeouts', 'incoming', 'incoming_bytes', 'injected', 'injected_bytes', 'dropped', 'truncated'), + zip((p_fec_timeouts, p_incoming, b_incoming, p_injected, b_injected, p_dropped, p_truncated), self.count_all))) # Send stats to aggregators @@ -500,8 +509,8 @@ def init(profiles, wlans): for profile, service_list in services: # Domain wide antenna selector - ant_sel_f = StatsAndSelectorFactory() profile_cfg = getattr(settings, profile) + ant_sel_f = StatsAndSelectorFactory(profile, wlans, profile_cfg.link_domain) link_id = int.from_bytes(hashlib.sha1(profile_cfg.link_domain.encode('utf-8')).digest()[:3], 'big') if profile_cfg.stats_port: @@ -897,7 +906,7 @@ def main(): log.theLogPublisher._startLogging(obs.emit, False) - log.msg('WFB version %s-%s' % (settings.common.version, settings.common.commit[:8])) + log.msg('WFB-ng version %s-%s' % (settings.common.version, settings.common.commit[:8])) profiles, wlans = sys.argv[1], list(wlan for arg in sys.argv[2:] for wlan in arg.split()) uname = os.uname() log.msg('Run on %s/%s @%s, profile(s) %s using %s' % (uname[4], uname[2], uname[1], profiles, ', '.join(wlans)))