From 281e84badaa22e968bab5c4ac22662554d7393d2 Mon Sep 17 00:00:00 2001 From: Frando Date: Mon, 17 Nov 2025 16:20:33 +0100 Subject: [PATCH 1/2] feat: add back path congestion metrics --- iroh/src/magicsock/metrics.rs | 37 ++-- iroh/src/magicsock/remote_map/remote_state.rs | 91 +++++----- .../remote_map/remote_state/metrics.rs | 158 ++++++++++++++++++ 3 files changed, 226 insertions(+), 60 deletions(-) create mode 100644 iroh/src/magicsock/remote_map/remote_state/metrics.rs diff --git a/iroh/src/magicsock/metrics.rs b/iroh/src/magicsock/metrics.rs index 13bfa1ce77..3046540ae5 100644 --- a/iroh/src/magicsock/metrics.rs +++ b/iroh/src/magicsock/metrics.rs @@ -1,4 +1,4 @@ -use iroh_metrics::{Counter, MetricsGroup}; +use iroh_metrics::{Counter, Histogram, MetricsGroup}; use serde::{Deserialize, Serialize}; /// Enum of metrics for the module @@ -14,8 +14,11 @@ pub struct Metrics { pub send_ipv4: Counter, pub send_ipv6: Counter, pub send_relay: Counter, + pub send_relay_error: Counter, // Data packets (non-disco) + pub send_data: Counter, + pub send_data_network_down: Counter, pub recv_data_relay: Counter, pub recv_data_ipv4: Counter, pub recv_data_ipv6: Counter, @@ -69,25 +72,27 @@ pub struct Metrics { pub actor_tick_direct_addr_heartbeat: Counter, pub actor_link_change: Counter, pub actor_tick_other: Counter, - // /// Histogram of connection latency in milliseconds across all endpoint connections. - // #[default(Histogram::new(vec![1.0, 5.0, 10.0, 25.0, 50.0, 100.0, 250.0, 500.0, 1000.0, f64::INFINITY]))] - // pub connection_latency_ms: Histogram, - // /* - // * Path Congestion Metrics - // */ + + /// Histogram of connection latency in milliseconds across all endpoint connections. + #[default(Histogram::new(vec![1.0, 5.0, 10.0, 25.0, 50.0, 100.0, 250.0, 500.0, 1000.0, f64::INFINITY]))] + pub connection_latency_ms: Histogram, + + /* + * Path Congestion Metrics + */ // /// Number of times a path was marked as outdated due to consecutive ping failures. // pub path_marked_outdated: Counter, // /// Number of ping failures recorded across all paths. // pub path_ping_failures: Counter, // /// Number of consecutive failure resets (path recovered). // pub path_failure_resets: Counter, - // /// Histogram of packet loss rates (0.0-1.0) observed on UDP paths. - // #[default(Histogram::new(vec![0.0, 0.01, 0.05, 0.1, 0.2, 0.5, 1.0]))] - // pub path_packet_loss_rate: Histogram, - // /// Histogram of RTT variance (in milliseconds) as a congestion indicator. - // #[default(Histogram::new(vec![0.0, 1.0, 5.0, 10.0, 20.0, 50.0, 100.0, 200.0]))] - // pub path_rtt_variance_ms: Histogram, - // /// Histogram of path quality scores (0.0-1.0). - // #[default(Histogram::new(vec![0.0, 0.3, 0.5, 0.7, 0.85, 0.95, 1.0]))] - // pub path_quality_score: Histogram, + /// Histogram of packet loss rates (0.0-1.0) observed on UDP paths. + #[default(Histogram::new(vec![0.0, 0.01, 0.05, 0.1, 0.2, 0.5, 1.0]))] + pub path_packet_loss_rate: Histogram, + /// Histogram of RTT variance (in milliseconds) as a congestion indicator. + #[default(Histogram::new(vec![0.0, 1.0, 5.0, 10.0, 20.0, 50.0, 100.0, 200.0]))] + pub path_rtt_variance_ms: Histogram, + /// Histogram of path quality scores (0.0-1.0). + #[default(Histogram::new(vec![0.0, 0.3, 0.5, 0.7, 0.85, 0.95, 1.0]))] + pub path_quality_score: Histogram, } diff --git a/iroh/src/magicsock/remote_map/remote_state.rs b/iroh/src/magicsock/remote_map/remote_state.rs index f97cbbb9c9..117611a95d 100644 --- a/iroh/src/magicsock/remote_map/remote_state.rs +++ b/iroh/src/magicsock/remote_map/remote_state.rs @@ -41,6 +41,9 @@ use crate::{ mod guarded_channel; mod path_state; +#[cfg(feature = "metrics")] +mod metrics; + // TODO: use this // /// Number of addresses that are not active that we keep around per endpoint. // /// @@ -79,6 +82,9 @@ const APPLICATION_ABANDON_PATH: u8 = 30; /// in a high frequency, and to keep data about previous path around for subsequent connections. const ACTOR_MAX_IDLE_TIMEOUT: Duration = Duration::from_secs(60); +/// Interval in which connection and path metrics are emitted. +const METRICS_INTERVAL: Duration = Duration::from_secs(10); + /// A stream of events from all paths for all connections. /// /// The connection is identified using [`ConnId`]. The event `Err` variant happens when the @@ -224,6 +230,7 @@ impl RemoteStateActor { trace!("actor started"); let idle_timeout = MaybeFuture::None; tokio::pin!(idle_timeout); + let mut metrics_interval = time::interval(METRICS_INTERVAL); loop { let scheduled_path_open = match self.scheduled_open_path { Some(when) => MaybeFuture::Some(time::sleep_until(when)), @@ -266,6 +273,9 @@ impl RemoteStateActor { self.scheduled_holepunch = None; self.trigger_holepunching().await; } + _ = metrics_interval.tick() => { + self.record_metrics(); + } _ = &mut idle_timeout => { if self.connections.is_empty() && inbox.close_if_idle() { trace!("idle timeout expired and still idle: terminate actor"); @@ -388,7 +398,8 @@ impl RemoteStateActor { paths: Default::default(), open_paths: Default::default(), path_ids: Default::default(), - transport_summary: TransportSummary::default(), + #[cfg(feature = "metrics")] + metrics: Default::default(), }) .into_mut(); @@ -575,8 +586,10 @@ impl RemoteStateActor { fn handle_connection_close(&mut self, conn_id: ConnId) { if let Some(state) = self.connections.remove(&conn_id) { - self.metrics.num_conns_closed.inc(); - state.transport_summary.record(&self.metrics); + #[cfg(feature = "metrics")] + state.metrics.record_closed(&self.metrics); + #[cfg(not(feature = "metrics"))] + let _ = state; } if self.connections.is_empty() { trace!("last connection closed - clearing selected_path"); @@ -1025,6 +1038,13 @@ impl RemoteStateActor { } } } + + fn record_metrics(&mut self) { + #[cfg(feature = "metrics")] + for state in self.connections.values_mut() { + state.record_metrics_periodic(&self.metrics, self.selected_path.get()); + } + } } /// Messages to send to the [`RemoteStateActor`]. @@ -1127,8 +1147,11 @@ struct ConnectionState { open_paths: FxHashMap, /// Reverse map of [`Self::paths]. path_ids: FxHashMap, - /// Summary over transports used in this connection, for metrics tracking. - transport_summary: TransportSummary, + /// Tracker for stateful metrics for this connection and its paths + /// + /// Feature-gated on the `metrics` feature because it increases memory use. + #[cfg(feature = "metrics")] + metrics: self::metrics::MetricsTracker, } impl ConnectionState { @@ -1140,7 +1163,8 @@ impl ConnectionState { /// Tracks an open path for the connection. fn add_open_path(&mut self, remote: transports::Addr, path_id: PathId) { - self.transport_summary.add_path(&remote); + #[cfg(feature = "metrics")] + self.metrics.add_path(path_id, &remote); self.paths.insert(path_id, remote.clone()); self.open_paths.insert(path_id, remote.clone()); self.path_ids.insert(remote, path_id); @@ -1153,11 +1177,15 @@ impl ConnectionState { self.path_ids.remove(&addr); } self.open_paths.remove(path_id); + #[cfg(feature = "metrics")] + self.metrics.remove_path(path_id); } /// Removes the path from the open paths. fn remove_open_path(&mut self, path_id: &PathId) { self.open_paths.remove(path_id); + #[cfg(feature = "metrics")] + self.metrics.remove_path(path_id); self.update_pub_path_info(); } @@ -1177,6 +1205,19 @@ impl ConnectionState { self.pub_open_paths.set(new).ok(); } + + #[cfg(feature = "metrics")] + fn record_metrics_periodic( + &mut self, + metrics: &MagicsockMetrics, + selected_path: Option, + ) { + let Some(conn) = self.handle.upgrade() else { + return; + }; + self.metrics + .record_periodic(metrics, &conn, &self.open_paths, selected_path); + } } /// Watcher for the open paths and selected transmission path in a connection. @@ -1387,41 +1428,3 @@ impl Future for OnClosed { Poll::Ready(self.conn_id) } } - -/// Used for metrics tracking. -#[derive(Debug, Clone, Copy, Default)] -enum TransportSummary { - #[default] - None, - IpOnly, - RelayOnly, - IpAndRelay, -} - -impl TransportSummary { - fn add_path(&mut self, addr: &transports::Addr) { - use transports::Addr; - *self = match (*self, addr) { - (TransportSummary::None | TransportSummary::IpOnly, Addr::Ip(_)) => Self::IpOnly, - (TransportSummary::None | TransportSummary::RelayOnly, Addr::Relay(_, _)) => { - Self::RelayOnly - } - _ => Self::IpAndRelay, - } - } - - fn record(&self, metrics: &MagicsockMetrics) { - match self { - TransportSummary::IpOnly => { - metrics.num_conns_transport_ip_only.inc(); - } - TransportSummary::RelayOnly => { - metrics.num_conns_transport_relay_only.inc(); - } - TransportSummary::IpAndRelay => { - metrics.num_conns_transport_ip_and_relay.inc(); - } - TransportSummary::None => {} - } - } -} diff --git a/iroh/src/magicsock/remote_map/remote_state/metrics.rs b/iroh/src/magicsock/remote_map/remote_state/metrics.rs new file mode 100644 index 0000000000..de6b8d8edf --- /dev/null +++ b/iroh/src/magicsock/remote_map/remote_state/metrics.rs @@ -0,0 +1,158 @@ +//! Tracker for stateful metrics for connections and paths. + +use std::time::Duration; + +use quinn_proto::PathId; +use rustc_hash::FxHashMap; + +use crate::{magicsock::transports, metrics::MagicsockMetrics}; + +#[derive(Debug, Default)] +pub(super) struct MetricsTracker { + transport_summary: TransportSummary, + path_rtt_variance: FxHashMap, +} + +impl MetricsTracker { + pub(super) fn add_path(&mut self, path_id: PathId, remote: &transports::Addr) { + self.transport_summary.add_path(remote); + self.path_rtt_variance.insert(path_id, Default::default()); + } + + pub(super) fn remove_path(&mut self, path_id: &PathId) { + self.path_rtt_variance.remove(path_id); + } + + pub(super) fn record_periodic( + &mut self, + metrics: &MagicsockMetrics, + conn: &quinn::Connection, + path_remotes: &FxHashMap, + selected_path: Option, + ) { + for (path_id, remote) in path_remotes.iter() { + let Some(stats) = conn.path_stats(*path_id) else { + continue; + }; + + let loss_rate = if stats.sent_packets == 0 { + 0.0 + } else { + stats.lost_packets as f64 / stats.sent_packets as f64 + }; + metrics.path_packet_loss_rate.observe(loss_rate); + + if Some(remote) == selected_path.as_ref() { + metrics + .connection_latency_ms + .observe(stats.rtt.as_millis() as f64); + } + + let Some(rtt_variance) = self.path_rtt_variance.get_mut(path_id) else { + continue; + }; + rtt_variance.add_rtt_sample(stats.rtt); + if let Some(variance) = rtt_variance.rtt_variance() { + metrics + .path_rtt_variance_ms + .observe(variance.as_millis() as f64); + } + + let quality = rtt_variance.quality_score(loss_rate); + metrics.path_quality_score.observe(quality); + } + } + + pub(super) fn record_closed(&self, metrics: &MagicsockMetrics) { + metrics.num_conns_closed.inc(); + match self.transport_summary { + TransportSummary::IpOnly => { + metrics.num_conns_transport_ip_only.inc(); + } + TransportSummary::RelayOnly => { + metrics.num_conns_transport_relay_only.inc(); + } + TransportSummary::IpAndRelay => { + metrics.num_conns_transport_ip_and_relay.inc(); + } + TransportSummary::None => {} + } + } +} + +/// Tracks RTT variance over time, as a congestion marker. +#[derive(Debug, Default)] +struct RttVariance { + /// Rolling window of recent latency measurements (stores up to 8 samples). + samples: [Option; 8], + /// Index for next sample insertion (circular buffer). + index: usize, +} + +impl RttVariance { + fn add_rtt_sample(&mut self, rtt: Duration) { + self.samples[self.index] = Some(rtt); + self.index = (self.index + 1) % self.samples.len(); + } + + /// Calculate RTT variance as a congestion indicator. + /// Higher variance suggests congestion or unstable path. + fn rtt_variance(&self) -> Option { + let samples: Vec = self.samples.iter().filter_map(|&s| s).collect(); + + if samples.len() < 2 { + return None; + } + + let mean = samples.iter().sum::() / samples.len() as u32; + let variance: f64 = samples + .iter() + .map(|&s| { + let diff = s.as_secs_f64() - mean.as_secs_f64(); + diff * diff + }) + .sum::() + / samples.len() as f64; + + Some(Duration::from_secs_f64(variance.sqrt())) + } + + /// Path quality score (0.0 = worst, 1.0 = best). + /// Factors in packet loss and RTT variance. + fn quality_score(&self, packet_loss: f64) -> f64 { + let loss_penalty = (1.0 - packet_loss).clamp(0.0, 1.0); + + // Penalize high RTT variance + let variance_penalty = match self.rtt_variance() { + Some(var) if var.as_millis() > 50 => 0.7, + Some(var) if var.as_millis() > 20 => 0.85, + Some(_) => 1.0, + None => 1.0, + }; + + loss_penalty * variance_penalty + } +} + +/// Used for metrics tracking. +#[derive(Debug, Clone, Copy, Default)] +enum TransportSummary { + #[default] + None, + IpOnly, + RelayOnly, + IpAndRelay, +} + +impl TransportSummary { + fn add_path(&mut self, addr: &transports::Addr) { + use transports::Addr; + *self = match (*self, addr) { + (TransportSummary::None | TransportSummary::IpOnly, Addr::Ip(_)) => Self::IpOnly, + (TransportSummary::None | TransportSummary::RelayOnly, Addr::Relay(_, _)) => { + Self::RelayOnly + } + _ => Self::IpAndRelay, + } + } +} From 66a61f3783d7a6ca11debb73032eeabb5bdf49dc Mon Sep 17 00:00:00 2001 From: Frando Date: Tue, 18 Nov 2025 11:46:55 +0100 Subject: [PATCH 2/2] refactor: use if let not continue --- .../remote_map/remote_state/metrics.rs | 21 +++++++++---------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/iroh/src/magicsock/remote_map/remote_state/metrics.rs b/iroh/src/magicsock/remote_map/remote_state/metrics.rs index de6b8d8edf..2b361e8e7e 100644 --- a/iroh/src/magicsock/remote_map/remote_state/metrics.rs +++ b/iroh/src/magicsock/remote_map/remote_state/metrics.rs @@ -48,18 +48,17 @@ impl MetricsTracker { .observe(stats.rtt.as_millis() as f64); } - let Some(rtt_variance) = self.path_rtt_variance.get_mut(path_id) else { - continue; + if let Some(rtt_variance) = self.path_rtt_variance.get_mut(path_id) { + rtt_variance.add_rtt_sample(stats.rtt); + if let Some(variance) = rtt_variance.rtt_variance() { + metrics + .path_rtt_variance_ms + .observe(variance.as_millis() as f64); + } + + let quality = rtt_variance.quality_score(loss_rate); + metrics.path_quality_score.observe(quality); }; - rtt_variance.add_rtt_sample(stats.rtt); - if let Some(variance) = rtt_variance.rtt_variance() { - metrics - .path_rtt_variance_ms - .observe(variance.as_millis() as f64); - } - - let quality = rtt_variance.quality_score(loss_rate); - metrics.path_quality_score.observe(quality); } }