diff --git a/Cargo.toml b/Cargo.toml index 9846db9..aefca74 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,7 +15,7 @@ Gauging performance of network services. Snapshot or continuous, supports Promet # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -clap = "2.33" +clap = { version = "3.1.6", features = ["derive"] } base64 = "0.13" derive_builder = "0.9" log = "0.4" diff --git a/src/bench_run.rs b/src/bench_run.rs index 933c393..e4c48c9 100644 --- a/src/bench_run.rs +++ b/src/bench_run.rs @@ -9,9 +9,12 @@ use crate::rate_limiter::RateLimiter; /// except according to those terms. use async_trait::async_trait; use log::error; +use std::sync::atomic::{AtomicBool, Ordering}; use std::time::{Duration, Instant}; use tokio::sync::mpsc::Sender; +static STOP_ON_FATAL: AtomicBool = AtomicBool::new(false); + #[derive(Clone, Debug)] pub struct BenchRun { pub index: usize, @@ -95,7 +98,12 @@ impl BenchRun { .await .expect("Unexpected LeakyBucket.acquire error"); + if STOP_ON_FATAL.load(Ordering::Relaxed) { + break; + } + let request_stats = bench_protocol_adapter.send_request(&client).await; + let fatal_error = request_stats.fatal_error; metrics_channel .try_send(request_stats) @@ -103,6 +111,11 @@ impl BenchRun { error!("Error sending metrics: {}", e); }) .unwrap_or_default(); + + if fatal_error { + STOP_ON_FATAL.store(true, Ordering::Relaxed); + break; + } } Ok(()) @@ -111,6 +124,7 @@ impl BenchRun { #[cfg(test)] mod tests { + use crate::bench_run::STOP_ON_FATAL; use crate::bench_session::RateLadderBuilder; use crate::configuration::BenchmarkMode::Http; use crate::configuration::{BenchmarkConfig, BenchmarkConfigBuilder}; @@ -119,6 +133,7 @@ mod tests { }; use crate::metrics::BenchRunMetrics; use mockito::mock; + use std::sync::atomic::Ordering; use std::time::Instant; #[tokio::test] @@ -143,7 +158,12 @@ mod tests { .build() .unwrap(), ) - .config(HttpClientConfigBuilder::default().build().unwrap()) + .config( + HttpClientConfigBuilder::default() + .stop_on_errors(vec![401]) + .build() + .unwrap(), + ) .build() .unwrap(); @@ -168,12 +188,15 @@ mod tests { let bench_run_stats = BenchRunMetrics::new(); + STOP_ON_FATAL.store(false, Ordering::Relaxed); + let bench_result = session .next() .expect("Must have runs") .run(bench_run_stats) .await; + assert!(!STOP_ON_FATAL.load(Ordering::Relaxed)); assert!(bench_result.is_ok()); let elapsed = Instant::now().duration_since(start).as_secs_f64(); @@ -193,4 +216,67 @@ mod tests { Some(&(request_count as i32)) ); } + + #[tokio::test] + async fn test_send_load_fatal_code() { + let body = "world"; + + let request_count = 100; + + let _m = mock("GET", "/1") + .with_status(401) + .with_header("content-type", "text/plain") + .with_body(body) + .expect(request_count) + .create(); + + let url = mockito::server_url().to_string(); + println!("Url: {}", url); + let http_adapter = HttpBenchAdapterBuilder::default() + .request( + HttpRequestBuilder::default() + .url(vec![format!("{}/1", url)]) + .build() + .unwrap(), + ) + .config( + HttpClientConfigBuilder::default() + .stop_on_errors(vec![401]) + .build() + .unwrap(), + ) + .build() + .unwrap(); + + let benchmark_config: BenchmarkConfig = BenchmarkConfigBuilder::default() + .rate_ladder( + RateLadderBuilder::default() + .start(request_count as f64) + .end(request_count as f64) + .rate_increment(None) + .step_duration(None) + .step_requests(Some(request_count)) + .build() + .expect("RateLadderBuilder failed"), + ) + .mode(Http(http_adapter.clone())) + .build() + .expect("BenchmarkConfig failed"); + + let mut session = benchmark_config.clone().new_bench_session(); + + let bench_run_stats = BenchRunMetrics::new(); + + STOP_ON_FATAL.store(false, Ordering::Relaxed); + + let bench_result = session + .next() + .expect("Must have runs") + .run(bench_run_stats) + .await; + + // must stop on fatal + assert!(STOP_ON_FATAL.load(Ordering::Relaxed)); + assert!(bench_result.is_ok()); + } } diff --git a/src/configuration.rs b/src/configuration.rs index 1ffdafc..751d558 100644 --- a/src/configuration.rs +++ b/src/configuration.rs @@ -10,13 +10,14 @@ use crate::http_bench_session::{ HttpBenchAdapter, HttpBenchAdapterBuilder, HttpClientConfigBuilder, HttpRequestBuilder, }; use crate::metrics::{DefaultConsoleReporter, ExternalMetricsServiceReporter}; -use clap::{clap_app, ArgMatches}; +use clap::Args; +use clap::Parser; +use clap::Subcommand; use core::fmt; use rand::Rng; use std::fs; use std::fs::File; use std::io::Read; -use std::process::exit; use std::str::FromStr; use std::sync::Arc; use tokio::io; @@ -42,98 +43,145 @@ pub struct BenchmarkConfig { pub reporters: Vec>, } +#[derive(Parser, Debug)] +// #[clap(name = "Performance Gauge")] +// #[clap(author = "Eugene Retunsky")] +// #[clap(version = "0.1.9")] +// #[clap(about = "A tool for gauging performance of network services", long_about = None)] +#[clap(author, version, about, long_about = None)] +#[clap(propagate_version = true)] +struct Cli { + /// Concurrent clients. Default `1`. + #[clap(short, long, default_value_t = 1)] + concurrency: usize, + /// Duration of the test. + #[clap(short, long)] + duration: Option, + /// Number of requests per client. + #[clap(short, long = "num_req")] + num_req: Option, + /// Test case name. Optional. Can be used for tagging metrics. + #[clap(short = 'N', long)] + name: Option, + /// Request rate per second. E.g. 100 or 0.1. By default no limit. + #[clap(short, long)] + rate: Option, + /// Rate increase step (until it reaches --rate_max). + #[clap(long = "rate_step")] + rate_step: Option, + /// Max rate per second. Requires --rate-step + #[clap(long = "rate_max")] + rate_max: Option, + /// takes_value "The number of iterations with the max rate. By default `1`. + #[clap(short, long = "max_iter", default_value_t = 1)] + max_iter: usize, + /// If it's a part of a continuous run. In this case metrics are not reset at the end to avoid saw-like plots. + #[clap(long)] + continuous: bool, + /// If you'd like to send metrics to Prometheus PushGateway, specify the server URL. E.g. 10.0.0.1:9091 + #[clap(long)] + prometheus: Option, + /// Prometheus Job (by default `pushgateway`) + #[clap(long = "prometheus_job")] + prometheus_job: Option, + #[clap(subcommand)] + command: Commands, +} + +#[derive(Subcommand, Debug)] +enum Commands { + Http(HttpOptions), +} + +#[derive(Args, Debug)] +#[clap(about = "Run in HTTP(S) mode", long_about = None)] +#[clap(author, version, long_about = None)] +#[clap(propagate_version = true)] +struct HttpOptions { + /// Target, e.g. https://my-service.com:8443/8kb Can be multiple ones (with random choice balancing). + #[clap()] + target: Vec, + /// Headers in \"Name:Value\" form. Can be provided multiple times. + #[clap(short = 'H', long)] + header: Vec, + /// Method. By default GET. + #[clap(short = 'M', long)] + method: Option, + /// Stop immediately on error codes. E.g. `-E 401 -E 403` + #[clap(short = 'E', long = "error_stop")] + error_stop: Vec, + /// Body of the request. Could be either `random://[0-9]+`, `file://$filename` or `base64://${valid_base64}`. Optional. + #[clap(short = 'B', long)] + body: Option, + /// Allow self signed certificates. + #[clap(long = "ignore_cert")] + ignore_cert: bool, + /// If connections should be re-used. + #[clap(long = "conn_reuse")] + conn_reuse: bool, + /// Enforce HTTP/2 only. + #[clap(long = "http2_only")] + http2_only: bool, +} + impl BenchmarkConfig { pub fn from_command_line() -> io::Result { - let matches = clap_app!(myapp => - (name: "Performance Gauge") - (version: "0.1.8") - (author: "Eugene Retunsky") - (about: "A tool for gauging performance of network services") - (@arg CONCURRENCY: --concurrency -c +takes_value "Concurrent clients. Default `1`.") - (@group duration => - (@arg NUMBER_OF_REQUESTS: --num_req -n +takes_value "Number of requests per client.") - (@arg DURATION: --duration -d +takes_value "Duration of the test.") - ) - (@arg TEST_CASE_NAME: --name -N +takes_value "Test case name. Optional. Can be used for tagging metrics.") - (@arg RATE: --rate -r +takes_value "Request rate per second. E.g. 100 or 0.1. By default no limit.") - (@arg RATE_STEP: --rate_step +takes_value "Rate increase step (until it reaches --rate_max).") - (@arg RATE_MAX: --rate_max +takes_value "Max rate per second. Requires --rate-step") - (@arg MAX_RATE_ITERATIONS: --max_iter -m +takes_value "The number of iterations with the max rate. By default `1`.") - (@arg CONTINUOUS: --continuous "If it's a part of a continuous run. In this case metrics are not reset at the end to avoid saw-like plots.") - (@arg PROMETHEUS_ADDR: --prometheus +takes_value "If you'd like to send metrics to Prometheus PushGateway, specify the server URL. E.g. 10.0.0.1:9091") - (@arg PROMETHEUS_JOB: --prometheus_job +takes_value "Prometheus Job (by default `pushgateway`)") - (@subcommand http => - (about: "Run in HTTP(S) mode") - (version: "0.1.8") - (@arg IGNORE_CERT: --ignore_cert "Allow self signed certificates.") - (@arg CONN_REUSE: --conn_reuse "If connections should be re-used") - (@arg HTTP2_ONLY: --http2_only "Enforce HTTP/2 only") - (@arg TARGET: +required ... "Target, e.g. https://my-service.com:8443/8kb Can be multiple ones (with random choice balancing)") - (@arg METHOD: --method -M +takes_value "Method. By default GET") - (@arg HEADER: --header -H ... "Headers in \"Name:Value\" form. Can be provided multiple times.") - (@arg BODY: --body -B +takes_value "Body of the request. Could be either `random://[0-9]+`, `file://$filename` or `base64://${valid_base64}`. Optional.") - ) - ).get_matches(); + let cli = Cli::parse(); - let test_case_name = matches.value_of("TEST_CASE_NAME").map(|s| s.to_string()); - let concurrency = matches.value_of("CONCURRENCY").unwrap_or("1"); - let rate_per_second = matches.value_of("RATE"); - let rate_step = matches.value_of("RATE_STEP"); - let rate_max = matches.value_of("RATE_MAX"); - let max_rate_iterations = matches.value_of("MAX_RATE_ITERATIONS").unwrap_or("1"); + let concurrency = cli.concurrency; + let rate_per_second = cli.rate; + let rate_step = cli.rate_step; + let rate_max = cli.rate_max; + let max_rate_iterations = cli.max_iter; - let duration = matches.value_of("DURATION").map(|d| { - humantime::Duration::from_str(d) + let duration = cli.duration.as_ref().map(|d| { + humantime::Duration::from_str(d.as_str()) .expect("Illegal duration") .into() }); - let number_of_requests = matches - .value_of("NUMBER_OF_REQUESTS") - .map(|n| parse_num(n, "Illegal number for NUMBER_OF_REQUESTS")); + let number_of_requests = cli.num_req; + + if duration.is_none() && number_of_requests.is_none() { + panic!("Either the number of requests or the test duration must be specified"); + } let rate_ladder = if let Some(rate_max) = rate_max { let rate_per_second = rate_per_second.expect("RATE is required if RATE_MAX is specified"); let rate_step = rate_step.expect("RATE_STEP is required if RATE_MAX is specified"); RateLadderBuilder::default() - .start(parse_num(rate_per_second, "Cannot parse RATE")) - .end(parse_num(rate_max, "Cannot parse RATE_MAX")) - .rate_increment(Some(parse_num(rate_step, "Cannot parse RATE_STEP"))) + .start(rate_per_second) + .end(rate_max) + .rate_increment(Some(rate_step)) .step_duration(duration) .step_requests(number_of_requests) - .max_rate_iterations(parse_num( - max_rate_iterations, - "Cannot parse MAX_RATE_ITERATIONS", - )) + .max_rate_iterations(max_rate_iterations) .build() .expect("RateLadderBuilder failed") } else { - let rps = parse_num(rate_per_second.unwrap_or("0"), "Cannot parse RATE"); + let rps = rate_per_second.unwrap_or(0.0); RateLadderBuilder::default() .start(rps) .end(rps) .rate_increment(None) .step_duration(duration) .step_requests(number_of_requests) - .max_rate_iterations(parse_num( - max_rate_iterations, - "Cannot parse MAX_RATE_ITERATIONS", - )) + .max_rate_iterations(max_rate_iterations) .build() .expect("RateLadderBuilder failed") }; Ok(BenchmarkConfigBuilder::default() - .name(test_case_name.clone()) + .name(cli.name.clone()) .rate_ladder(rate_ladder) - .concurrency(parse_num(concurrency, "Cannot parse CONCURRENCY")) + .concurrency(concurrency) .verbose(false) - .continuous(matches.is_present("CONTINUOUS")) - .mode(BenchmarkConfig::build_mode(&matches)) + .continuous(cli.continuous) + .mode(BenchmarkConfig::build_mode(&cli)) .reporters(BenchmarkConfig::build_metric_destinations( - test_case_name, - matches, + cli.name.clone(), + &cli, )) .build() .expect("BenchmarkConfig failed")) @@ -142,9 +190,11 @@ impl BenchmarkConfig { #[cfg(not(feature = "report-to-prometheus"))] fn build_metric_destinations( test_case_name: Option, - matches: ArgMatches, + args: &Cli, ) -> Vec> { - if matches.value_of("PROMETHEUS_ADDR").is_some() { + use std::process::exit; + + if args.prometheus.is_some() { println!("Prometheus is not supported in this configuration"); exit(-1); } @@ -155,7 +205,7 @@ impl BenchmarkConfig { #[cfg(feature = "report-to-prometheus")] fn build_metric_destinations( test_case_name: Option, - matches: ArgMatches, + args: &Cli, ) -> Vec> { use crate::prometheus_reporter::PrometheusReporter; use std::net::SocketAddr; @@ -166,68 +216,84 @@ impl BenchmarkConfig { test_case_name.clone(), ))]; - if let Some(prometheus_addr) = matches.value_of("PROMETHEUS_ADDR") { - if SocketAddr::from_str(prometheus_addr).is_err() { + if let Some(prometheus_addr) = &args.prometheus { + if SocketAddr::from_str(prometheus_addr.as_str()).is_err() { panic!("Illegal Prometheus Gateway addr `{}`", prometheus_addr); } metrics_destinations.push(Arc::new(PrometheusReporter::new( test_case_name, prometheus_addr.to_string(), - matches.value_of("PROMETHEUS_JOB"), + Some( + args.prometheus_job + .as_ref() + .unwrap_or(&"pushgateway".to_string()) + .clone() + .as_str(), + ), ))); } metrics_destinations } - fn build_mode(matches: &ArgMatches) -> BenchmarkMode { - let mode = if let Some(config) = matches.subcommand_matches("http") { - #[cfg(feature = "tls-boring")] - if config.is_present("IGNORE_CERT") { - println!("--ignore_cert is not supported for BoringSSL"); - exit(-1); - } + fn build_mode(args: &Cli) -> BenchmarkMode { + match &args.command { + Commands::Http(config) => { + #[cfg(feature = "tls-boring")] + if config.ignore_cert { + use std::process::exit; - let http_config = HttpBenchAdapterBuilder::default() - .config( - HttpClientConfigBuilder::default() - .ignore_cert(config.is_present("IGNORE_CERT")) - .conn_reuse(config.is_present("CONN_REUSE")) - .http2_only(config.is_present("HTTP2_ONLY")) - .build() - .expect("HttpClientConfigBuilder failed"), - ) - .request( - HttpRequestBuilder::default() - .url( - config - .values_of("TARGET") - .expect("misconfiguration for TARGET") - .map(|s| s.to_string()) - .collect(), - ) - .method(config.value_of("METHOD").unwrap_or("GET").to_string()) - .headers(BenchmarkConfig::get_multiple_values(config, "HEADER")) - .body(BenchmarkConfig::generate_body(config)) - .build() - .expect("HttpRequestBuilder failed"), - ) - .build() - .expect("BenchmarkModeBuilder failed"); - BenchmarkMode::Http(http_config) - } else { - println!("Run `perf-gauge help` to see program options."); - exit(1); - }; - mode + println!("--ignore_cert is not supported for BoringSSL"); + exit(-1); + } + + let http_config = HttpBenchAdapterBuilder::default() + .config( + HttpClientConfigBuilder::default() + .ignore_cert(config.ignore_cert) + .conn_reuse(config.conn_reuse) + .http2_only(config.http2_only) + .stop_on_errors(config.error_stop.clone()) + .build() + .expect("HttpClientConfigBuilder failed"), + ) + .request( + HttpRequestBuilder::default() + .url(config.target.clone()) + .method(config.method.as_ref().unwrap_or(&"GET".to_string()).clone()) + .headers( + config + .header + .iter() + .map(|s| { + let mut split = s.split(':'); + ( + split + .next() + .expect("Header name is missing") + .to_string(), + split.collect::>().join(":"), + ) + }) + .collect(), + ) + .body(BenchmarkConfig::generate_body(config)) + .build() + .expect("HttpRequestBuilder failed"), + ) + .build() + .expect("BenchmarkModeBuilder failed"); + BenchmarkMode::Http(http_config) + } + } } - fn generate_body(config: &ArgMatches) -> Vec { + fn generate_body(args: &HttpOptions) -> Vec { const RANDOM_PREFIX: &str = "random://"; const BASE64_PREFIX: &str = "base64://"; const FILE_PREFIX: &str = "file://"; - if let Some(body_value) = config.value_of("BODY") { + if let Some(body_value) = &args.body { if let Some(body_size) = body_value.strip_prefix(RANDOM_PREFIX) { BenchmarkConfig::generate_random_vec(body_size) } else if let Some(base64) = body_value.strip_prefix(BASE64_PREFIX) { @@ -262,22 +328,6 @@ impl BenchmarkConfig { buffer } - fn get_multiple_values(config: &ArgMatches, id: &str) -> Vec<(String, String)> { - config - .values_of(id) - .map(|v| { - v.map(|s| { - let mut split = s.split(':'); - ( - split.next().expect("Header name is missing").to_string(), - split.collect::>().join(":"), - ) - }) - .collect() - }) - .unwrap_or_else(Vec::new) - } - pub fn new_bench_session(&mut self) -> BenchSession { BenchSessionBuilder::default() .concurrency(self.concurrency) @@ -297,12 +347,3 @@ impl fmt::Display for BenchmarkConfig { ) } } - -pub fn parse_num(s: &str, error_msg: &str) -> F { - s.parse() - .map_err(|_| { - println!("{}", error_msg); - panic!("Cannot start"); - }) - .unwrap() -} diff --git a/src/http_bench_session.rs b/src/http_bench_session.rs index 3fbb1e8..980f5f1 100644 --- a/src/http_bench_session.rs +++ b/src/http_bench_session.rs @@ -34,6 +34,8 @@ pub struct HttpClientConfig { conn_reuse: bool, #[builder(default)] http2_only: bool, + #[builder(default)] + pub stop_on_errors: Vec, } #[derive(Builder, Deserialize, Clone, Debug)] @@ -126,6 +128,10 @@ impl BenchmarkProtocolAdapter for HttpBenchAdapter { Ok(r) => { let status = r.status().to_string(); let success = r.status().is_success(); + + let fatal_error = + !success && self.config.stop_on_errors.contains(&r.status().as_u16()); + let mut stream = r.into_body(); let mut total_size = 0; while let Some(item) = stream.next().await { @@ -140,6 +146,7 @@ impl BenchmarkProtocolAdapter for HttpBenchAdapter { .status(status) .is_success(success) .duration(Instant::now().duration_since(start)) + .fatal_error(fatal_error) .build() .expect("RequestStatsBuilder failed") } @@ -151,6 +158,7 @@ impl BenchmarkProtocolAdapter for HttpBenchAdapter { .status(status) .is_success(false) .duration(Instant::now().duration_since(start)) + .fatal_error(false) .build() .expect("RequestStatsBuilder failed") } @@ -163,9 +171,8 @@ impl HttpRequest { let method = Method::from_str(&self.method.clone()).expect("Method must be valid at this point"); - let mut request_builder = Request::builder() - .method(method) - .uri(&self.url[thread_rng().gen_range(0..self.url.len())].clone()); + let uri = &self.url[thread_rng().gen_range(0..self.url.len())]; + let mut request_builder = Request::builder().method(method).uri(uri.clone()); if !self.headers.is_empty() { for (key, value) in self.headers.iter() { @@ -183,6 +190,12 @@ impl HttpRequest { } else { request_builder .body(Body::empty()) + .map_err(|e| { + println!( + "Cannot create url {}, headers: {:?}. Error: {}", + uri, self.headers, e + ); + }) .expect("Error building Request") } } diff --git a/src/metrics.rs b/src/metrics.rs index 6a722c4..e780d86 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -61,6 +61,7 @@ pub struct RequestStats { pub duration: Duration, #[builder(default = "None")] pub operation_name: Option, + pub fatal_error: bool, } impl BenchRunMetrics { @@ -392,6 +393,7 @@ mod tests { status: code, duration: Default::default(), operation_name: None, + fatal_error: false, }); } @@ -421,6 +423,7 @@ mod tests { status: "200 OK".to_string(), duration: Duration::from_micros(i), operation_name: None, + fatal_error: false, }); } @@ -452,6 +455,7 @@ mod tests { } else { Some("OperationB".to_string()) }, + fatal_error: false, }); } @@ -538,6 +542,7 @@ mod tests { status: "200 OK".to_string(), duration: Duration::from_micros(i), operation_name: None, + fatal_error: false, }); } diff --git a/src/prometheus_reporter.rs b/src/prometheus_reporter.rs index 095cb8a..538ebb0 100644 --- a/src/prometheus_reporter.rs +++ b/src/prometheus_reporter.rs @@ -372,6 +372,7 @@ mod test { .status(code) .is_success(success) .duration(Duration::from_micros(i as u64)) + .fatal_error(false) .build() .expect("RequestStatsBuilder failed"), ); @@ -478,6 +479,7 @@ mod test { .bytes_processed(i) .status(code) .is_success(success) + .fatal_error(false) .duration(Duration::from_micros(i as u64)) .build() .expect("RequestStatsBuilder failed"), @@ -594,6 +596,7 @@ mod test { status: "200 OK".to_string(), duration: Duration::from_micros(i), operation_name: None, + fatal_error: false, }); }