Skip to content

Commit

Permalink
Add WorkLimiter , which can limit work based on the observed time spe…
Browse files Browse the repository at this point in the history
…nt on it

This change adds the `WorkLimiter` component, which measures the amount of
time required to perform some work items and will limit work based on time
instead of pure iterations.
  • Loading branch information
Matthias247 committed May 27, 2021
1 parent 5ab895b commit 8e5ba90
Show file tree
Hide file tree
Showing 2 changed files with 236 additions and 0 deletions.
1 change: 1 addition & 0 deletions quinn/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ mod mutex;
mod platform;
mod recv_stream;
mod send_stream;
mod work_limiter;

pub use proto::{
crypto, ApplicationClose, Certificate, CertificateChain, Chunk, ConfigError, ConnectError,
Expand Down
235 changes: 235 additions & 0 deletions quinn/src/work_limiter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,235 @@
use std::time::{Duration, Instant};

/// Limits the amount of time spent on a certain type of work in a cycle
///
/// The limiter works dynamically: For a sampled subset of cycles it measures the
/// the time that is approximately required for fulfilling 1 work item, and
/// calculates the amount of allowed work items per cycle.
/// The estimates are smoothed over all cycles where the exact duration is measured.
///
/// In cycles where no measurement is performed the previously determined work limit
/// is used.
///
/// For the limiter the exact definition of a work item does not matter.
/// It could for example track the amount of transmitted bytes per cycle,
/// or the amount of transmitted datagrams per cycle.
/// It will however work best if the required time to complete a work item is
/// constant.
#[derive(Debug)]
pub struct WorkLimiter {
/// Whether to measure the required work time, or to use the previous estimates
mode: LimiterMode,
/// The current cycle number
cycle: u16,
/// The time the cycle started - only used in measurement mode
start_time: Instant,
/// How many work items have been completed in the cycle
completed_work_items: usize,
/// The amount of work items which are allowed for a cycle
allowed_work_items: usize,
/// The desired cycle time
desired_cycle_time: Duration,
/// The estimated and smoothed time per work item in nanoseconds
smoothed_time_per_work_item_nanos: f64,
}

impl WorkLimiter {
pub fn new(desired_cycle_time: Duration) -> Self {
Self {
mode: LimiterMode::Measure,
cycle: 0,
start_time: Instant::now(),
completed_work_items: 0,
allowed_work_items: 0,
desired_cycle_time,
smoothed_time_per_work_item_nanos: 0.0,
}
}

/// Starts one work cycle
pub fn start_cycle(&mut self) {
self.completed_work_items = 0;
match self.mode {
LimiterMode::Measure => {
self.start_time = Instant::now();
}
LimiterMode::HistoricData => {}
}
}

/// Returns whether more work can be performed inside the `desired_cycle_time`
///
/// Requires that previous work was tracked using `record_work`.
pub fn allow_work(&mut self) -> bool {
match self.mode {
LimiterMode::Measure => self.start_time.elapsed() < self.desired_cycle_time,
LimiterMode::HistoricData => self.completed_work_items < self.allowed_work_items,
}
}

/// Records that `work` additional work items have been completed inside the cycle
///
/// Must be called between `start_cycle` and `finish_cycle`.
pub fn record_work(&mut self, work: usize) {
self.completed_work_items += work;
}

/// Finishes one work cycle
///
/// For cycles where the exact duration is measured this will update the estimates
/// for the time per work item and the limit of allowed work items per cycle.
pub fn finish_cycle(&mut self) {
// If no work was done in the cycle drop the measurement, it won't be useful
if self.completed_work_items == 0 {
return;
}

if let LimiterMode::Measure = self.mode {
let elapsed = self.start_time.elapsed();

let time_per_work_item_nanos =
(elapsed.as_nanos()) as f64 / self.completed_work_items as f64;

if self.allowed_work_items == 0 {
// Initial estimate
self.smoothed_time_per_work_item_nanos = time_per_work_item_nanos;
} else {
// Smoothed estimate
self.smoothed_time_per_work_item_nanos =
(7.0 * self.smoothed_time_per_work_item_nanos + time_per_work_item_nanos) / 8.0;
}

self.allowed_work_items = ((self.desired_cycle_time.as_nanos()) as f64
/ self.smoothed_time_per_work_item_nanos)
as usize;
}

self.cycle = self.cycle.wrapping_add(1);
if self.cycle % SAMPLING_INTERVAL == 0 {
self.mode = LimiterMode::Measure;
} else {
self.mode = LimiterMode::HistoricData;
}
}
}

/// We take a measurement sample once every `SAMPLING_INTERVAL` cycles
const SAMPLING_INTERVAL: u16 = 256;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum LimiterMode {
Measure,
HistoricData,
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn limit_work() {
const CYCLE_TIME: Duration = Duration::from_millis(500);
const BATCH_WORK_ITEMS: usize = 12;
const BATCH_TIME: Duration = Duration::from_millis(50);

const EXPECTED_INITIAL_BATCHES: usize =
(CYCLE_TIME.as_nanos() / BATCH_TIME.as_nanos()) as usize;
const EXPECTED_ALLOWED_WORK_ITEMS: usize = EXPECTED_INITIAL_BATCHES * BATCH_WORK_ITEMS;

let mut limiter = WorkLimiter::new(CYCLE_TIME);

// The initial cycle is measuring
limiter.start_cycle();
let mut initial_batches = 0;
while limiter.allow_work() {
limiter.record_work(BATCH_WORK_ITEMS);
std::thread::sleep(BATCH_TIME);
initial_batches += 1;
}
limiter.finish_cycle();

assert!(
approximates(initial_batches, EXPECTED_INITIAL_BATCHES),
"Expected {} allowed initial batches, but {} had been performed",
EXPECTED_INITIAL_BATCHES,
initial_batches
);

assert!(
limiter.allowed_work_items >= 3 * EXPECTED_ALLOWED_WORK_ITEMS / 4
&& limiter.allowed_work_items <= 5 * EXPECTED_ALLOWED_WORK_ITEMS / 4,
"Expected {} allowed work items, but {} are allowed",
EXPECTED_ALLOWED_WORK_ITEMS,
limiter.allowed_work_items
);
let initial_time_per_work_item = limiter.smoothed_time_per_work_item_nanos;

// The next cycles are using historic data
const BATCH_SIZES: [usize; 4] = [1, 2, 3, 5];
for &batch_size in &BATCH_SIZES {
limiter.start_cycle();
let mut allowed_work = 0;
while limiter.allow_work() {
limiter.record_work(batch_size);
allowed_work += batch_size;
}
limiter.finish_cycle();

assert!(
approximates(allowed_work, EXPECTED_ALLOWED_WORK_ITEMS),
"Expected {} allowed work items, but {} are allowed",
EXPECTED_ALLOWED_WORK_ITEMS,
allowed_work
);
}

// After `SAMPLING_INTERVAL`, we get into measurement mode again
for _ in 0..(SAMPLING_INTERVAL as usize - BATCH_SIZES.len() - 1) {
limiter.start_cycle();
limiter.record_work(1);
limiter.finish_cycle();
}

// We now do more work per cycle, and expect the estimate of allowed
// work items to go up
const BATCH_WORK_ITEMS_2: usize = 96;
const TIME_PER_WORK_ITEMS_2_NANOS: f64 =
CYCLE_TIME.as_nanos() as f64 / (EXPECTED_INITIAL_BATCHES * BATCH_WORK_ITEMS_2) as f64;

let expected_updated_time_per_work_item =
(initial_time_per_work_item * 7.0 + TIME_PER_WORK_ITEMS_2_NANOS) / 8.0;
let expected_updated_allowed_work_items =
(CYCLE_TIME.as_nanos() as f64 / expected_updated_time_per_work_item) as usize;

limiter.start_cycle();
let mut initial_batches = 0;
while limiter.allow_work() {
limiter.record_work(BATCH_WORK_ITEMS_2);
std::thread::sleep(BATCH_TIME);
initial_batches += 1;
}
limiter.finish_cycle();

assert!(
approximates(initial_batches, EXPECTED_INITIAL_BATCHES),
"Expected {} allowed initial batches, but {} had been performed",
EXPECTED_INITIAL_BATCHES,
initial_batches
);

assert!(
approximates(
limiter.allowed_work_items,
expected_updated_allowed_work_items
),
"Expected {} allowed work items, but {} are allowed",
expected_updated_allowed_work_items,
limiter.allowed_work_items
);
}

/// Checks whether a and b are approximately the same (25% error rate)
fn approximates(a: usize, b: usize) -> bool {
a >= b * 3 / 4 && a <= b * 5 / 4
}
}

0 comments on commit 8e5ba90

Please sign in to comment.