diff --git a/Cargo.toml b/Cargo.toml
index 9f0ea72..4ca0e03 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "perf-gauge"
-version = "0.1.13"
+version = "0.1.14"
 authors = ["Eugene Retunsky"]
 license = "MIT OR Apache-2.0"
 edition = "2018"
diff --git a/README.md b/README.md
index 0f3792d..db841d7 100644
--- a/README.md
+++ b/README.md
@@ -118,6 +118,10 @@ OPTIONS:
         --rate_step <rate-step>
             Rate increase step (until it reaches --rate_max)
+
+        --request_timeout <request-timeout>
+            Timeout of a single request. E.g. "--request_timeout 30s". Timeouts are treated as fatal
+            errors
 
     -V, --version
             Print version information
 
@@ -185,6 +189,7 @@ E.g. increase RPS each minute by 1,000:
 export PROMETHEUS_HOST=10.138.0.2
 
 $ perf-gauge --concurrency 10 \
+             --request_timeout 30s \
              --rate 1000 --rate_step 1000 --rate_max 25000 \
              --max_iter 15 \
              --duration 1m \
@@ -194,6 +199,7 @@ $ perf-gauge --concurrency 10 \
 ```
 
 * `--concurrency 10` - the number of clients generating load concurrently
+* `--request_timeout 30s` - do not wait for a response longer than 30 seconds and stop execution on timeouts.
 * `--rate 1000 --rate_step 1000 --rate_max 25000` - start with rate 1000 rps, then add 1000 rps after each step until it reaches 25k.
 * `--duration 1m` - step duration `1m`
 * `--max_iter 15` - perform `15` iterations at the max rate
diff --git a/src/bench_run.rs b/src/bench_run.rs
index e4c48c9..e75dc9b 100644
--- a/src/bench_run.rs
+++ b/src/bench_run.rs
@@ -9,9 +9,11 @@ use crate::rate_limiter::RateLimiter;
 /// except according to those terms.
 use async_trait::async_trait;
 use log::error;
+use std::future::Future;
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::time::{Duration, Instant};
 use tokio::sync::mpsc::Sender;
+use tokio::time::timeout;
 
 static STOP_ON_FATAL: AtomicBool = AtomicBool::new(false);
 
@@ -19,6 +21,7 @@ static STOP_ON_FATAL: AtomicBool = AtomicBool::new(false);
 pub struct BenchRun {
     pub index: usize,
     bench_begin: Instant,
+    timeout: Option<Duration>,
     requests_sent: usize,
     max_requests: Option<usize>,
     max_duration: Option<Duration>,
@@ -38,16 +41,18 @@ impl BenchRun {
         index: usize,
         max_requests: usize,
         rate_limiter: RateLimiter,
+        timeout: Option<Duration>,
     ) -> Self {
-        Self::new(index, Some(max_requests), None, rate_limiter)
+        Self::new(index, Some(max_requests), None, rate_limiter, timeout)
     }
 
     pub fn from_duration_limit(
         index: usize,
         max_duration: Duration,
         rate_limiter: RateLimiter,
+        timeout: Option<Duration>,
     ) -> Self {
-        Self::new(index, None, Some(max_duration), rate_limiter)
+        Self::new(index, None, Some(max_duration), rate_limiter, timeout)
     }
 
     fn new(
@@ -55,6 +60,7 @@ impl BenchRun {
         max_requests: Option<usize>,
         max_duration: Option<Duration>,
         rate_limiter: RateLimiter,
+        timeout: Option<Duration>,
     ) -> Self {
         assert!(
             max_duration.is_some() || max_requests.is_some(),
@@ -64,6 +70,7 @@ impl BenchRun {
         Self {
             index,
             bench_begin: Instant::now(),
+            timeout,
             requests_sent: 0,
             max_requests,
             max_duration,
@@ -102,15 +109,23 @@ impl BenchRun {
                 break;
             }
 
-            let request_stats = bench_protocol_adapter.send_request(&client).await;
-            let fatal_error = request_stats.fatal_error;
-
-            metrics_channel
-                .try_send(request_stats)
-                .map_err(|e| {
-                    error!("Error sending metrics: {}", e);
-                })
-                .unwrap_or_default();
+            let timed_request = self
+                .timed_operation(bench_protocol_adapter.send_request(&client))
+                .await;
+
+            let fatal_error = match timed_request {
+                Ok(request_stats) => {
+                    let failed = request_stats.fatal_error;
+                    metrics_channel
+                        .try_send(request_stats)
+                        .map_err(|e| {
+                            error!("Error sending metrics: {}", e);
+                        })
+                        .unwrap_or_default();
+                    failed
+                }
+                Err(_) => true,
+            };
 
             if fatal_error {
                 STOP_ON_FATAL.store(true, Ordering::Relaxed);
@@ -120,6 +135,23 @@ impl BenchRun {
 
         Ok(())
     }
+
+    /// Each async operation must be time-bound.
+    pub async fn timed_operation<T: Future>(&self, f: T) -> Result<<T as Future>::Output, ()> {
+        if let Some(timeout_value) = self.timeout {
+            let result = timeout(timeout_value, f).await;
+
+            if let Ok(r) = result {
+                println!("No timeout: {:?}", timeout_value);
+                Ok(r)
+            } else {
+                println!("Timeout: {:?}", timeout_value);
+                Err(())
+            }
+        } else {
+            return Ok(f.await);
+        }
+    }
 }
 
 #[cfg(test)]
@@ -134,7 +166,8 @@ mod tests {
     use crate::metrics::BenchRunMetrics;
     use mockito::mock;
     use std::sync::atomic::Ordering;
-    use std::time::Instant;
+    use std::thread::sleep;
+    use std::time::{Duration, Instant};
 
     #[tokio::test]
     async fn test_send_load() {
@@ -179,6 +212,7 @@ mod tests {
                 .expect("RateLadderBuilder failed"),
             )
             .mode(Http(http_adapter.clone()))
+            .request_timeout(None)
             .build()
             .expect("BenchmarkConfig failed");
 
@@ -259,6 +293,7 @@ mod tests {
                 .build()
                 .expect("RateLadderBuilder failed"),
             )
+            .request_timeout(None)
             .mode(Http(http_adapter.clone()))
             .build()
             .expect("BenchmarkConfig failed");
@@ -279,4 +314,64 @@ mod tests {
         assert!(STOP_ON_FATAL.load(Ordering::Relaxed));
         assert!(bench_result.is_ok());
     }
+
+    #[tokio::test]
+    async fn test_send_load_with_timeout() {
+        let request_count = 100;
+
+        let _m = mock("GET", "/1")
+            .with_status(200)
+            .with_body_from_fn(|_| {
+                sleep(Duration::from_secs(10));
+                Ok(())
+            })
+            .with_header("content-type", "text/plain")
+            .expect(request_count)
+            .create();
+
+        let url = mockito::server_url().to_string();
+        println!("Url: {}", url);
+        let http_adapter = HttpBenchAdapterBuilder::default()
+            .request(
+                HttpRequestBuilder::default()
+                    .url(vec![format!("{}/1", url)])
+                    .build()
+                    .unwrap(),
+            )
+            .config(HttpClientConfigBuilder::default().build().unwrap())
+            .build()
+            .unwrap();
+
+        let benchmark_config: BenchmarkConfig = BenchmarkConfigBuilder::default()
+            .rate_ladder(
+                RateLadderBuilder::default()
+                    .start(request_count as f64)
+                    .end(request_count as f64)
+                    .rate_increment(None)
+                    .step_duration(None)
+                    .step_requests(Some(request_count))
+                    .build()
+                    .expect("RateLadderBuilder failed"),
+            )
+            .request_timeout(Some(Duration::from_millis(10)))
+            .mode(Http(http_adapter.clone()))
+            .build()
+            .expect("BenchmarkConfig failed");
+
+        let mut session = benchmark_config.clone().new_bench_session();
+
+        let bench_run_stats = BenchRunMetrics::new();
+
+        STOP_ON_FATAL.store(false, Ordering::Relaxed);
+
+        let bench_result = session
+            .next()
+            .expect("Must have runs")
+            .run(bench_run_stats)
+            .await;
+
+        // must stop on timeout, treated as fatal
+        assert!(STOP_ON_FATAL.load(Ordering::Relaxed));
+        assert!(bench_result.is_ok());
+    }
 }
diff --git a/src/bench_session.rs b/src/bench_session.rs
index 317f217..7d6852c 100644
--- a/src/bench_session.rs
+++ b/src/bench_session.rs
@@ -25,6 +25,7 @@ pub struct BenchSession {
     mode: Arc<BenchmarkMode>,
     #[builder(setter(skip))]
     current_iteration: usize,
+    request_timeout: Option<Duration>,
 }
 
 pub struct BenchBatch {
@@ -68,12 +69,14 @@ impl Iterator for BenchSession {
                 idx,
                 requests,
                 RateLimiter::build_rate_limiter(rate_per_second),
+                self.request_timeout,
             )
         } else if let Some(duration) = self.rate_ladder.step_duration {
             BenchRun::from_duration_limit(
                 idx,
                 duration,
                 RateLimiter::build_rate_limiter(rate_per_second),
+                self.request_timeout,
             )
         } else {
             unreachable!();
diff --git a/src/configuration.rs b/src/configuration.rs
index 5627bb2..67ca984 100644
--- a/src/configuration.rs
+++ b/src/configuration.rs
@@ -20,6 +20,7 @@ use std::fs::File;
 use std::io::Read;
 use std::str::FromStr;
 use std::sync::Arc;
+use std::time::Duration;
 use tokio::io;
 
 #[derive(Clone)]
@@ -39,6 +40,7 @@ pub struct BenchmarkConfig {
     pub concurrency: usize,
     pub rate_ladder: RateLadder,
     pub mode: BenchmarkMode,
+    request_timeout: Option<Duration>,
     #[builder(default)]
     pub reporters: Vec<Arc<dyn ExternalMetricsServiceReporter + Send + Sync>>,
 }
@@ -74,6 +76,9 @@ struct Cli {
     /// If it's a part of a continuous run. In this case metrics are not reset at the end to avoid saw-like plots.
     #[clap(long)]
     continuous: bool,
+    /// Timeout of a single request. E.g. "--request_timeout 30s". Timeouts are treated as fatal errors.
+    #[clap(long = "request_timeout")]
+    request_timeout: Option<String>,
     /// If you'd like to send metrics to Prometheus PushGateway, specify the server URL. E.g. 10.0.0.1:9091
     #[clap(long)]
     prometheus: Option<String>,
@@ -137,6 +142,12 @@ impl BenchmarkConfig {
                 .into()
         });
 
+        let request_timeout = cli.request_timeout.as_ref().map(|d| {
+            humantime::Duration::from_str(d.as_str())
+                .expect("Illegal duration")
+                .into()
+        });
+
         let number_of_requests = cli.num_req;
 
         if duration.is_none() && number_of_requests.is_none() {
@@ -175,6 +186,7 @@ impl BenchmarkConfig {
             .concurrency(concurrency)
             .verbose(false)
             .continuous(cli.continuous)
+            .request_timeout(request_timeout)
             .mode(BenchmarkConfig::build_mode(&cli))
             .reporters(BenchmarkConfig::build_metric_destinations(
                 cli.name.clone(),
@@ -330,6 +342,7 @@ impl BenchmarkConfig {
             .concurrency(self.concurrency)
             .rate_ladder(self.rate_ladder.clone())
             .mode(Arc::new(self.mode.clone()))
+            .request_timeout(self.request_timeout)
             .build()
             .expect("BenchSessionBuilder failed")
     }
diff --git a/src/metrics.rs b/src/metrics.rs
index 0f98a53..f11b2a2 100644
--- a/src/metrics.rs
+++ b/src/metrics.rs
@@ -515,7 +515,7 @@ mod tests {
     fn test_has_more_work_request_limit() {
         let requests = 10;
         let mut metrics =
-            BenchRun::from_request_limit(0, requests, RateLimiter::build_rate_limiter(0.));
+            BenchRun::from_request_limit(0, requests, RateLimiter::build_rate_limiter(0.), None);
         for _ in 0..requests {
             assert!(metrics.has_more_work());
         }
@@ -526,7 +526,7 @@ mod tests {
     fn test_has_more_work_time_limit() {
         let duration = Duration::from_secs(1);
         let mut metrics =
-            BenchRun::from_duration_limit(0, duration, RateLimiter::build_rate_limiter(0.));
+            BenchRun::from_duration_limit(0, duration, RateLimiter::build_rate_limiter(0.), None);
         for _ in 0..1000 {
             assert!(metrics.has_more_work());
         }
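The `timed_operation` helper introduced in `src/bench_run.rs` is essentially a thin wrapper around `tokio::time::timeout`, with `None` meaning "no deadline". A minimal standalone sketch of the same pattern (the `time_bound` name and the `main` driver are hypothetical, not part of this diff; assumes the `tokio` runtime with the `time` feature enabled):

```rust
use std::future::Future;
use std::time::Duration;
use tokio::time::timeout;

// Hypothetical helper mirroring BenchRun::timed_operation:
// wrap any future with an optional deadline; `None` waits indefinitely.
async fn time_bound<F: Future>(limit: Option<Duration>, f: F) -> Result<F::Output, ()> {
    match limit {
        // `timeout` resolves to Err(Elapsed) if `f` does not complete in time.
        Some(deadline) => timeout(deadline, f).await.map_err(|_| ()),
        None => Ok(f.await),
    }
}

#[tokio::main]
async fn main() {
    // Completes well within the one-second budget.
    let ok = time_bound(Some(Duration::from_secs(1)), async { 42 }).await;
    assert_eq!(ok, Ok(42));

    // Sleeps past a 10 ms budget, so the wrapper reports a timeout.
    let timed_out = time_bound(
        Some(Duration::from_millis(10)),
        tokio::time::sleep(Duration::from_secs(5)),
    )
    .await;
    assert!(timed_out.is_err());
}
```

Mapping the elapsed timeout to `Err(())` keeps the caller's `match` simple: a timed-out request folds into the same fatal-error path that sets `STOP_ON_FATAL`.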
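On the configuration side, the flag's string value is converted with the `humantime` crate. A small sketch of that parsing, outside the PR (assumes `humantime` as a dependency; the assertions are illustrative):

```rust
use std::str::FromStr;
use std::time::Duration;

fn main() {
    // Mirrors the diff: parse a human-friendly duration such as "30s"
    // and convert it into a std::time::Duration via Into.
    let parsed: Duration = humantime::Duration::from_str("30s")
        .expect("Illegal duration")
        .into();
    assert_eq!(parsed, Duration::from_secs(30));

    // The same through humantime's convenience function.
    assert_eq!(
        humantime::parse_duration("2m 30s").unwrap(),
        Duration::from_secs(150)
    );
}
```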