
Commit

Merge 60b1a5a into b439dc9
xnuter committed Mar 21, 2022
2 parents b439dc9 + 60b1a5a commit 812db6c
Showing 6 changed files with 132 additions and 15 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "perf-gauge"
version = "0.1.13"
version = "0.1.14"
authors = ["Eugene Retunsky"]
license = "MIT OR Apache-2.0"
edition = "2018"
6 changes: 6 additions & 0 deletions README.md
@@ -118,6 +118,10 @@ OPTIONS:
--rate_step <RATE_STEP>
Rate increase step (until it reaches --rate_max)
--request_timeout <REQUEST_TIMEOUT>
Timeout of a single request. E.g. "--request_timeout 30s". Timeouts are treated as fatal
errors
-V, --version
Print version information
@@ -185,6 +189,7 @@ E.g. increase RPS each minute by 1,000:
export PROMETHEUS_HOST=10.138.0.2

$ perf-gauge --concurrency 10 \
--request_timeout 30s \
--rate 1000 --rate_step 1000 --rate_max 25000 \
--max_iter 15 \
--duration 1m \
@@ -194,6 +199,7 @@ $ perf-gauge --concurrency 10 \
```

* `--concurrency 10` - the number of clients generating load concurrently
* `--request_timeout 30s` - do not wait for a response longer than 30 seconds; timeouts are treated as fatal errors and stop execution.
* `--rate 1000 --rate_step 1000 --rate_max 25000` - start at 1000 rps, then add 1000 rps after each step until the rate reaches 25,000 (see the sketch after this list).
* `--duration 1m` - step duration `1m`
* `--max_iter 15` - perform `15` iterations at the max rate
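
To make the ladder described by those flags concrete, here is a tiny illustrative sketch (not perf-gauge code; the variable names are made up) of the schedule they produce, assuming `--max_iter` counts the iterations run once the top rate is reached:

```rust
fn main() {
    // Values taken from the example invocation above.
    let (rate, rate_step, rate_max, max_iter) = (1_000u32, 1_000u32, 25_000u32, 15usize);

    // Ascending part of the ladder: 1000, 2000, ..., 25000 rps, one step per --duration.
    let mut schedule: Vec<u32> = (rate..=rate_max).step_by(rate_step as usize).collect();

    // Once the max rate is reached, it is repeated for --max_iter iterations.
    schedule.extend(std::iter::repeat(rate_max).take(max_iter));

    println!("{} steps in total, peaking at {} rps", schedule.len(), rate_max);
}
```
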
119 changes: 107 additions & 12 deletions src/bench_run.rs
@@ -9,16 +9,19 @@ use crate::rate_limiter::RateLimiter;
/// except according to those terms.
use async_trait::async_trait;
use log::error;
use std::future::Future;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::{Duration, Instant};
use tokio::sync::mpsc::Sender;
use tokio::time::timeout;

static STOP_ON_FATAL: AtomicBool = AtomicBool::new(false);

#[derive(Clone, Debug)]
pub struct BenchRun {
pub index: usize,
bench_begin: Instant,
timeout: Option<Duration>,
requests_sent: usize,
max_requests: Option<usize>,
max_duration: Option<Duration>,
@@ -38,23 +41,26 @@ impl BenchRun {
index: usize,
max_requests: usize,
rate_limiter: RateLimiter,
timeout: Option<Duration>,
) -> Self {
Self::new(index, Some(max_requests), None, rate_limiter)
Self::new(index, Some(max_requests), None, rate_limiter, timeout)
}

pub fn from_duration_limit(
index: usize,
max_duration: Duration,
rate_limiter: RateLimiter,
timeout: Option<Duration>,
) -> Self {
Self::new(index, None, Some(max_duration), rate_limiter)
Self::new(index, None, Some(max_duration), rate_limiter, timeout)
}

fn new(
index: usize,
max_requests: Option<usize>,
max_duration: Option<Duration>,
rate_limiter: RateLimiter,
timeout: Option<Duration>,
) -> Self {
assert!(
max_duration.is_some() || max_requests.is_some(),
@@ -64,6 +70,7 @@ impl BenchRun {
Self {
index,
bench_begin: Instant::now(),
timeout,
requests_sent: 0,
max_requests,
max_duration,
@@ -102,15 +109,23 @@ impl BenchRun {
break;
}

let request_stats = bench_protocol_adapter.send_request(&client).await;
let fatal_error = request_stats.fatal_error;

metrics_channel
.try_send(request_stats)
.map_err(|e| {
error!("Error sending metrics: {}", e);
})
.unwrap_or_default();
let timed_request = self
.timed_operation(bench_protocol_adapter.send_request(&client))
.await;

let fatal_error = match timed_request {
Ok(request_stats) => {
let failed = request_stats.fatal_error;
metrics_channel
.try_send(request_stats)
.map_err(|e| {
error!("Error sending metrics: {}", e);
})
.unwrap_or_default();
failed
}
Err(_) => true,
};

if fatal_error {
STOP_ON_FATAL.store(true, Ordering::Relaxed);
@@ -120,6 +135,23 @@

Ok(())
}
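
The loop above coordinates shutdown across concurrent clients through the `STOP_ON_FATAL` atomic: the first worker that hits a fatal error (a timeout included) flips the flag, and the other workers observe it and stop sending. A minimal sketch of that shared-flag pattern, detached from perf-gauge's types (the worker count and loop bodies are invented for illustration):

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;

// Hypothetical stand-in for perf-gauge's STOP_ON_FATAL flag.
static STOP: AtomicBool = AtomicBool::new(false);

#[tokio::main]
async fn main() {
    let workers: Vec<_> = (0..4)
        .map(|idx| {
            tokio::spawn(async move {
                let mut sent = 0usize;
                // Keep sending until a fatal error is observed anywhere.
                while !STOP.load(Ordering::Relaxed) {
                    sent += 1;
                    // Simulate a fatal error (e.g. a timeout) on one worker.
                    if idx == 0 && sent == 10 {
                        STOP.store(true, Ordering::Relaxed);
                    }
                    tokio::time::sleep(Duration::from_millis(1)).await;
                }
                sent
            })
        })
        .collect();

    for w in workers {
        println!("worker sent {} requests before stopping", w.await.unwrap());
    }
}
```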

/// Each async operation must be time-bound.
pub async fn timed_operation<T: Future>(&self, f: T) -> Result<<T as Future>::Output, ()> {
if let Some(timeout_value) = self.timeout {
let result = timeout(timeout_value, f).await;

if let Ok(r) = result {
println!("No timeout: {:?}", timeout_value);
Ok(r)
} else {
println!("Timeout: {:?}", timeout_value);
Err(())
}
} else {
Ok(f.await)
}
}
}
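
`timed_operation` wraps the request future in `tokio::time::timeout` whenever a timeout is configured, so a slow request surfaces as an `Err` that the caller treats as fatal. A minimal, self-contained sketch of that behavior (the 50 ms budget and the dummy operations are invented for illustration):

```rust
use std::time::Duration;
use tokio::time::{sleep, timeout};

#[tokio::main]
async fn main() {
    let budget = Duration::from_millis(50);

    // Completes within the budget: timeout yields Ok with the inner value.
    let fast = timeout(budget, async { 42u32 }).await;
    assert_eq!(fast.unwrap(), 42);

    // Exceeds the budget: timeout yields Err(Elapsed), which the bench loop
    // maps to a fatal error.
    let slow = timeout(budget, sleep(Duration::from_secs(1))).await;
    assert!(slow.is_err());
}
```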

#[cfg(test)]
@@ -134,7 +166,8 @@ mod tests {
use crate::metrics::BenchRunMetrics;
use mockito::mock;
use std::sync::atomic::Ordering;
use std::time::Instant;
use std::thread::sleep;
use std::time::{Duration, Instant};

#[tokio::test]
async fn test_send_load() {
@@ -179,6 +212,7 @@ mod tests {
.expect("RateLadderBuilder failed"),
)
.mode(Http(http_adapter.clone()))
.request_timeout(None)
.build()
.expect("BenchmarkConfig failed");

@@ -259,6 +293,7 @@ mod tests {
.build()
.expect("RateLadderBuilder failed"),
)
.request_timeout(None)
.mode(Http(http_adapter.clone()))
.build()
.expect("BenchmarkConfig failed");
@@ -279,4 +314,64 @@ mod tests {
assert!(STOP_ON_FATAL.load(Ordering::Relaxed));
assert!(bench_result.is_ok());
}

#[tokio::test]
async fn test_send_load_with_timeout() {
let request_count = 100;

let _m = mock("GET", "/1")
.with_status(200)
.with_body_from_fn(|_| {
sleep(Duration::from_secs(10));
Ok(())
})
.with_header("content-type", "text/plain")
.expect(request_count)
.create();

let url = mockito::server_url().to_string();
println!("Url: {}", url);
let http_adapter = HttpBenchAdapterBuilder::default()
.request(
HttpRequestBuilder::default()
.url(vec![format!("{}/1", url)])
.build()
.unwrap(),
)
.config(HttpClientConfigBuilder::default().build().unwrap())
.build()
.unwrap();

let benchmark_config: BenchmarkConfig = BenchmarkConfigBuilder::default()
.rate_ladder(
RateLadderBuilder::default()
.start(request_count as f64)
.end(request_count as f64)
.rate_increment(None)
.step_duration(None)
.step_requests(Some(request_count))
.build()
.expect("RateLadderBuilder failed"),
)
.request_timeout(Some(Duration::from_millis(10)))
.mode(Http(http_adapter.clone()))
.build()
.expect("BenchmarkConfig failed");

let mut session = benchmark_config.clone().new_bench_session();

let bench_run_stats = BenchRunMetrics::new();

STOP_ON_FATAL.store(false, Ordering::Relaxed);

let bench_result = session
.next()
.expect("Must have runs")
.run(bench_run_stats)
.await;

// must stop on timeout, treated as fatal
assert!(STOP_ON_FATAL.load(Ordering::Relaxed));
assert!(bench_result.is_ok());
}
}
3 changes: 3 additions & 0 deletions src/bench_session.rs
@@ -25,6 +25,7 @@ pub struct BenchSession {
mode: Arc<BenchmarkMode>,
#[builder(setter(skip))]
current_iteration: usize,
request_timeout: Option<Duration>,
}

pub struct BenchBatch {
@@ -68,12 +69,14 @@ impl Iterator for BenchSession {
idx,
requests,
RateLimiter::build_rate_limiter(rate_per_second),
self.request_timeout,
)
} else if let Some(duration) = self.rate_ladder.step_duration {
BenchRun::from_duration_limit(
idx,
duration,
RateLimiter::build_rate_limiter(rate_per_second),
self.request_timeout,
)
} else {
unreachable!();
13 changes: 13 additions & 0 deletions src/configuration.rs
@@ -20,6 +20,7 @@ use std::fs::File;
use std::io::Read;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use tokio::io;

#[derive(Clone)]
@@ -39,6 +40,7 @@ pub struct BenchmarkConfig {
pub concurrency: usize,
pub rate_ladder: RateLadder,
pub mode: BenchmarkMode,
request_timeout: Option<Duration>,
#[builder(default)]
pub reporters: Vec<Arc<dyn ExternalMetricsServiceReporter + Send + Sync + 'static>>,
}
@@ -74,6 +76,9 @@ struct Cli {
/// If it's a part of a continuous run. In this case metrics are not reset at the end to avoid saw-like plots.
#[clap(long)]
continuous: bool,
/// Timeout of a single request. E.g. "--request_timeout 30s". Timeouts are treated as fatal errors.
#[clap(long = "request_timeout")]
request_timeout: Option<String>,
/// If you'd like to send metrics to Prometheus PushGateway, specify the server URL. E.g. 10.0.0.1:9091
#[clap(long)]
prometheus: Option<String>,
@@ -137,6 +142,12 @@ impl BenchmarkConfig {
.into()
});

let request_timeout = cli.request_timeout.as_ref().map(|d| {
humantime::Duration::from_str(d.as_str())
.expect("Illegal duration")
.into()
});

let number_of_requests = cli.num_req;

if duration.is_none() && number_of_requests.is_none() {
@@ -175,6 +186,7 @@
.concurrency(concurrency)
.verbose(false)
.continuous(cli.continuous)
.request_timeout(request_timeout)
.mode(BenchmarkConfig::build_mode(&cli))
.reporters(BenchmarkConfig::build_metric_destinations(
cli.name.clone(),
@@ -330,6 +342,7 @@ impl BenchmarkConfig {
.concurrency(self.concurrency)
.rate_ladder(self.rate_ladder.clone())
.mode(Arc::new(self.mode.clone()))
.request_timeout(self.request_timeout)
.build()
.expect("BenchSessionBuilder failed")
}
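
The `configuration.rs` hunk above parses the `--request_timeout` value with `humantime` before handing it to the builder as an `Option<Duration>`. A minimal sketch of that parsing, assuming the same `humantime` crate the project already depends on:

```rust
use std::time::Duration;

fn main() {
    // "30s", "500ms", "2m" and similar human-readable forms are accepted.
    let timeout: Duration = humantime::parse_duration("30s").expect("Illegal duration");
    assert_eq!(timeout, Duration::from_secs(30));
}
```
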
4 changes: 2 additions & 2 deletions src/metrics.rs
@@ -515,7 +515,7 @@ mod tests {
fn test_has_more_work_request_limit() {
let requests = 10;
let mut metrics =
BenchRun::from_request_limit(0, requests, RateLimiter::build_rate_limiter(0.));
BenchRun::from_request_limit(0, requests, RateLimiter::build_rate_limiter(0.), None);
for _ in 0..requests {
assert!(metrics.has_more_work());
}
@@ -526,7 +526,7 @@
fn test_has_more_work_time_limit() {
let duration = Duration::from_secs(1);
let mut metrics =
BenchRun::from_duration_limit(0, duration, RateLimiter::build_rate_limiter(0.));
BenchRun::from_duration_limit(0, duration, RateLimiter::build_rate_limiter(0.), None);
for _ in 0..1000 {
assert!(metrics.has_more_work());
}
