From 4ee8dba4035e2889538fb60a2f8a9c1e930d3549 Mon Sep 17 00:00:00 2001
From: Jason Brown
Date: Mon, 9 Oct 2023 14:39:48 -0700
Subject: [PATCH] adapter: Replace client command's latencies with counts

SHOW PROXIED QUERIES and SHOW CACHES were recently updated to output
latency percentiles of the queries executed. As we are piggybacking
off our prometheus metrics for capturing and in-memory storage, we
are bound by the implementation details of prometheus histograms
(Distributions). Those Distributions only report data for the last
60 seconds, by default. The impact to users: if they execute a query,
dump SHOW CACHES, and see some latency information, then simply wait
> 60 seconds and issue SHOW CACHES again, the latencies get "reset
to zero".

As we are using this latency information in SHOW CACHES as more of a
Day 1 experience (getting a user to try out and experience ReadySet),
the current "resetting" is a bit confusing. Hence, this patch replaces
the latency percentiles with a simple incrementing counter. This gives
users enough information to see that ReadySet is indeed working and
handling queries.

Fixes: REA-3540
Release-Note-Core: Replace latency histograms in SHOW PROXIED QUERIES
  and SHOW CACHES with a simple counter.
Change-Id: Ic9623ba08f1503491ed283766a0492b2b6e58e37
Reviewed-on: https://gerrit.readyset.name/c/readyset/+/6183
Tested-by: Buildkite CI
Reviewed-by: Luke Osborne
---
 readyset-adapter/src/backend.rs            | 43 +++-----------
 readyset-adapter/src/metrics_handle.rs     | 67 ++++++++++------------
 readyset-client-metrics/src/recorded.rs    |  1 +
 readyset-clustertest/src/readyset_mysql.rs | 19 +-----
 readyset/src/query_logger.rs               | 54 +++++++++++++++++
 5 files changed, 96 insertions(+), 88 deletions(-)

diff --git a/readyset-adapter/src/backend.rs b/readyset-adapter/src/backend.rs
index bf1353bb7f..4009d59ec7 100644
--- a/readyset-adapter/src/backend.rs
+++ b/readyset-adapter/src/backend.rs
@@ -1748,17 +1748,9 @@ where
         }
 
         let select_schema = if let Some(handle) = self.metrics_handle.as_mut() {
-            // Must snapshot histograms to get the latest metrics
-            handle.snapshot_histograms();
-            create_dummy_schema!(
-                "query id",
-                "proxied query",
-                "readyset supported",
-                "count",
-                "p50 (ms)",
-                "p90 (ms)",
-                "p99 (ms)"
-            )
+            // Must snapshot to get the latest metrics
+            handle.snapshot_counters(readyset_client_metrics::DatabaseType::MySql);
+            create_dummy_schema!("query id", "proxied query", "readyset supported", "count")
         } else {
             create_dummy_schema!("query id", "proxied query", "readyset supported")
         };
@@ -1785,16 +1777,9 @@ where
 
                 // Append metrics if we have them
                 if let Some(handle) = self.metrics_handle.as_ref() {
-                    let MetricsSummary {
-                        sample_count,
-                        p50_us,
-                        p90_us,
-                        p99_us,
-                    } = handle.metrics_summary(id.to_string()).unwrap_or_default();
+                    let MetricsSummary { sample_count } =
+                        handle.metrics_summary(id.to_string()).unwrap_or_default();
                     row.push(DfValue::from(format!("{sample_count}")));
-                    row.push(DfValue::from(format!("{:.3}", 1000.0 * p50_us)));
-                    row.push(DfValue::from(format!("{:.3}", 1000.0 * p90_us)));
-                    row.push(DfValue::from(format!("{:.3}", 1000.0 * p99_us)));
                 }
 
                 row
@@ -1831,16 +1816,13 @@ where
         let select_schema = if let Some(handle) = self.metrics_handle.as_mut() {
             // Must snapshot histograms to get the latest metrics
-            handle.snapshot_histograms();
+            handle.snapshot_counters(readyset_client_metrics::DatabaseType::ReadySet);
             create_dummy_schema!(
                 "query id",
                 "cache name",
                 "query text",
                 "fallback behavior",
-                "count",
-                "p50 (ms)",
-                "p90 (ms)",
-                "p99 (ms)"
+ "count" ) } else { create_dummy_schema!("query id", "cache name", "query text", "fallback behavior") @@ -1867,16 +1849,9 @@ where // Append metrics if we have them if let Some(handle) = self.metrics_handle.as_ref() { - let MetricsSummary { - sample_count, - p50_us, - p90_us, - p99_us, - } = handle.metrics_summary(id.to_string()).unwrap_or_default(); + let MetricsSummary { sample_count } = + handle.metrics_summary(id.to_string()).unwrap_or_default(); row.push(DfValue::from(format!("{sample_count}"))); - row.push(DfValue::from(format!("{:.3}", 1000.0 * p50_us))); - row.push(DfValue::from(format!("{:.3}", 1000.0 * p90_us))); - row.push(DfValue::from(format!("{:.3}", 1000.0 * p99_us))); } results.push(row); diff --git a/readyset-adapter/src/metrics_handle.rs b/readyset-adapter/src/metrics_handle.rs index e4ca5e9e18..4e8dca29e1 100644 --- a/readyset-adapter/src/metrics_handle.rs +++ b/readyset-adapter/src/metrics_handle.rs @@ -1,28 +1,24 @@ use std::collections::HashMap; use indexmap::IndexMap; +use metrics::SharedString; use metrics_exporter_prometheus::formatting::{sanitize_label_key, sanitize_label_value}; use metrics_exporter_prometheus::{Distribution, PrometheusHandle}; -use metrics_util::Summary; -use quanta::Instant; // adding an alias to disambiguate vs readyset_client_metrics::recorded use readyset_client::metrics::recorded as client_recorded; -use readyset_client_metrics::recorded::QUERY_LOG_EXECUTION_TIME; +use readyset_client_metrics::recorded::QUERY_LOG_EXECUTION_COUNT; +use readyset_client_metrics::DatabaseType; use readyset_data::TimestampTz; #[derive(Debug, Default, Clone)] pub struct MetricsSummary { - // i64 because Postgres doesn't have unsigned ints - pub sample_count: i64, - pub p50_us: f64, - pub p90_us: f64, - pub p99_us: f64, + pub sample_count: u64, } #[derive(Clone)] pub struct MetricsHandle { inner: PrometheusHandle, - snapshot: Option>, + snapshot: Option>, } impl MetricsHandle { @@ -59,54 +55,51 @@ impl MetricsHandle { self.inner.distributions(filter) } - /// Clone a snapshot of all QUERY_LOG_EXECUTION_TIME histograms. - pub fn snapshot_histograms(&mut self) { + /// Clone a snapshot of all QUERY_LOG_EXECUTION_COUNT counters. + pub fn snapshot_counters(&mut self, database_type: DatabaseType) { fn filter(key: &str) -> bool { - key == QUERY_LOG_EXECUTION_TIME + key == QUERY_LOG_EXECUTION_COUNT } - let histograms = self - .histograms(Some(filter)) - .get(QUERY_LOG_EXECUTION_TIME) + let db_type = SharedString::from(database_type).to_string(); + + let counters = self + .counters(Some(filter)) + .get(QUERY_LOG_EXECUTION_COUNT) .cloned() .map(|h| { h.into_iter() - .filter_map(|(k, dist)| { - k.into_iter() - .find(|k| k.starts_with("query_id")) - .and_then(|k| { - let summary = match dist { - Distribution::Summary(summary, _, _) => { - Some(summary.snapshot(Instant::now())) - } - _ => None, - }; - summary.map(|s| (k, s)) - }) + .filter_map(|(k, count)| { + let mut query_id_tag = None; + for tag in k { + if tag.starts_with("query_id") { + query_id_tag = Some(tag); + } else if tag.starts_with("database_type") && !tag.contains(&db_type) { + return None; + } + } + + query_id_tag.map(|k| (k, count)) }) .collect::>() }); - self.snapshot = histograms; + self.snapshot = counters; } - /// Return the count (number of samples), 0.5, 0.9 and 0.99 quantiles for the query specified by - /// `query_id`. + /// Returns the execution count query specified by `query_id`. 
     ///
-    /// NOTE: Quantiles are queried from the last snapshot obtained by calling
-    /// [`Self::snapshot_histograms`]
+    /// NOTE: Values are queried from the last snapshot obtained by calling
+    /// [`Self::snapshot_counters`]
     pub fn metrics_summary(&self, query_id: String) -> Option<MetricsSummary> {
         let label = format!(
             "{}=\"{}\"",
             sanitize_label_key("query_id"),
             sanitize_label_value(&query_id)
         );
-        let summary = self.snapshot.as_ref()?.get(&label)?;
+        let summary = self.snapshot.as_ref()?.get(&label).or(Some(&0))?;
 
         Some(MetricsSummary {
-            sample_count: summary.count().try_into().unwrap_or_default(),
-            p50_us: summary.quantile(0.5).unwrap_or_default(),
-            p90_us: summary.quantile(0.90).unwrap_or_default(),
-            p99_us: summary.quantile(0.99).unwrap_or_default(),
+            sample_count: *summary,
         })
     }
diff --git a/readyset-client-metrics/src/recorded.rs b/readyset-client-metrics/src/recorded.rs
index 171189e6ce..a68d40c964 100644
--- a/readyset-client-metrics/src/recorded.rs
+++ b/readyset-client-metrics/src/recorded.rs
@@ -12,6 +12,7 @@
 ///
 /// [`DatabaseType`]: crate::DatabaseType
 pub const QUERY_LOG_EXECUTION_TIME: &str = "readyset_query_log_execution_time";
+pub const QUERY_LOG_EXECUTION_COUNT: &str = "readyset_query_log_execution_count";
 
 /// Histogram: The time in seconds that the database spent executing a
 /// query.
diff --git a/readyset-clustertest/src/readyset_mysql.rs b/readyset-clustertest/src/readyset_mysql.rs
index bfc4e806db..2fe38985a3 100644
--- a/readyset-clustertest/src/readyset_mysql.rs
+++ b/readyset-clustertest/src/readyset_mysql.rs
@@ -1776,7 +1776,7 @@ async fn show_query_metrics() {
     }
 
     // Check `SHOW PROXIED QUERIES`
-    let proxied_result: Vec<(String, String, String, String, String, String, String)> = adapter
+    let proxied_result: Vec<(String, String, String, String)> = adapter
         .as_mysql_conn()
         .unwrap()
         .query(r"SHOW PROXIED QUERIES")
@@ -1785,22 +1785,10 @@ async fn show_query_metrics() {
 
     // Assert that we get a non-zero value for the metrics
     assert!(&proxied_result[0].3 != "0");
-    assert!(&proxied_result[0].4 != "0.0");
-    assert!(&proxied_result[0].5 != "0.0");
-    assert!(&proxied_result[0].6 != "0.0");
 
     // Check `SHOW CACHES`
     #[allow(clippy::type_complexity)]
-    let caches_result: Vec<(
-        String,
-        String,
-        String,
-        String,
-        String,
-        String,
-        String,
-        String,
-    )> = adapter
+    let caches_result: Vec<(String, String, String, String, String)> = adapter
         .as_mysql_conn()
         .unwrap()
         .query(r"SHOW CACHES")
@@ -1809,7 +1797,4 @@ async fn show_query_metrics() {
 
     // Assert that we get a non-zero value for the metrics
     assert!(&caches_result[0].3 != "0");
-    assert!(&caches_result[0].4 != "0.0");
-    assert!(&caches_result[0].5 != "0.0");
-    assert!(&caches_result[0].6 != "0.0");
 }
diff --git a/readyset/src/query_logger.rs b/readyset/src/query_logger.rs
index 55693bf918..90dd10963f 100644
--- a/readyset/src/query_logger.rs
+++ b/readyset/src/query_logger.rs
@@ -29,6 +29,7 @@ struct QueryMetrics {
     cache_misses: Counter,
     cache_keys_missed: Counter,
     histograms: BTreeMap<(EventType, SqlQueryType), QueryHistograms>,
+    counters: BTreeMap<(EventType, SqlQueryType), QueryCounters>,
 }
 
 #[derive(Default)]
@@ -38,6 +39,13 @@ struct QueryHistograms {
     readyset_exe_time: Option<Histogram>,
 }
 
+// These counters are for use in the Day 1 / demo experience.
+#[derive(Default)]
+struct QueryCounters {
+    upstream_exe_count: Option<Counter>,
+    readyset_exe_count: Option<Counter>,
+}
+
 impl QueryMetrics {
     fn parse_histogram(&mut self, kind: (EventType, SqlQueryType)) -> &mut Histogram {
         self.histograms
@@ -100,6 +108,48 @@
                 register_histogram!(recorded::QUERY_LOG_EXECUTION_TIME, &labels)
             })
     }
+
+    fn readyset_counter(&mut self, kind: (EventType, SqlQueryType)) -> &mut Counter {
+        self.counters
+            .entry(kind)
+            .or_default()
+            .readyset_exe_count
+            .get_or_insert_with(|| {
+                let mut labels = vec![
+                    ("query", self.query.clone()),
+                    ("event_type", SharedString::from(kind.0)),
+                    ("query_type", SharedString::from(kind.1)),
+                    ("database_type", SharedString::from(DatabaseType::ReadySet)),
+                ];
+
+                if let Some(id) = &self.query_id {
+                    labels.push(("query_id", id.clone()));
+                }
+
+                register_counter!(recorded::QUERY_LOG_EXECUTION_COUNT, &labels)
+            })
+    }
+
+    fn upstream_counter(&mut self, kind: (EventType, SqlQueryType)) -> &mut Counter {
+        self.counters
+            .entry(kind)
+            .or_default()
+            .upstream_exe_count
+            .get_or_insert_with(|| {
+                let mut labels = vec![
+                    ("query", self.query.clone()),
+                    ("event_type", SharedString::from(kind.0)),
+                    ("query_type", SharedString::from(kind.1)),
+                    ("database_type", SharedString::from(DatabaseType::MySql)),
+                ];
+
+                if let Some(id) = &self.query_id {
+                    labels.push(("query_id", id.clone()));
+                }
+
+                register_counter!(recorded::QUERY_LOG_EXECUTION_COUNT, &labels)
+            })
+    }
 }
 
 impl QueryLogger {
@@ -143,6 +193,7 @@ impl QueryLogger {
                     query: query_string,
                     query_id: Some(query_id),
                     histograms: BTreeMap::new(),
+                    counters: BTreeMap::new(),
                 }
             })
     }
@@ -169,6 +220,7 @@ impl QueryLogger {
                     query: query_string,
                     query_id: None,
                     histograms: BTreeMap::new(),
+                    counters: BTreeMap::new(),
                 }
             })
     }
@@ -262,12 +314,14 @@ impl QueryLogger {
                 metrics
                     .readyset_histogram((event.event, event.sql_type))
                     .record(duration);
+                metrics.readyset_counter((event.event, event.sql_type)).increment(1);
             }
 
             if let Some(duration) = event.upstream_duration {
                 metrics
                     .upstream_histogram((event.event, event.sql_type))
                     .record(duration);
+                metrics.upstream_counter((event.event, event.sql_type)).increment(1);
             }
         }
     }
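
For reviewers who want to poke at the label-filtering logic outside the adapter, below is a minimal, self-contained sketch of the snapshot-and-lookup flow that snapshot_counters and metrics_summary implement. It uses plain standard-library types rather than the real PrometheusHandle machinery; the CounterSnapshot type, its methods, and the literal label strings are illustrative assumptions for this sketch, not ReadySet API.

    use std::collections::HashMap;

    // Stand-in for the snapshot held by `MetricsHandle` after `snapshot_counters`:
    // execution counts keyed by the sanitized `query_id="..."` label.
    #[derive(Default)]
    struct CounterSnapshot {
        by_query_id: HashMap<String, u64>,
    }

    impl CounterSnapshot {
        // Keep only samples whose `database_type` label matches `db_type`, and
        // index the survivors by their `query_id` label, as the patch's
        // `snapshot_counters` does.
        fn from_samples(samples: &[(Vec<String>, u64)], db_type: &str) -> Self {
            let mut by_query_id = HashMap::new();
            for (labels, count) in samples {
                let mut query_id = None;
                let mut matches_db = true;
                for label in labels {
                    if label.starts_with("query_id") {
                        query_id = Some(label.clone());
                    } else if label.starts_with("database_type") && !label.contains(db_type) {
                        matches_db = false;
                    }
                }
                if matches_db {
                    if let Some(id) = query_id {
                        by_query_id.insert(id, *count);
                    }
                }
            }
            Self { by_query_id }
        }

        // Mirrors `metrics_summary`: a query missing from the snapshot reads as
        // zero executions rather than "no data" (the `.or(Some(&0))` in the patch).
        fn sample_count(&self, query_id: &str) -> u64 {
            let label = format!("query_id=\"{query_id}\"");
            self.by_query_id.get(&label).copied().unwrap_or(0)
        }
    }

    fn main() {
        // Two samples for the same counter metric, differing only by database_type.
        let samples = vec![
            (
                vec![r#"query_id="q_1""#.to_string(), r#"database_type="readyset""#.to_string()],
                7,
            ),
            (
                vec![r#"query_id="q_1""#.to_string(), r#"database_type="mysql""#.to_string()],
                2,
            ),
        ];

        let snapshot = CounterSnapshot::from_samples(&samples, "readyset");
        assert_eq!(snapshot.sample_count("q_1"), 7); // only the ReadySet counter survives
        assert_eq!(snapshot.sample_count("q_2"), 0); // unknown queries read as zero
    }

The behavioral point the asserts exercise is the one the commit message motivates: a monotonically increasing count never "resets to zero" the way the 60-second rolling Distributions did, at the cost of no longer reporting latency percentiles.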