feat(multipath): add back path congestion metrics #3669
base: Frando/mp-metrics-basics
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -41,6 +41,9 @@ use crate::{ | |
| mod guarded_channel; | ||
| mod path_state; | ||
|
|
||
| #[cfg(feature = "metrics")] | ||
| mod metrics; | ||
|
|
||
| // TODO: use this | ||
| // /// Number of addresses that are not active that we keep around per endpoint. | ||
| // /// | ||
|
|
@@ -79,6 +82,9 @@ const APPLICATION_ABANDON_PATH: u8 = 30; | |
| /// in a high frequency, and to keep data about previous path around for subsequent connections. | ||
| const ACTOR_MAX_IDLE_TIMEOUT: Duration = Duration::from_secs(60); | ||
|
|
||
| /// Interval in which connection and path metrics are emitted. | ||
| const METRICS_INTERVAL: Duration = Duration::from_secs(10); | ||
|
|
||
| /// A stream of events from all paths for all connections. | ||
| /// | ||
| /// The connection is identified using [`ConnId`]. The event `Err` variant happens when the | ||
|
|
@@ -224,6 +230,7 @@ impl RemoteStateActor { | |
| trace!("actor started"); | ||
| let idle_timeout = MaybeFuture::None; | ||
| tokio::pin!(idle_timeout); | ||
| let mut metrics_interval = time::interval(METRICS_INTERVAL); | ||
|
Contributor
Should this be cfg-gated as well?
Member
Author
Failed to get the
Contributor
Yeah, I have done something like that in the past. |
||
| loop { | ||
| let scheduled_path_open = match self.scheduled_open_path { | ||
| Some(when) => MaybeFuture::Some(time::sleep_until(when)), | ||
|
|
@@ -266,6 +273,9 @@ impl RemoteStateActor { | |
| self.scheduled_holepunch = None; | ||
| self.trigger_holepunching().await; | ||
| } | ||
| _ = metrics_interval.tick() => { | ||
| self.record_metrics(); | ||
| } | ||
| _ = &mut idle_timeout => { | ||
| if self.connections.is_empty() && inbox.close_if_idle() { | ||
| trace!("idle timeout expired and still idle: terminate actor"); | ||
|
|
@@ -388,7 +398,8 @@ impl RemoteStateActor { | |
| paths: Default::default(), | ||
| open_paths: Default::default(), | ||
| path_ids: Default::default(), | ||
| transport_summary: TransportSummary::default(), | ||
| #[cfg(feature = "metrics")] | ||
| metrics: Default::default(), | ||
| }) | ||
| .into_mut(); | ||
|
|
||
|
|
@@ -575,8 +586,10 @@ impl RemoteStateActor { | |
|
|
||
| fn handle_connection_close(&mut self, conn_id: ConnId) { | ||
| if let Some(state) = self.connections.remove(&conn_id) { | ||
| self.metrics.num_conns_closed.inc(); | ||
| state.transport_summary.record(&self.metrics); | ||
| #[cfg(feature = "metrics")] | ||
| state.metrics.record_closed(&self.metrics); | ||
| #[cfg(not(feature = "metrics"))] | ||
| let _ = state; | ||
| } | ||
| if self.connections.is_empty() { | ||
| trace!("last connection closed - clearing selected_path"); | ||
|
|
@@ -1025,6 +1038,13 @@ impl RemoteStateActor { | |
| } | ||
| } | ||
| } | ||
|
|
||
| fn record_metrics(&mut self) { | ||
| #[cfg(feature = "metrics")] | ||
| for state in self.connections.values_mut() { | ||
| state.record_metrics_periodic(&self.metrics, self.selected_path.get()); | ||
|
Contributor
Sorry if I am missing something, but shouldn't the path we pass in here be the one for the specific connection? This seems like a generic one.
Member
Author
We have a single selected path per ~~connection~~ remote endpoint. It is passed through here because the metrics tracking code further down uses the RTT of the selected path as the connection's RTT. (With multipath, there is no longer a connection-level RTT exposed at the quinn level.)
Contributor
But why is it the same for all connections?
Member
Author
Currently the selected path logic works at the level of the remote endpoint, and the result is then applied to all connections. |
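To illustrate the answer in this thread, here is a minimal, std-only sketch of how an endpoint-wide selected path can be mapped to a per-connection RTT. `Addr`, `PathStats`, the `u32` path ids and `connection_rtt` are hypothetical stand-ins, not the types used in the PR; the only thing taken from the diff is the idea of comparing each open path's remote against the selected path.

```rust
use std::collections::HashMap;
use std::time::Duration;

/// Hypothetical stand-in for `transports::Addr`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum Addr {
    Ip(String),
    Relay(String),
}

/// Hypothetical stand-in for the per-path statistics of one connection.
struct PathStats {
    rtt: Duration,
}

/// Returns the RTT of the open path whose remote equals the endpoint-wide
/// selected path, or `None` if this connection has no such path.
fn connection_rtt(
    open_paths: &HashMap<u32, Addr>,
    stats: &HashMap<u32, PathStats>,
    selected_path: Option<&Addr>,
) -> Option<Duration> {
    let selected = selected_path?;
    open_paths
        .iter()
        .find(|(_, remote)| *remote == selected)
        .and_then(|(path_id, _)| stats.get(path_id))
        .map(|s| s.rtt)
}
```

Every connection to the same remote endpoint receives the same `selected_path`, but each one looks it up in its own set of open paths, so the reported RTT is still per connection.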
||
| } | ||
| } | ||
| } | ||
|
|
||
| /// Messages to send to the [`RemoteStateActor`]. | ||
|
|
@@ -1127,8 +1147,11 @@ struct ConnectionState { | |
| open_paths: FxHashMap<PathId, transports::Addr>, | ||
| /// Reverse map of [`Self::paths`]. | |
| path_ids: FxHashMap<transports::Addr, PathId>, | ||
| /// Summary over transports used in this connection, for metrics tracking. | ||
| transport_summary: TransportSummary, | ||
| /// Tracker for stateful metrics for this connection and its paths | ||
| /// | ||
| /// Feature-gated on the `metrics` feature because it increases memory use. | ||
| #[cfg(feature = "metrics")] | ||
| metrics: self::metrics::MetricsTracker, | ||
| } | ||
|
|
||
| impl ConnectionState { | ||
|
|
@@ -1140,7 +1163,8 @@ impl ConnectionState { | |
|
|
||
| /// Tracks an open path for the connection. | ||
| fn add_open_path(&mut self, remote: transports::Addr, path_id: PathId) { | ||
| self.transport_summary.add_path(&remote); | ||
| #[cfg(feature = "metrics")] | ||
| self.metrics.add_path(path_id, &remote); | ||
| self.paths.insert(path_id, remote.clone()); | ||
| self.open_paths.insert(path_id, remote.clone()); | ||
| self.path_ids.insert(remote, path_id); | ||
|
|
@@ -1153,11 +1177,15 @@ impl ConnectionState { | |
| self.path_ids.remove(&addr); | ||
| } | ||
| self.open_paths.remove(path_id); | ||
| #[cfg(feature = "metrics")] | ||
| self.metrics.remove_path(path_id); | ||
| } | ||
|
|
||
| /// Removes the path from the open paths. | ||
| fn remove_open_path(&mut self, path_id: &PathId) { | ||
| self.open_paths.remove(path_id); | ||
| #[cfg(feature = "metrics")] | ||
| self.metrics.remove_path(path_id); | ||
|
|
||
| self.update_pub_path_info(); | ||
| } | ||
|
|
@@ -1177,6 +1205,19 @@ impl ConnectionState { | |
|
|
||
| self.pub_open_paths.set(new).ok(); | ||
| } | ||
|
|
||
| #[cfg(feature = "metrics")] | ||
| fn record_metrics_periodic( | ||
| &mut self, | ||
| metrics: &MagicsockMetrics, | ||
| selected_path: Option<transports::Addr>, | ||
| ) { | ||
| let Some(conn) = self.handle.upgrade() else { | ||
| return; | ||
| }; | ||
| self.metrics | ||
| .record_periodic(metrics, &conn, &self.open_paths, selected_path); | ||
| } | ||
| } | ||
|
|
||
| /// Watcher for the open paths and selected transmission path in a connection. | ||
|
|
@@ -1387,41 +1428,3 @@ impl Future for OnClosed { | |
| Poll::Ready(self.conn_id) | ||
| } | ||
| } | ||
|
|
||
| /// Used for metrics tracking. | ||
| #[derive(Debug, Clone, Copy, Default)] | ||
| enum TransportSummary { | ||
| #[default] | ||
| None, | ||
| IpOnly, | ||
| RelayOnly, | ||
| IpAndRelay, | ||
| } | ||
|
|
||
| impl TransportSummary { | ||
| fn add_path(&mut self, addr: &transports::Addr) { | ||
| use transports::Addr; | ||
| *self = match (*self, addr) { | ||
| (TransportSummary::None | TransportSummary::IpOnly, Addr::Ip(_)) => Self::IpOnly, | ||
| (TransportSummary::None | TransportSummary::RelayOnly, Addr::Relay(_, _)) => { | ||
| Self::RelayOnly | ||
| } | ||
| _ => Self::IpAndRelay, | ||
| } | ||
| } | ||
|
|
||
| fn record(&self, metrics: &MagicsockMetrics) { | ||
| match self { | ||
| TransportSummary::IpOnly => { | ||
| metrics.num_conns_transport_ip_only.inc(); | ||
| } | ||
| TransportSummary::RelayOnly => { | ||
| metrics.num_conns_transport_relay_only.inc(); | ||
| } | ||
| TransportSummary::IpAndRelay => { | ||
| metrics.num_conns_transport_ip_and_relay.inc(); | ||
| } | ||
| TransportSummary::None => {} | ||
| } | ||
| } | ||
| } | ||
|
Contributor
This is metrics backend code. I'll have a much easier time with this PR if we don't try and do anything like that here. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,157 @@ | ||
| //! Tracker for stateful metrics for connections and paths. | ||
|
|
||
| use std::time::Duration; | ||
|
|
||
| use quinn_proto::PathId; | ||
| use rustc_hash::FxHashMap; | ||
|
|
||
| use crate::{magicsock::transports, metrics::MagicsockMetrics}; | ||
|
|
||
| #[derive(Debug, Default)] | ||
| pub(super) struct MetricsTracker { | ||
| transport_summary: TransportSummary, | ||
| path_rtt_variance: FxHashMap<PathId, RttVariance>, | ||
| } | ||
|
|
||
| impl MetricsTracker { | ||
| pub(super) fn add_path(&mut self, path_id: PathId, remote: &transports::Addr) { | ||
| self.transport_summary.add_path(remote); | ||
| self.path_rtt_variance.insert(path_id, Default::default()); | ||
| } | ||
|
|
||
| pub(super) fn remove_path(&mut self, path_id: &PathId) { | ||
| self.path_rtt_variance.remove(path_id); | ||
| } | ||
|
|
||
| pub(super) fn record_periodic( | ||
| &mut self, | ||
| metrics: &MagicsockMetrics, | ||
| conn: &quinn::Connection, | ||
| path_remotes: &FxHashMap<PathId, transports::Addr>, | ||
| selected_path: Option<transports::Addr>, | ||
| ) { | ||
| for (path_id, remote) in path_remotes.iter() { | ||
| let Some(stats) = conn.path_stats(*path_id) else { | ||
| continue; | ||
| }; | ||
|
|
||
| let loss_rate = if stats.sent_packets == 0 { | ||
| 0.0 | ||
| } else { | ||
| stats.lost_packets as f64 / stats.sent_packets as f64 | ||
| }; | ||
| metrics.path_packet_loss_rate.observe(loss_rate); | ||
|
|
||
| if Some(remote) == selected_path.as_ref() { | ||
| metrics | ||
| .connection_latency_ms | ||
| .observe(stats.rtt.as_millis() as f64); | ||
| } | ||
|
|
||
| if let Some(rtt_variance) = self.path_rtt_variance.get_mut(path_id) { | ||
| rtt_variance.add_rtt_sample(stats.rtt); | ||
| if let Some(variance) = rtt_variance.rtt_variance() { | ||
| metrics | ||
| .path_rtt_variance_ms | ||
| .observe(variance.as_millis() as f64); | ||
| } | ||
|
|
||
| let quality = rtt_variance.quality_score(loss_rate); | ||
| metrics.path_quality_score.observe(quality); | ||
| }; | ||
| } | ||
| } | ||
|
|
||
| pub(super) fn record_closed(&self, metrics: &MagicsockMetrics) { | ||
| metrics.num_conns_closed.inc(); | ||
| match self.transport_summary { | ||
| TransportSummary::IpOnly => { | ||
| metrics.num_conns_transport_ip_only.inc(); | ||
| } | ||
| TransportSummary::RelayOnly => { | ||
| metrics.num_conns_transport_relay_only.inc(); | ||
| } | ||
| TransportSummary::IpAndRelay => { | ||
| metrics.num_conns_transport_ip_and_relay.inc(); | ||
| } | ||
| TransportSummary::None => {} | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /// Tracks RTT variance over time, as a congestion marker. | ||
| #[derive(Debug, Default)] | ||
| struct RttVariance { | ||
| /// Rolling window of recent latency measurements (stores up to 8 samples). | ||
| samples: [Option<Duration>; 8], | ||
| /// Index for next sample insertion (circular buffer). | ||
| index: usize, | ||
| } | ||
|
|
||
| impl RttVariance { | ||
| fn add_rtt_sample(&mut self, rtt: Duration) { | ||
| self.samples[self.index] = Some(rtt); | ||
| self.index = (self.index + 1) % self.samples.len(); | ||
| } | ||
|
|
||
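| /// Calculates the spread of recent RTT samples as a congestion indicator. | |
| /// | |
| /// Despite the name, this returns the standard deviation (the square root of the | |
| /// population variance) over the sample window. A higher value suggests congestion | |
| /// or an unstable path. | |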
| fn rtt_variance(&self) -> Option<Duration> { | ||
| let samples: Vec<Duration> = self.samples.iter().filter_map(|&s| s).collect(); | ||
|
|
||
| if samples.len() < 2 { | ||
| return None; | ||
| } | ||
|
|
||
| let mean = samples.iter().sum::<Duration>() / samples.len() as u32; | ||
| let variance: f64 = samples | ||
| .iter() | ||
| .map(|&s| { | ||
| let diff = s.as_secs_f64() - mean.as_secs_f64(); | ||
| diff * diff | ||
| }) | ||
| .sum::<f64>() | ||
| / samples.len() as f64; | ||
|
|
||
| Some(Duration::from_secs_f64(variance.sqrt())) | ||
| } | ||
|
|
||
| /// Path quality score (0.0 = worst, 1.0 = best). | ||
| /// Factors in packet loss and RTT variance. | ||
| fn quality_score(&self, packet_loss: f64) -> f64 { | ||
| let loss_penalty = (1.0 - packet_loss).clamp(0.0, 1.0); | ||
|
|
||
| // Penalize high RTT variance | ||
| let variance_penalty = match self.rtt_variance() { | ||
| Some(var) if var.as_millis() > 50 => 0.7, | ||
| Some(var) if var.as_millis() > 20 => 0.85, | ||
| Some(_) => 1.0, | ||
| None => 1.0, | ||
| }; | ||
|
|
||
| loss_penalty * variance_penalty | ||
| } | ||
| } | ||
|
|
||
| /// Used for metrics tracking. | ||
| #[derive(Debug, Clone, Copy, Default)] | ||
| enum TransportSummary { | ||
| #[default] | ||
| None, | ||
| IpOnly, | ||
| RelayOnly, | ||
| IpAndRelay, | ||
| } | ||
|
|
||
| impl TransportSummary { | ||
| fn add_path(&mut self, addr: &transports::Addr) { | ||
| use transports::Addr; | ||
| *self = match (*self, addr) { | ||
| (TransportSummary::None | TransportSummary::IpOnly, Addr::Ip(_)) => Self::IpOnly, | ||
| (TransportSummary::None | TransportSummary::RelayOnly, Addr::Relay(_, _)) => { | ||
| Self::RelayOnly | ||
| } | ||
| _ => Self::IpAndRelay, | ||
| } | ||
| } | ||
| } |
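As a worked example of the arithmetic in `RttVariance`, the following std-only sketch reproduces the same computation on a plain slice. The free functions `rtt_std_dev` and `quality_score` are hypothetical names chosen for illustration; the formula and the 20 ms / 50 ms thresholds are copied from the diff above.

```rust
use std::time::Duration;

/// Standard deviation (population form) of the RTT samples, or `None`
/// when fewer than two samples are available.
fn rtt_std_dev(samples: &[Duration]) -> Option<Duration> {
    if samples.len() < 2 {
        return None;
    }
    let mean = samples.iter().sum::<Duration>() / samples.len() as u32;
    let variance = samples
        .iter()
        .map(|s| {
            let diff = s.as_secs_f64() - mean.as_secs_f64();
            diff * diff
        })
        .sum::<f64>()
        / samples.len() as f64;
    Some(Duration::from_secs_f64(variance.sqrt()))
}

/// Quality score in `[0.0, 1.0]`: the loss factor multiplied by a
/// step-wise factor derived from the RTT spread.
fn quality_score(packet_loss: f64, std_dev: Option<Duration>) -> f64 {
    let loss_factor = (1.0 - packet_loss).clamp(0.0, 1.0);
    let variance_factor = match std_dev {
        Some(d) if d.as_millis() > 50 => 0.7,
        Some(d) if d.as_millis() > 20 => 0.85,
        _ => 1.0,
    };
    loss_factor * variance_factor
}
```

For samples of 10, 20, 30 and 40 ms the mean is 25 ms and the spread is about 11 ms, so no penalty applies and a 10% loss rate gives a score of 0.9. For samples of 10 and 130 ms the spread is about 60 ms, so the same loss rate gives 0.9 × 0.7 = 0.63.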
So, I'm going to argue that this is fundamentally doing metrics wrong. Metrics increment counters or record histograms at discrete points. If you are recording something on a timer, you are supposed to provide the metrics from which the value can be computed, so that the metrics backend can compute it on the fly over the range it wants, using the sampling intervals it wants, and so on.
What would be the discrete points at which to record connection latency? Path open/close events?
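One possible reading of the reviewer's suggestion, for packet loss specifically, is to export only raw monotonic totals and leave the rate to the backend. The sketch below is std-only and entirely hypothetical: `Counter`, `PathCounters`, `PathCursor` and `loss_rate` are illustrative names, not part of the PR or of any metrics library. It also does not settle when `flush` should be called, which is the open question in this thread.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical monotonic counter, standing in for a metrics-library counter.
#[derive(Debug, Default)]
struct Counter(AtomicU64);

impl Counter {
    fn inc_by(&self, n: u64) {
        self.0.fetch_add(n, Ordering::Relaxed);
    }
    fn get(&self) -> u64 {
        self.0.load(Ordering::Relaxed)
    }
}

/// Hypothetical exported metrics: raw totals only, no derived rate.
#[derive(Debug, Default)]
struct PathCounters {
    packets_sent_total: Counter,
    packets_lost_total: Counter,
}

/// Remembers the last cumulative values seen for one path, so that only
/// the delta since the previous flush is added to the shared counters.
#[derive(Debug, Default)]
struct PathCursor {
    sent: u64,
    lost: u64,
}

impl PathCursor {
    fn flush(&mut self, sent: u64, lost: u64, out: &PathCounters) {
        out.packets_sent_total.inc_by(sent.saturating_sub(self.sent));
        out.packets_lost_total.inc_by(lost.saturating_sub(self.lost));
        self.sent = sent;
        self.lost = lost;
    }
}

/// What a backend could derive from the difference between two scrapes.
fn loss_rate(sent_delta: u64, lost_delta: u64) -> f64 {
    if sent_delta == 0 {
        0.0
    } else {
        lost_delta as f64 / sent_delta as f64
    }
}
```

With totals exported this way, the loss rate over any window is the ratio of the two counter deltas, and the choice of window moves from the emitting code to the query.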