From b894c71da9eb83e62549dbe568d5bd7cfba27d42 Mon Sep 17 00:00:00 2001
From: xnuter
Date: Tue, 20 Jul 2021 21:30:25 -0700
Subject: [PATCH] Adding multi-operation metric reporting

---
 src/bench_run.rs           |   9 +-
 src/configuration.rs       |   2 +-
 src/metrics.rs             | 165 ++++++++++++++++++++++++++---
 src/prometheus_reporter.rs | 211 ++++++++++++++++++++++++++++++-------
 4 files changed, 331 insertions(+), 56 deletions(-)

diff --git a/src/bench_run.rs b/src/bench_run.rs
index 9452fd4..89e71d9 100644
--- a/src/bench_run.rs
+++ b/src/bench_run.rs
@@ -178,8 +178,11 @@ mod tests {
 
         let stats = bench_result.unwrap();
 
-        assert_eq!(body.len() * request_count, stats.total_bytes);
-        assert_eq!(request_count, stats.total_requests);
-        assert_eq!(stats.summary.get("200 OK"), Some(&(request_count as i32)));
+        assert_eq!(body.len() * request_count, stats.combined.total_bytes);
+        assert_eq!(request_count, stats.combined.total_requests);
+        assert_eq!(
+            stats.combined.summary.get("200 OK"),
+            Some(&(request_count as i32))
+        );
     }
 }
diff --git a/src/configuration.rs b/src/configuration.rs
index 68269de..7a886ec 100644
--- a/src/configuration.rs
+++ b/src/configuration.rs
@@ -169,7 +169,7 @@ impl BenchmarkConfig {
                 panic!("Illegal Prometheus Gateway addr `{}`", prometheus_addr);
             }
             metrics_destinations.push(Arc::new(PrometheusReporter::new(
-                test_case_name.clone(),
+                test_case_name,
                 prometheus_addr.to_string(),
                 matches.value_of("PROMETHEUS_JOB"),
             )));
diff --git a/src/metrics.rs b/src/metrics.rs
index 747371f..f2f7440 100644
--- a/src/metrics.rs
+++ b/src/metrics.rs
@@ -18,6 +18,12 @@ pub struct DefaultConsoleReporter {
 
 #[derive(Clone)]
 pub struct BenchRunMetrics {
+    pub(crate) combined: BenchRunMetricsItem,
+    pub(crate) by_operation: HashMap<String, BenchRunMetricsItem>,
+}
+
+#[derive(Clone)]
+pub struct BenchRunMetricsItem {
     pub(crate) bench_begin: Instant,
     pub(crate) total_bytes: usize,
     pub(crate) total_requests: usize,
@@ -27,9 +33,15 @@ pub struct BenchRunMetrics {
     pub(crate) error_latency: Histogram,
 }
 
-/// Default reporter that prints stats to console.
 #[derive(Serialize)]
 struct BenchRunReport {
+    combined: BenchRunReportItem,
+    by_operation: HashMap<String, BenchRunReportItem>,
+}
+
+/// Default reporter that prints stats to console.
+#[derive(Serialize)]
+struct BenchRunReportItem {
     test_case_name: Option<String>,
     duration: Duration,
     total_bytes: usize,
@@ -47,9 +59,30 @@ pub struct RequestStats {
     pub bytes_processed: usize,
     pub status: String,
     pub duration: Duration,
+    #[builder(default = "None")]
+    pub operation_name: Option<String>,
 }
 
 impl BenchRunMetrics {
+    pub fn new() -> Self {
+        Self {
+            combined: BenchRunMetricsItem::new(),
+            by_operation: HashMap::new(),
+        }
+    }
+
+    pub fn report_request(&mut self, stats: RequestStats) {
+        self.combined.report_request(&stats);
+        if let Some(operation_name) = stats.operation_name.as_ref() {
+            self.by_operation
+                .entry(operation_name.to_owned())
+                .or_insert_with(BenchRunMetricsItem::new)
+                .report_request(&stats);
+        }
+    }
+}
+
+impl BenchRunMetricsItem {
     pub fn new() -> Self {
         Self {
             bench_begin: Instant::now(),
@@ -62,7 +95,7 @@ impl BenchRunMetrics {
         }
     }
 
-    pub fn report_request(&mut self, stats: RequestStats) {
+    pub fn report_request(&mut self, stats: &RequestStats) {
         self.total_requests += 1;
         if stats.is_success {
             self.successful_requests += 1;
@@ -75,7 +108,10 @@ impl BenchRunMetrics {
                 .unwrap_or_default();
         }
         self.total_bytes += stats.bytes_processed;
-        self.summary.entry(stats.status).or_insert(0).add_assign(1);
+        self.summary
+            .entry(stats.status.to_owned())
+            .or_insert(0)
+            .add_assign(1);
     }
 
     pub fn truncated_mean(histogram: &Histogram, threshold: f64) -> u64 {
@@ -110,8 +146,8 @@ impl BenchRunMetrics {
     }
 }
 
-impl BenchRunReport {
-    fn summary_ordered(metrics: &BenchRunMetrics) -> Vec<(String, i32)> {
+impl BenchRunReportItem {
+    fn summary_ordered(metrics: &BenchRunMetricsItem) -> Vec<(String, i32)> {
         let mut pairs: Vec<(String, i32)> = metrics
             .summary
             .iter()
@@ -130,7 +166,7 @@ impl BenchRunReport {
         pairs
     }
 
-    fn latency_summary(metrics: &BenchRunMetrics) -> Vec<(String, u64)> {
+    fn latency_summary(metrics: &BenchRunMetricsItem) -> Vec<(String, u64)> {
         // for simplicity of reporting we merge both latency
         // into a single histogram.
         let mut latency = metrics.success_latency.clone();
@@ -163,21 +199,27 @@ impl BenchRunReport {
             ("StdDev".to_string(), latency.stddev().unwrap_or_default()),
             (
                 "tm95".to_string(),
-                BenchRunMetrics::truncated_mean(&latency, 5.0),
+                BenchRunMetricsItem::truncated_mean(&latency, 5.0),
             ),
             (
                 "tm99".to_string(),
-                BenchRunMetrics::truncated_mean(&latency, 1.0),
+                BenchRunMetricsItem::truncated_mean(&latency, 1.0),
             ),
             (
                 "tm99.9".to_string(),
-                BenchRunMetrics::truncated_mean(&latency, 0.1),
+                BenchRunMetricsItem::truncated_mean(&latency, 0.1),
             ),
         ]
     }
 }
 
 impl fmt::Display for BenchRunReport {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        writeln!(f, "{}", self.combined)
+    }
+}
+
+impl fmt::Display for BenchRunReportItem {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         let name = match self.test_case_name.as_ref() {
             None => String::new(),
@@ -273,12 +315,38 @@ impl DefaultConsoleReporter {
         Self { test_case_name }
     }
 
+    fn sorted_operations(metrics: &BenchRunMetrics) -> Vec<String> {
+        let sorted_operation_name: Vec<String> =
+            metrics.by_operation.keys().map(|s| s.to_owned()).collect();
+        sorted_operation_name
+    }
+
     fn build_report(&self, metrics: &BenchRunMetrics) -> BenchRunReport {
+        let mut by_operation = HashMap::new();
+        let sorted_operation_name = DefaultConsoleReporter::sorted_operations(metrics);
+        for operation in sorted_operation_name {
+            by_operation.insert(
+                operation.to_owned(),
+                self.build_item_report(
+                    &metrics
+                        .by_operation
+                        .get(&operation)
+                        .expect("Operation key cannot be missing"),
+                ),
+            );
+        }
+        BenchRunReport {
+            combined: self.build_item_report(&metrics.combined),
+            by_operation,
+        }
+    }
+
+    fn build_item_report(&self, metrics: &BenchRunMetricsItem) -> BenchRunReportItem {
         let successful_requests = metrics.successful_requests as usize;
         let total_requests = metrics.total_requests as usize;
         let total_bytes = metrics.total_bytes as usize;
         let duration = Instant::now().duration_since(metrics.bench_begin);
-        BenchRunReport {
+        BenchRunReportItem {
             test_case_name: self
                 .test_case_name
                 .as_ref()
@@ -290,8 +358,8 @@ impl DefaultConsoleReporter {
             success_rate: successful_requests as f64 * 100. / total_requests as f64,
             rate_per_second: total_requests as f64 / duration.as_secs_f64(),
             bitrate_mbps: total_bytes as f64 / duration.as_secs_f64() * 8.
/ 1_000_000., - response_code_summary: BenchRunReport::summary_ordered(metrics), - latency_summary: BenchRunReport::latency_summary(metrics), + response_code_summary: BenchRunReportItem::summary_ordered(metrics), + latency_summary: BenchRunReportItem::latency_summary(metrics), } } } @@ -323,11 +391,13 @@ mod tests { bytes_processed: 0, status: code, duration: Default::default(), + operation_name: None, }); } let mut ordered_summary = DefaultConsoleReporter::new(None) .build_report(&metrics) + .combined .response_code_summary .into_iter(); assert_eq!( @@ -350,11 +420,12 @@ mod tests { bytes_processed: 0, status: "200 OK".to_string(), duration: Duration::from_micros(i), + operation_name: None, }); } let report = DefaultConsoleReporter::new(None).build_report(&metrics); - let mut items = report.latency_summary.into_iter(); + let mut items = report.combined.latency_summary.into_iter(); assert_eq!(Some(("Min".to_string(), 0)), items.next()); assert_eq!(Some(("p50".to_string(), 500)), items.next()); @@ -367,6 +438,73 @@ mod tests { assert_eq!(Some(("StdDev".to_string(), 289)), items.next()); } + #[test] + fn test_by_operation_reporting() { + let mut metrics = BenchRunMetrics::new(); + for i in 0..1000 { + metrics.report_request(RequestStats { + is_success: true, + bytes_processed: 0, + status: "200 OK".to_string(), + duration: Duration::from_micros(i), + operation_name: if i % 2 == 0 { + Some("OperationA".to_string()) + } else { + Some("OperationB".to_string()) + }, + }); + } + + let report = DefaultConsoleReporter::new(None).build_report(&metrics); + let mut items = report.combined.latency_summary.to_owned().into_iter(); + + assert_eq!(Some(("Min".to_string(), 0)), items.next()); + assert_eq!(Some(("p50".to_string(), 500)), items.next()); + assert_eq!(Some(("p90".to_string(), 900)), items.next()); + assert_eq!(Some(("p99".to_string(), 990)), items.next()); + assert_eq!(Some(("p99.9".to_string(), 999)), items.next()); + assert_eq!(Some(("p99.99".to_string(), 999)), items.next()); + assert_eq!(Some(("Max".to_string(), 999)), items.next()); + assert_eq!(Some(("Mean".to_string(), 500)), items.next()); + assert_eq!(Some(("StdDev".to_string(), 289)), items.next()); + + assert_eq!(report.by_operation.len(), 2); + + let mut items = report + .by_operation + .get("OperationA") + .unwrap() + .latency_summary + .to_owned() + .into_iter(); + assert_eq!(Some(("Min".to_string(), 0)), items.next()); + assert_eq!(Some(("p50".to_string(), 500)), items.next()); + assert_eq!(Some(("p90".to_string(), 900)), items.next()); + assert_eq!(Some(("p99".to_string(), 990)), items.next()); + assert_eq!(Some(("p99.9".to_string(), 998)), items.next()); + assert_eq!(Some(("p99.99".to_string(), 998)), items.next()); + assert_eq!(Some(("Max".to_string(), 998)), items.next()); + assert_eq!(Some(("Mean".to_string(), 499)), items.next()); + assert_eq!(Some(("StdDev".to_string(), 289)), items.next()); + + let mut items = report + .by_operation + .get("OperationB") + .unwrap() + .latency_summary + .to_owned() + .into_iter(); + assert_eq!(Some(("Min".to_string(), 1)), items.next()); + assert_eq!(Some(("p50".to_string(), 501)), items.next()); + assert_eq!(Some(("p90".to_string(), 901)), items.next()); + assert_eq!(Some(("p99".to_string(), 991)), items.next()); + assert_eq!(Some(("p99.9".to_string(), 999)), items.next()); + assert_eq!(Some(("p99.99".to_string(), 999)), items.next()); + assert_eq!(Some(("Max".to_string(), 999)), items.next()); + assert_eq!(Some(("Mean".to_string(), 501)), items.next()); + 
assert_eq!(Some(("StdDev".to_string(), 289)), items.next()); + } + #[test] fn test_has_more_work_request_limit() { let requests = 10; @@ -399,6 +537,7 @@ mod tests { bytes_processed: 0, status: "200 OK".to_string(), duration: Duration::from_micros(i), + operation_name: None, }); } diff --git a/src/prometheus_reporter.rs b/src/prometheus_reporter.rs index 8516f4e..7823da9 100644 --- a/src/prometheus_reporter.rs +++ b/src/prometheus_reporter.rs @@ -1,4 +1,4 @@ -use crate::metrics::{BenchRunMetrics, ExternalMetricsServiceReporter}; +use crate::metrics::{BenchRunMetrics, BenchRunMetricsItem, ExternalMetricsServiceReporter}; use histogram::Histogram; use log::info; use prometheus::core::{AtomicI64, GenericGauge, GenericGaugeVec}; @@ -15,9 +15,39 @@ pub struct PrometheusReporter { impl ExternalMetricsServiceReporter for PrometheusReporter { fn report(&self, metrics: &BenchRunMetrics) -> io::Result<()> { + self.report_item(None, &metrics.combined)?; + for (operation, metrics_item) in metrics.by_operation.iter() { + self.report_item(Some(operation.to_owned()), metrics_item)?; + } + Ok(()) + } + + fn reset_metrics(&self) { + info!("Stop sending metrics to Prometheus: {}", self.address); + // send empty metrics to reset counters + self.report(&BenchRunMetrics::new()).unwrap_or_default(); + } +} + +/// For reporting to Prometheus +impl PrometheusReporter { + pub fn new(test_case_name: Option, addr: String, job: Option<&str>) -> Self { + Self { + test_case_name, + job: job.unwrap_or("pushgateway").to_string(), + address: addr, + basic_auth: None, + } + } + + fn report_item( + &self, + operation_name: Option, + metrics: &BenchRunMetricsItem, + ) -> io::Result<()> { info!("Sending metrics to Prometheus: {}", self.address,); - let registry = PrometheusReporter::build_registry(metrics); + let registry = PrometheusReporter::build_registry(operation_name, &metrics); let metric_families = registry.gather(); @@ -43,62 +73,54 @@ impl ExternalMetricsServiceReporter for PrometheusReporter { .map_err(|e| io::Error::new(io::ErrorKind::Other, e)) } - fn reset_metrics(&self) { - info!("Stop sending metrics to Prometheus: {}", self.address); - // send empty metrics to reset counters - self.report(&BenchRunMetrics::new()).unwrap_or_default(); - } -} - -/// For reporting to Prometheus -impl PrometheusReporter { - pub fn new(test_case_name: Option, addr: String, job: Option<&str>) -> Self { - Self { - test_case_name, - job: job.unwrap_or("pushgateway").to_string(), - address: addr, - basic_auth: None, - } + fn build_metric_name(operation_name: &Option, name: &str) -> String { + operation_name + .as_ref() + .map(|s| format!("{}_{}", s, name)) + .unwrap_or_else(|| name.to_string()) } - fn build_registry(bench_run_metrics: &BenchRunMetrics) -> Registry { + fn build_registry( + operation_name: Option, + bench_run_metrics: &BenchRunMetricsItem, + ) -> Registry { let registry = Registry::new(); PrometheusReporter::register_gauge( ®istry, - "request_count", + PrometheusReporter::build_metric_name(&operation_name, "request_count"), "All requests", bench_run_metrics.total_requests as i64, ); PrometheusReporter::register_gauge( ®istry, - "success_count", + PrometheusReporter::build_metric_name(&operation_name, "success_count"), "Successful requests", bench_run_metrics.successful_requests as i64, ); PrometheusReporter::register_gauge( ®istry, - "bytes_count", + PrometheusReporter::build_metric_name(&operation_name, "bytes_count"), "Bytes received/sent", bench_run_metrics.total_bytes as i64, ); PrometheusReporter::register_codes( 
®istry, - "response_codes", + PrometheusReporter::build_metric_name(&operation_name, "response_codes"), "Response codes/errors", &bench_run_metrics.summary, ); PrometheusReporter::register_histogram( ®istry, - "success_latency", + PrometheusReporter::build_metric_name(&operation_name, "success_latency"), "Latency of successful requests", bench_run_metrics.success_latency.clone(), ); PrometheusReporter::register_histogram( ®istry, - "error_latency", + PrometheusReporter::build_metric_name(&operation_name, "error_latency"), "Latency of failed requests", bench_run_metrics.error_latency.clone(), ); @@ -107,7 +129,7 @@ impl PrometheusReporter { latency.merge(&bench_run_metrics.error_latency); PrometheusReporter::register_histogram( ®istry, - "latency", + PrometheusReporter::build_metric_name(&operation_name, "latency"), "Latency of failed requests", latency, ); @@ -115,7 +137,7 @@ impl PrometheusReporter { registry } - fn register_gauge(registry: &Registry, name: &str, help: &str, value: i64) { + fn register_gauge(registry: &Registry, name: String, help: &str, value: i64) { let gauge = GenericGauge::::new(name, help).expect("Creating gauge failed"); registry .register(Box::new(gauge.clone())) @@ -126,7 +148,7 @@ impl PrometheusReporter { fn register_codes + Copy>( registry: &Registry, - name: &str, + name: String, help: &str, map_of_codes: &HashMap, ) { @@ -142,7 +164,7 @@ impl PrometheusReporter { .for_each(|(k, v)| codes.with_label_values(&[k]).set((*v).into())) } - fn register_histogram(registry: &Registry, name: &str, help: &str, histogram: Histogram) { + fn register_histogram(registry: &Registry, name: String, help: &str, histogram: Histogram) { let mut buckets = vec![]; let mut counts = vec![]; for bucket in histogram.into_iter() { @@ -158,7 +180,7 @@ impl PrometheusReporter { counts.iter().sum::() ); let prometheus_histogram = prometheus::Histogram::with_opts( - HistogramOpts::new(name, help).buckets(buckets.clone()), + HistogramOpts::new(name.to_owned(), help).buckets(buckets.clone()), ) .expect("Histogram failed"); @@ -178,7 +200,7 @@ impl PrometheusReporter { fn register_histogram_precalculated( registry: &Registry, - name: &str, + name: String, help: &str, histogram: Histogram, ) { @@ -209,21 +231,21 @@ impl PrometheusReporter { ("stddev".to_string(), histogram.stddev().unwrap_or_default()), ( "tm95".to_string(), - BenchRunMetrics::truncated_mean(&histogram, 5.0), + BenchRunMetricsItem::truncated_mean(&histogram, 5.0), ), ( "tm99".to_string(), - BenchRunMetrics::truncated_mean(&histogram, 1.0), + BenchRunMetricsItem::truncated_mean(&histogram, 1.0), ), ( "tm99_9".to_string(), - BenchRunMetrics::truncated_mean(&histogram, 0.1), + BenchRunMetricsItem::truncated_mean(&histogram, 0.1), ), ]; for (label, value) in percentiles { PrometheusReporter::register_gauge( registry, - format!("{}_{}", name, label).as_str(), + format!("{}_{}", name, label), format!("{} {}", help, label).as_str(), value as i64, ); @@ -253,7 +275,7 @@ mod test { counters.insert("500".to_string(), 1); PrometheusReporter::register_codes( ®istry, - "response_codes", + PrometheusReporter::build_metric_name(&None, "response_codes"), "HTTP response codes", &counters, ); @@ -283,7 +305,7 @@ mod test { PrometheusReporter::register_histogram( ®istry, - "latency", + PrometheusReporter::build_metric_name(&None, "latency"), "Latency of requests", histogram, ); @@ -328,7 +350,7 @@ mod test { } #[test] - fn test_build_registry() { + fn test_build_registry_combined() { let mut metrics = BenchRunMetrics::new(); let mut total_bytes = 
0; let mut successful_requests = 0; @@ -358,7 +380,7 @@ mod test { .report(&metrics) .expect("infallible"); - let registry = PrometheusReporter::build_registry(&metrics); + let registry = PrometheusReporter::build_registry(None, &metrics.combined); let metrics = registry.gather(); @@ -434,6 +456,116 @@ mod test { ); } + #[test] + fn test_build_registry_with_operation() { + let mut metrics = BenchRunMetrics::new(); + let mut total_bytes = 0; + let mut successful_requests = 0; + let mut total_requests = 0; + + for i in 1..=100 { + let (success, code) = if i % 5 == 0 { + (true, "200".to_string()) + } else { + (false, "500".to_string()) + }; + total_bytes += i; + successful_requests += if success { 1 } else { 0 }; + total_requests += 1; + + metrics.report_request( + RequestStatsBuilder::default() + .bytes_processed(i) + .status(code) + .is_success(success) + .duration(Duration::from_micros(i as u64)) + .build() + .expect("RequestStatsBuilder failed"), + ); + } + DefaultConsoleReporter::new(None) + .report(&metrics) + .expect("infallible"); + + let registry = + PrometheusReporter::build_registry(Some("prefix".to_string()), &metrics.combined); + + let metrics = registry.gather(); + + let mut metrics_map = HashMap::new(); + + for m in metrics.iter() { + metrics_map.insert(m.get_name(), m); + } + + let bytes_count = metrics_map + .get("prefix_bytes_count") + .expect("Missing bytes_count"); + let error_latency = metrics_map + .get("prefix_error_latency") + .expect("Missing error_latency"); + let latency = metrics_map.get("prefix_latency").expect("Missing latency"); + let request_count = metrics_map + .get("prefix_request_count") + .expect("Missing request_count"); + let response_codes = metrics_map + .get("prefix_response_codes") + .expect("Missing response_codes"); + let success_count = metrics_map + .get("prefix_success_count") + .expect("Missing success_count"); + let success_latency = metrics_map + .get("prefix_success_latency") + .expect("Missing success_latency"); + + assert_eq!(MetricType::GAUGE, bytes_count.get_field_type()); + assert_eq!(MetricType::GAUGE, request_count.get_field_type()); + assert_eq!(MetricType::GAUGE, success_count.get_field_type()); + assert_eq!(MetricType::GAUGE, response_codes.get_field_type()); + assert_eq!(MetricType::HISTOGRAM, latency.get_field_type()); + assert_eq!(MetricType::HISTOGRAM, success_latency.get_field_type()); + assert_eq!(MetricType::HISTOGRAM, error_latency.get_field_type()); + + assert_eq!( + total_bytes as f64, + bytes_count.get_metric()[0].get_gauge().get_value() + ); + assert_eq!( + total_requests as f64, + request_count.get_metric()[0].get_gauge().get_value() + ); + assert_eq!( + successful_requests as f64, + success_count.get_metric()[0].get_gauge().get_value() + ); + + assert_eq!( + "Code", + response_codes.get_metric()[0].get_label()[0].get_name() + ); + assert_eq!( + "200", + response_codes.get_metric()[0].get_label()[0].get_value() + ); + assert_eq!( + successful_requests as f64, + response_codes.get_metric()[0].get_gauge().get_value() + ); + + assert_eq!( + "Code", + response_codes.get_metric()[1].get_label()[0].get_name() + ); + assert_eq!( + "500", + response_codes.get_metric()[1].get_label()[0].get_value() + ); + assert_eq!( + (total_requests - successful_requests) as f64, + response_codes.get_metric()[1].get_gauge().get_value() + ); + } + #[test] fn test_prometheus_reporting() { let _m = mock( @@ -461,6 +593,7 @@ mod test { bytes_processed: 0, status: "200 OK".to_string(), duration: Duration::from_micros(i), + operation_name: None, 
}); }
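
Note: the sketch below is not part of the patch. It is a minimal, crate-internal usage example of the new per-operation reporting, assembled only from names that appear in the diff above (BenchRunMetrics, the new RequestStats::operation_name field, DefaultConsoleReporter::build_report, and the operation-prefixed metric names produced by PrometheusReporter::build_metric_name). The operation label "OperationA" and the literal sizes/durations are illustrative, and access to fields such as `metrics.combined` assumes pub(crate) visibility, as in the unit tests above.

    use std::time::Duration;

    fn per_operation_reporting_sketch() {
        let mut metrics = BenchRunMetrics::new();

        // A request tagged with an operation name is counted twice:
        // once in the combined totals and once in its by_operation bucket.
        metrics.report_request(RequestStats {
            is_success: true,
            bytes_processed: 1024,                 // illustrative value
            status: "200 OK".to_string(),
            duration: Duration::from_micros(250),  // illustrative value
            operation_name: Some("OperationA".to_string()), // illustrative label
        });

        // An untagged request contributes to the combined totals only.
        metrics.report_request(RequestStats {
            is_success: true,
            bytes_processed: 512,
            status: "200 OK".to_string(),
            duration: Duration::from_micros(100),
            operation_name: None,
        });

        assert_eq!(metrics.combined.total_requests, 2);

        // The console report now carries a combined item plus one item per operation.
        let report = DefaultConsoleReporter::new(None).build_report(&metrics);
        assert_eq!(report.by_operation.len(), 1);

        // The Prometheus reporter pushes the combined registry with unprefixed names
        // and one registry per operation with prefixed names, e.g. "OperationA_request_count".
    }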