Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement logical clock #6

Merged
merged 2 commits into from
Apr 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ http = { version = "1.1", optional = true }
tui-logger = { version = "0.11", optional = true }
log = { version = "0.4", optional = true }
cfg-if = "1"
parking_lot = "0.12"

[dev-dependencies]
tokio = { version = "1.36", features = ["rt-multi-thread"] }
Expand Down
9 changes: 5 additions & 4 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ use tokio::{
use tokio_util::sync::CancellationToken;

use crate::{
clock::Clock,
collector::{ReportCollector, SilentCollector, TuiCollector},
reporter::{BenchReporter, JsonReporter, TextReporter},
runner::{BenchOpts, BenchSuite, Runner},
Expand Down Expand Up @@ -144,9 +145,9 @@ pub struct BenchCli {
}

impl BenchCli {
pub(crate) fn bench_opts(&self, start: Instant) -> BenchOpts {
pub(crate) fn bench_opts(&self, clock: Clock) -> BenchOpts {
BenchOpts {
start,
clock,
concurrency: self.concurrency,
iterations: self.iterations,
duration: self.duration.map(|d| d.into()),
Expand Down Expand Up @@ -193,8 +194,8 @@ where
let (pause_tx, pause_rx) = watch::channel(false);
let cancel = CancellationToken::new();

let opts = cli.bench_opts(Instant::now());
let runner = Runner::new(bench_suite, opts, res_tx, pause_rx, cancel.clone());
let opts = cli.bench_opts(Clock::start_at(Instant::now()));
let runner = Runner::new(bench_suite, opts.clone(), res_tx, pause_rx, cancel.clone());

let mut collector: Box<dyn ReportCollector> = match cli.collector() {
Collector::Tui => Box::new(TuiCollector::new(opts, cli.fps, res_rx, pause_tx, cancel)?),
Expand Down
99 changes: 99 additions & 0 deletions src/clock.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
use std::sync::Arc;

use parking_lot::Mutex;
use tokio::time::{self, Duration, Instant};

/// A logical clock that can be paused
#[derive(Debug, Clone, Default)]
pub struct Clock {
inner: Arc<Mutex<InnerClock>>,
}

#[derive(Debug, Clone, Default)]
pub(crate) struct InnerClock {
status: Status,
elapsed: Duration,
}

#[derive(Debug, Clone, Copy, Default)]
pub(crate) enum Status {
#[default]
Paused,
Running(Instant),
}

impl Clock {
pub fn start_at(instant: Instant) -> Self {
let inner = InnerClock {
status: Status::Running(instant),
elapsed: Duration::default(),
};
Self { inner: Arc::new(Mutex::new(inner)) }
}

pub fn resume(&mut self) {
let mut inner = self.inner.lock();
if let Status::Paused = inner.status {
inner.status = Status::Running(Instant::now());
}
}

pub fn pause(&mut self) {
let mut inner = self.inner.lock();
if let Status::Running(checkpoint) = inner.status {
inner.elapsed += checkpoint.elapsed();
inner.status = Status::Paused;
}
}

pub fn elapsed(&self) -> Duration {
let inner = self.inner.lock();
match inner.status {
Status::Paused => inner.elapsed,
Status::Running(checkpoint) => inner.elapsed + checkpoint.elapsed(),
}
}

pub async fn sleep(&self, mut duration: Duration) {
let wake_time = self.elapsed() + duration;
loop {
time::sleep(duration).await;
let elapsed = self.elapsed();
if elapsed >= wake_time {
break;
}
duration = wake_time - elapsed;
}
}

async fn sleep_until(&self, deadline: Duration) {
let now = self.elapsed();
if deadline <= now {
return;
}
self.sleep(deadline - now).await;
}

pub fn ticker(&self, duration: Duration) -> Ticker {
Ticker::new(self.clone(), duration)
}
}

/// A ticker that ticks at a fixed logical interval
#[derive(Debug, Clone)]
pub struct Ticker {
clock: Clock,
interval: Duration,
next_tick: Duration,
}

impl Ticker {
pub fn new(clock: Clock, duration: Duration) -> Self {
Self { clock, interval: duration, next_tick: duration }
}

pub async fn tick(&mut self) {
self.clock.sleep_until(self.next_tick).await;
self.next_tick += self.interval;
}
}
2 changes: 1 addition & 1 deletion src/collector/silent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ impl super::ReportCollector for SilentCollector {
}
}

let elapsed = self.bench_opts.start.elapsed();
let elapsed = self.bench_opts.clock.elapsed();
let concurrency = self.bench_opts.concurrency;
Ok(BenchReport { concurrency, hist, stats, status_dist, error_dist, elapsed })
}
Expand Down
42 changes: 25 additions & 17 deletions src/collector/tui.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ use crate::{
util::{IntoAdjustedByte, TryIntoAdjustedByte},
};

const SECOND: Duration = Duration::from_secs(1);

/// A report collector with real-time TUI support.
pub struct TuiCollector {
/// The benchmark options.
Expand Down Expand Up @@ -127,16 +129,16 @@ impl ReportCollector for TuiCollector {
let mut current_tw = TimeWindow::Second;
let mut auto_tw = true;

let start = self.bench_opts.start;
let mut clock = self.bench_opts.clock.clone();

let mut latest_iters = RotateWindowGroup::new(60);
let mut latest_iters_ticker = clock.ticker(SECOND);

let mut latest_iters = RotateWindowGroup::new(start, 60);
const SECOND: Duration = Duration::from_secs(1);
let mut latest_iters_timer = tokio::time::interval_at(start + SECOND, SECOND);
latest_iters_timer.set_missed_tick_behavior(MissedTickBehavior::Burst);
let mut latest_stats = RotateDiffWindowGroup::new(self.fps);
let mut latest_stats_ticker = clock.ticker(SECOND / self.fps as u32);

let mut latest_stats = RotateDiffWindowGroup::new(start, self.fps);
let mut refresh_timer = tokio::time::interval(Duration::from_secs(1) / self.fps as u32);
refresh_timer.set_missed_tick_behavior(MissedTickBehavior::Skip);
let mut ui_ticker = tokio::time::interval(SECOND / self.fps as u32);
ui_ticker.set_missed_tick_behavior(MissedTickBehavior::Burst);

#[cfg(feature = "log")]
let mut show_logs = false;
Expand All @@ -146,9 +148,7 @@ impl ReportCollector for TuiCollector {
loop {
tokio::select! {
biased;
t = refresh_timer.tick() => {
latest_stats.rotate(t, &stats);

_ = ui_ticker.tick() => {
while crossterm::event::poll(Duration::from_secs(0))? {
use KeyCode::*;
if let Event::Key(KeyEvent { code, modifiers, .. }) = crossterm::event::read()? {
Expand All @@ -167,8 +167,12 @@ impl ReportCollector for TuiCollector {
break 'outer;
}
(Char('p') | Pause, _) => {
// TODO: pause logical time instead of real time
let pause = !*self.pause.borrow();
if pause {
clock.pause();
} else {
clock.resume();
}
self.pause.send_replace(pause);
}
#[cfg(feature = "log")]
Expand All @@ -195,16 +199,20 @@ impl ReportCollector for TuiCollector {
}
}

elapsed = t - start;
current_tw = if auto_tw && !*self.pause.borrow() {
elapsed = clock.elapsed();
current_tw = if auto_tw {
*TimeWindow::variants().iter().rfind(|&&ts| elapsed > ts.into()).unwrap_or(&TimeWindow::Second)
} else {
current_tw
};
break;
}
t = latest_iters_timer.tick() => {
latest_iters.rotate(t);
_ = latest_stats_ticker.tick() => {
latest_stats.rotate(&stats);
continue;
}
_ = latest_iters_ticker.tick() => {
latest_iters.rotate();
continue;
}
r = self.res_rx.recv() => match r {
Expand Down Expand Up @@ -271,7 +279,7 @@ impl ReportCollector for TuiCollector {
})?;
}

let elapsed = start.elapsed();
let elapsed = clock.elapsed();
let concurrency = self.bench_opts.concurrency;
Ok(BenchReport { concurrency, hist, stats, status_dist, error_dist, elapsed })
}
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
//! Stateful bench is also supported, see the [examples/http_reqwest](https://github.com/wfxr/rlt/blob/main/examples/http_reqwest.rs).
#![deny(missing_docs)]

mod clock;
mod duration;
mod histogram;
mod report;
Expand Down
26 changes: 10 additions & 16 deletions src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,17 @@ use tokio::{
select,
sync::{mpsc, watch},
task::JoinSet,
time::{sleep_until, Instant, MissedTickBehavior},
time::MissedTickBehavior,
};
use tokio_util::sync::CancellationToken;

use crate::report::IterReport;
use crate::{clock::Clock, report::IterReport};

/// Core options for the benchmark runner.
#[derive(Copy, Clone, Debug)]
#[derive(Clone, Debug)]
pub struct BenchOpts {
/// Start time of the benchmark.
pub start: Instant,
pub clock: Clock,

/// Number of concurrent workers.
pub concurrency: u32,
Expand All @@ -37,12 +37,6 @@ pub struct BenchOpts {
pub rate: Option<u32>,
}

impl BenchOpts {
pub(crate) fn endtime(&self) -> Option<Instant> {
self.duration.map(|d| self.start + d)
}
}

/// A trait for benchmark suites.
#[async_trait]
pub trait BenchSuite: Clone {
Expand Down Expand Up @@ -164,7 +158,6 @@ where
async fn bench(self) -> Result<()> {
let concurrency = self.opts.concurrency;
let iterations = self.opts.iterations;
let endtime = self.opts.endtime();

let mut set: JoinSet<Result<()>> = JoinSet::new();
for worker in 0..concurrency {
Expand Down Expand Up @@ -194,10 +187,10 @@ where
});
}

if let Some(t) = endtime {
if let Some(t) = self.opts.duration {
select! {
_ = self.cancel.cancelled() => (),
_ = sleep_until(t) => self.cancel.cancel(),
_ = self.opts.clock.sleep(t) => self.cancel.cancel(),
_ = join_all(&mut set) => (),
}
};
Expand All @@ -209,7 +202,8 @@ where
async fn bench_with_rate(self, rate: u32) -> Result<()> {
let concurrency = self.opts.concurrency;
let iterations = self.opts.iterations;
let endtime = self.opts.endtime();
let clock = self.opts.clock.clone();
let duration = self.opts.duration;
let (tx, rx) = flume::bounded(self.opts.concurrency as usize);

let b = self.clone();
Expand All @@ -218,14 +212,14 @@ where
timer.set_missed_tick_behavior(MissedTickBehavior::Burst);
let mut iter = 0;
loop {
let t = timer.tick().await;
timer.tick().await;
if b.paused() {
match b.cancel.is_cancelled() {
false => continue,
true => break,
}
}
if matches!(endtime, Some(endtime) if t >= endtime) {
if matches!(duration, Some(duration) if clock.elapsed() >= duration) {
break;
}
if matches!(iterations, Some(iterations) if iter >= iterations) {
Expand Down
Loading
Loading