diff --git a/quinn/src/lib.rs b/quinn/src/lib.rs index 4720e98768..044115e52b 100644 --- a/quinn/src/lib.rs +++ b/quinn/src/lib.rs @@ -58,6 +58,7 @@ mod mutex; mod platform; mod recv_stream; mod send_stream; +mod work_limiter; pub use proto::{ crypto, ApplicationClose, Certificate, CertificateChain, Chunk, ConfigError, ConnectError, diff --git a/quinn/src/work_limiter.rs b/quinn/src/work_limiter.rs new file mode 100644 index 0000000000..6a14415c47 --- /dev/null +++ b/quinn/src/work_limiter.rs @@ -0,0 +1,235 @@ +use std::time::{Duration, Instant}; + +/// Limits the amount of time spent on a certain type of work in a cycle +/// +/// The limiter works dynamically: For a sampled subset of cycles it measures the +/// the time that is approximately required for fulfilling 1 work item, and +/// calculates the amount of allowed work items per cycle. +/// The estimates are smoothed over all cycles where the exact duration is measured. +/// +/// In cycles where no measurement is performed the previously determined work limit +/// is used. +/// +/// For the limiter the exact definition of a work item does not matter. +/// It could for example track the amount of transmitted bytes per cycle, +/// or the amount of transmitted datagrams per cycle. +/// It will however work best if the required time to complete a work item is +/// constant. +#[derive(Debug)] +pub struct WorkLimiter { + /// Whether to measure the required work time, or to use the previous estimates + mode: LimiterMode, + /// The current cycle number + cycle: u16, + /// The time the cycle started - only used in measurement mode + start_time: Instant, + /// How many work items have been completed in the cycle + completed_work_items: usize, + /// The amount of work items which are allowed for a cycle + allowed_work_items: usize, + /// The desired cycle time + desired_cycle_time: Duration, + /// The estimated and smoothed time per work item in nanoseconds + smoothed_time_per_work_item_nanos: f64, +} + +impl WorkLimiter { + pub fn new(desired_cycle_time: Duration) -> Self { + Self { + mode: LimiterMode::Measure, + cycle: 0, + start_time: Instant::now(), + completed_work_items: 0, + allowed_work_items: 0, + desired_cycle_time, + smoothed_time_per_work_item_nanos: 0.0, + } + } + + /// Starts one work cycle + pub fn start_cycle(&mut self) { + self.completed_work_items = 0; + match self.mode { + LimiterMode::Measure => { + self.start_time = Instant::now(); + } + LimiterMode::HistoricData => {} + } + } + + /// Returns whether more work can be performed inside the `desired_cycle_time` + /// + /// Requires that previous work was tracked using `record_work`. + pub fn allow_work(&mut self) -> bool { + match self.mode { + LimiterMode::Measure => self.start_time.elapsed() < self.desired_cycle_time, + LimiterMode::HistoricData => self.completed_work_items < self.allowed_work_items, + } + } + + /// Records that `work` additional work items have been completed inside the cycle + /// + /// Must be called between `start_cycle` and `finish_cycle`. + pub fn record_work(&mut self, work: usize) { + self.completed_work_items += work; + } + + /// Finishes one work cycle + /// + /// For cycles where the exact duration is measured this will update the estimates + /// for the time per work item and the limit of allowed work items per cycle. + pub fn finish_cycle(&mut self) { + // If no work was done in the cycle drop the measurement, it won't be useful + if self.completed_work_items == 0 { + return; + } + + if let LimiterMode::Measure = self.mode { + let elapsed = self.start_time.elapsed(); + + let time_per_work_item_nanos = + (elapsed.as_nanos()) as f64 / self.completed_work_items as f64; + + if self.allowed_work_items == 0 { + // Initial estimate + self.smoothed_time_per_work_item_nanos = time_per_work_item_nanos; + } else { + // Smoothed estimate + self.smoothed_time_per_work_item_nanos = + (7.0 * self.smoothed_time_per_work_item_nanos + time_per_work_item_nanos) / 8.0; + } + + self.allowed_work_items = ((self.desired_cycle_time.as_nanos()) as f64 + / self.smoothed_time_per_work_item_nanos) + as usize; + } + + self.cycle = self.cycle.wrapping_add(1); + if self.cycle % SAMPLING_INTERVAL == 0 { + self.mode = LimiterMode::Measure; + } else { + self.mode = LimiterMode::HistoricData; + } + } +} + +/// We take a measurement sample once every `SAMPLING_INTERVAL` cycles +const SAMPLING_INTERVAL: u16 = 256; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum LimiterMode { + Measure, + HistoricData, +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn limit_work() { + const CYCLE_TIME: Duration = Duration::from_millis(500); + const BATCH_WORK_ITEMS: usize = 12; + const BATCH_TIME: Duration = Duration::from_millis(50); + + const EXPECTED_INITIAL_BATCHES: usize = + (CYCLE_TIME.as_nanos() / BATCH_TIME.as_nanos()) as usize; + const EXPECTED_ALLOWED_WORK_ITEMS: usize = EXPECTED_INITIAL_BATCHES * BATCH_WORK_ITEMS; + + let mut limiter = WorkLimiter::new(CYCLE_TIME); + + // The initial cycle is measuring + limiter.start_cycle(); + let mut initial_batches = 0; + while limiter.allow_work() { + limiter.record_work(BATCH_WORK_ITEMS); + std::thread::sleep(BATCH_TIME); + initial_batches += 1; + } + limiter.finish_cycle(); + + assert!( + approximates(initial_batches, EXPECTED_INITIAL_BATCHES), + "Expected {} allowed initial batches, but {} had been performed", + EXPECTED_INITIAL_BATCHES, + initial_batches + ); + + assert!( + limiter.allowed_work_items >= 3 * EXPECTED_ALLOWED_WORK_ITEMS / 4 + && limiter.allowed_work_items <= 5 * EXPECTED_ALLOWED_WORK_ITEMS / 4, + "Expected {} allowed work items, but {} are allowed", + EXPECTED_ALLOWED_WORK_ITEMS, + limiter.allowed_work_items + ); + let initial_time_per_work_item = limiter.smoothed_time_per_work_item_nanos; + + // The next cycles are using historic data + const BATCH_SIZES: [usize; 4] = [1, 2, 3, 5]; + for &batch_size in &BATCH_SIZES { + limiter.start_cycle(); + let mut allowed_work = 0; + while limiter.allow_work() { + limiter.record_work(batch_size); + allowed_work += batch_size; + } + limiter.finish_cycle(); + + assert!( + approximates(allowed_work, EXPECTED_ALLOWED_WORK_ITEMS), + "Expected {} allowed work items, but {} are allowed", + EXPECTED_ALLOWED_WORK_ITEMS, + allowed_work + ); + } + + // After `SAMPLING_INTERVAL`, we get into measurement mode again + for _ in 0..(SAMPLING_INTERVAL as usize - BATCH_SIZES.len() - 1) { + limiter.start_cycle(); + limiter.record_work(1); + limiter.finish_cycle(); + } + + // We now do more work per cycle, and expect the estimate of allowed + // work items to go up + const BATCH_WORK_ITEMS_2: usize = 96; + const TIME_PER_WORK_ITEMS_2_NANOS: f64 = + CYCLE_TIME.as_nanos() as f64 / (EXPECTED_INITIAL_BATCHES * BATCH_WORK_ITEMS_2) as f64; + + let expected_updated_time_per_work_item = + (initial_time_per_work_item * 7.0 + TIME_PER_WORK_ITEMS_2_NANOS) / 8.0; + let expected_updated_allowed_work_items = + (CYCLE_TIME.as_nanos() as f64 / expected_updated_time_per_work_item) as usize; + + limiter.start_cycle(); + let mut initial_batches = 0; + while limiter.allow_work() { + limiter.record_work(BATCH_WORK_ITEMS_2); + std::thread::sleep(BATCH_TIME); + initial_batches += 1; + } + limiter.finish_cycle(); + + assert!( + approximates(initial_batches, EXPECTED_INITIAL_BATCHES), + "Expected {} allowed initial batches, but {} had been performed", + EXPECTED_INITIAL_BATCHES, + initial_batches + ); + + assert!( + approximates( + limiter.allowed_work_items, + expected_updated_allowed_work_items + ), + "Expected {} allowed work items, but {} are allowed", + expected_updated_allowed_work_items, + limiter.allowed_work_items + ); + } + + /// Checks whether a and b are approximately the same (25% error rate) + fn approximates(a: usize, b: usize) -> bool { + a >= b * 3 / 4 && a <= b * 5 / 4 + } +}