Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Request timeouts. #15

Merged
merged 1 commit into from
Mar 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "perf-gauge"
version = "0.1.13"
version = "0.1.14"
authors = ["Eugene Retunsky"]
license = "MIT OR Apache-2.0"
edition = "2018"
Expand Down
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,10 @@ OPTIONS:

--rate_step <RATE_STEP>
Rate increase step (until it reaches --rate_max)

--request_timeout <REQUEST_TIMEOUT>
Timeout of a single request. E.g. "--request_timeout 30s". Timeouts are treated as fatal
errors

-V, --version
Print version information
Expand Down Expand Up @@ -185,6 +189,7 @@ E.g. increase RPS each minute by 1,000:
export PROMETHEUS_HOST=10.138.0.2

$ perf-gauge --concurrency 10 \
--request_timeout 30s \
--rate 1000 --rate_step 1000 --rate_max 25000 \
--max_iter 15 \
--duration 1m \
Expand All @@ -194,6 +199,7 @@ $ perf-gauge --concurrency 10 \
```

* `--concurrency 10` - the number of clients generating load concurrently
* `--request_timeout 30s` - do not wait for response longer than 30 seconds and stop execution on timeouts.
* `--rate 1000 --rate_step 1000 --rate_max 25000` - start with rate 1000 rps, then add 1000 rps after each step until it reaches 25k.
* `--duration 1m` - step duration `1m`
* `--max_iter 15` - perform `15` iterations at the max rate
Expand Down
119 changes: 107 additions & 12 deletions src/bench_run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,19 @@ use crate::rate_limiter::RateLimiter;
/// except according to those terms.
use async_trait::async_trait;
use log::error;
use std::future::Future;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::{Duration, Instant};
use tokio::sync::mpsc::Sender;
use tokio::time::timeout;

static STOP_ON_FATAL: AtomicBool = AtomicBool::new(false);

#[derive(Clone, Debug)]
pub struct BenchRun {
pub index: usize,
bench_begin: Instant,
timeout: Option<Duration>,
requests_sent: usize,
max_requests: Option<usize>,
max_duration: Option<Duration>,
Expand All @@ -38,23 +41,26 @@ impl BenchRun {
index: usize,
max_requests: usize,
rate_limiter: RateLimiter,
timeout: Option<Duration>,
) -> Self {
Self::new(index, Some(max_requests), None, rate_limiter)
Self::new(index, Some(max_requests), None, rate_limiter, timeout)
}

pub fn from_duration_limit(
index: usize,
max_duration: Duration,
rate_limiter: RateLimiter,
timeout: Option<Duration>,
) -> Self {
Self::new(index, None, Some(max_duration), rate_limiter)
Self::new(index, None, Some(max_duration), rate_limiter, timeout)
}

fn new(
index: usize,
max_requests: Option<usize>,
max_duration: Option<Duration>,
rate_limiter: RateLimiter,
timeout: Option<Duration>,
) -> Self {
assert!(
max_duration.is_some() || max_requests.is_some(),
Expand All @@ -64,6 +70,7 @@ impl BenchRun {
Self {
index,
bench_begin: Instant::now(),
timeout,
requests_sent: 0,
max_requests,
max_duration,
Expand Down Expand Up @@ -102,15 +109,23 @@ impl BenchRun {
break;
}

let request_stats = bench_protocol_adapter.send_request(&client).await;
let fatal_error = request_stats.fatal_error;

metrics_channel
.try_send(request_stats)
.map_err(|e| {
error!("Error sending metrics: {}", e);
})
.unwrap_or_default();
let timed_request = self
.timed_operation(bench_protocol_adapter.send_request(&client))
.await;

let fatal_error = match timed_request {
Ok(request_stats) => {
let failed = request_stats.fatal_error;
metrics_channel
.try_send(request_stats)
.map_err(|e| {
error!("Error sending metrics: {}", e);
})
.unwrap_or_default();
failed
}
Err(_) => true,
};

if fatal_error {
STOP_ON_FATAL.store(true, Ordering::Relaxed);
Expand All @@ -120,6 +135,23 @@ impl BenchRun {

Ok(())
}

/// Each async operation must be time-bound.
pub async fn timed_operation<T: Future>(&self, f: T) -> Result<<T as Future>::Output, ()> {
if let Some(timeout_value) = self.timeout {
let result = timeout(timeout_value, f).await;

if let Ok(r) = result {
println!("No timeout: {:?}", timeout_value);
Ok(r)
} else {
println!("Timeout: {:?}", timeout_value);
Err(())
}
} else {
return Ok(f.await);
}
}
}

#[cfg(test)]
Expand All @@ -134,7 +166,8 @@ mod tests {
use crate::metrics::BenchRunMetrics;
use mockito::mock;
use std::sync::atomic::Ordering;
use std::time::Instant;
use std::thread::sleep;
use std::time::{Duration, Instant};

#[tokio::test]
async fn test_send_load() {
Expand Down Expand Up @@ -179,6 +212,7 @@ mod tests {
.expect("RateLadderBuilder failed"),
)
.mode(Http(http_adapter.clone()))
.request_timeout(None)
.build()
.expect("BenchmarkConfig failed");

Expand Down Expand Up @@ -259,6 +293,7 @@ mod tests {
.build()
.expect("RateLadderBuilder failed"),
)
.request_timeout(None)
.mode(Http(http_adapter.clone()))
.build()
.expect("BenchmarkConfig failed");
Expand All @@ -279,4 +314,64 @@ mod tests {
assert!(STOP_ON_FATAL.load(Ordering::Relaxed));
assert!(bench_result.is_ok());
}

#[tokio::test]
async fn test_send_load_with_timeout() {
let request_count = 100;

let _m = mock("GET", "/1")
.with_status(200)
.with_body_from_fn(|_| {
sleep(Duration::from_secs(10));
Ok(())
})
.with_header("content-type", "text/plain")
.expect(request_count)
.create();

let url = mockito::server_url().to_string();
println!("Url: {}", url);
let http_adapter = HttpBenchAdapterBuilder::default()
.request(
HttpRequestBuilder::default()
.url(vec![format!("{}/1", url)])
.build()
.unwrap(),
)
.config(HttpClientConfigBuilder::default().build().unwrap())
.build()
.unwrap();

let benchmark_config: BenchmarkConfig = BenchmarkConfigBuilder::default()
.rate_ladder(
RateLadderBuilder::default()
.start(request_count as f64)
.end(request_count as f64)
.rate_increment(None)
.step_duration(None)
.step_requests(Some(request_count))
.build()
.expect("RateLadderBuilder failed"),
)
.request_timeout(Some(Duration::from_millis(10)))
.mode(Http(http_adapter.clone()))
.build()
.expect("BenchmarkConfig failed");

let mut session = benchmark_config.clone().new_bench_session();

let bench_run_stats = BenchRunMetrics::new();

STOP_ON_FATAL.store(false, Ordering::Relaxed);

let bench_result = session
.next()
.expect("Must have runs")
.run(bench_run_stats)
.await;

// must stop on timeout, treated as fatal
assert!(STOP_ON_FATAL.load(Ordering::Relaxed));
assert!(bench_result.is_ok());
}
}
3 changes: 3 additions & 0 deletions src/bench_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub struct BenchSession {
mode: Arc<BenchmarkMode>,
#[builder(setter(skip))]
current_iteration: usize,
request_timeout: Option<Duration>,
}

pub struct BenchBatch {
Expand Down Expand Up @@ -68,12 +69,14 @@ impl Iterator for BenchSession {
idx,
requests,
RateLimiter::build_rate_limiter(rate_per_second),
self.request_timeout,
)
} else if let Some(duration) = self.rate_ladder.step_duration {
BenchRun::from_duration_limit(
idx,
duration,
RateLimiter::build_rate_limiter(rate_per_second),
self.request_timeout,
)
} else {
unreachable!();
Expand Down
13 changes: 13 additions & 0 deletions src/configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::fs::File;
use std::io::Read;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use tokio::io;

#[derive(Clone)]
Expand All @@ -39,6 +40,7 @@ pub struct BenchmarkConfig {
pub concurrency: usize,
pub rate_ladder: RateLadder,
pub mode: BenchmarkMode,
request_timeout: Option<Duration>,
#[builder(default)]
pub reporters: Vec<Arc<dyn ExternalMetricsServiceReporter + Send + Sync + 'static>>,
}
Expand Down Expand Up @@ -74,6 +76,9 @@ struct Cli {
/// If it's a part of a continuous run. In this case metrics are not reset at the end to avoid saw-like plots.
#[clap(long)]
continuous: bool,
/// Timeout of a single request. E.g. "--request_timeout 30s". Timeouts are treated as fatal errors.
#[clap(long = "request_timeout")]
request_timeout: Option<String>,
/// If you'd like to send metrics to Prometheus PushGateway, specify the server URL. E.g. 10.0.0.1:9091
#[clap(long)]
prometheus: Option<String>,
Expand Down Expand Up @@ -137,6 +142,12 @@ impl BenchmarkConfig {
.into()
});

let request_timeout = cli.request_timeout.as_ref().map(|d| {
humantime::Duration::from_str(d.as_str())
.expect("Illegal duration")
.into()
});

let number_of_requests = cli.num_req;

if duration.is_none() && number_of_requests.is_none() {
Expand Down Expand Up @@ -175,6 +186,7 @@ impl BenchmarkConfig {
.concurrency(concurrency)
.verbose(false)
.continuous(cli.continuous)
.request_timeout(request_timeout)
.mode(BenchmarkConfig::build_mode(&cli))
.reporters(BenchmarkConfig::build_metric_destinations(
cli.name.clone(),
Expand Down Expand Up @@ -330,6 +342,7 @@ impl BenchmarkConfig {
.concurrency(self.concurrency)
.rate_ladder(self.rate_ladder.clone())
.mode(Arc::new(self.mode.clone()))
.request_timeout(self.request_timeout)
.build()
.expect("BenchSessionBuilder failed")
}
Expand Down
4 changes: 2 additions & 2 deletions src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,7 @@ mod tests {
fn test_has_more_work_request_limit() {
let requests = 10;
let mut metrics =
BenchRun::from_request_limit(0, requests, RateLimiter::build_rate_limiter(0.));
BenchRun::from_request_limit(0, requests, RateLimiter::build_rate_limiter(0.), None);
for _ in 0..requests {
assert!(metrics.has_more_work());
}
Expand All @@ -526,7 +526,7 @@ mod tests {
fn test_has_more_work_time_limit() {
let duration = Duration::from_secs(1);
let mut metrics =
BenchRun::from_duration_limit(0, duration, RateLimiter::build_rate_limiter(0.));
BenchRun::from_duration_limit(0, duration, RateLimiter::build_rate_limiter(0.), None);
for _ in 0..1000 {
assert!(metrics.has_more_work());
}
Expand Down