From 3d6800ba1d75bf77e1f871302b95315fc2308741 Mon Sep 17 00:00:00 2001
From: Dan Wilbanks
Date: Wed, 30 Aug 2023 17:56:36 -0400
Subject: [PATCH] adapter: Display metrics in SHOW statements

Displays query latency quantiles in SHOW PROXIED QUERIES and SHOW CACHES
if `--prometheus-metrics` is enabled.

Introduces a `MetricsHandle` in the adapter, which wraps a
`PrometheusHandle` and provides some ReadySet-specific convenience
functions. The `MetricsHandle` can be used to query quantiles for a
particular query id. The quantiles are returned from the snapshot of
histograms taken at the last call to `snapshot_histograms`. This is done
because the `PrometheusHandle` only supports returning all histograms
for a particular tag.

Release-Note-Core: ReadySet will display latency quantiles in SHOW
  PROXIED QUERIES and SHOW CACHES if `--prometheus-metrics` is enabled.
Change-Id: I30d50aff79a087ad497c26db8b9a03d285b6a690
Reviewed-on: https://gerrit.readyset.name/c/readyset/+/5979
Tested-by: Buildkite CI
Reviewed-by: Luke Osborne
---
 Cargo.lock                                 |  4 +
 Cargo.toml                                 |  9 +-
 readyset-adapter/Cargo.toml                |  3 +
 readyset-adapter/src/backend.rs            | 75 +++++++++++++++--
 readyset-adapter/src/lib.rs                |  1 +
 readyset-adapter/src/metrics_handle.rs     | 98 ++++++++++++++++++++++
 readyset-client/Cargo.toml                 |  1 +
 readyset-clustertest/src/lib.rs            | 16 ++++
 readyset-clustertest/src/readyset_mysql.rs | 56 +++++++++++++
 readyset-clustertest/src/server.rs         |  4 +
 readyset/src/lib.rs                        |  9 +-
 11 files changed, 257 insertions(+), 19 deletions(-)
 create mode 100644 readyset-adapter/src/metrics_handle.rs
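For orientation, here is a minimal sketch (not part of the patch) of the call pattern `MetricsHandle` is built for, assuming the stock `PrometheusBuilder::install_recorder` setup from metrics-exporter-prometheus; the query id is a hypothetical placeholder:

```rust
use metrics_exporter_prometheus::PrometheusBuilder;
use readyset_adapter::metrics_handle::MetricsHandle;

fn main() {
    // Assumed setup: install a Prometheus recorder and keep its handle.
    let prometheus_handle = PrometheusBuilder::new()
        .install_recorder()
        .expect("failed to install Prometheus recorder");
    let mut handle = MetricsHandle::new(prometheus_handle);

    // Freeze the current QUERY_LOG_EXECUTION_TIME histograms, then read the
    // quantiles for one query id; results stay fixed until the next snapshot.
    handle.snapshot_histograms();
    if let Some((p50, p90, p99)) = handle.quantiles("q_0000000000000000".to_string()) {
        // Quantile values are in seconds; the SHOW output multiplies by 1000 for ms.
        println!(
            "p50={:.3}ms p90={:.3}ms p99={:.3}ms",
            1000.0 * p50,
            1000.0 * p90,
            1000.0 * p99
        );
    }
}
```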
"https://github.com/readysettech/metrics.git" } [profile.release] debug=true diff --git a/readyset-adapter/Cargo.toml b/readyset-adapter/Cargo.toml index 48a8c8b814..dd0c496df8 100644 --- a/readyset-adapter/Cargo.toml +++ b/readyset-adapter/Cargo.toml @@ -30,6 +30,7 @@ thiserror = "1.0.26" readyset-util = { path = "../readyset-util" } metrics = { workspace = true } metrics-exporter-prometheus = { workspace = true } +metrics-util = { workspace = true } futures = "0.3" serde = "1.0.130" serde_json = "1.0.67" @@ -45,6 +46,8 @@ mysql_common = "0.29" bincode = "1.3.3" parking_lot = "0.11.2" sqlformat = "0.2.1" +indexmap = { version = "1", default-features = false } +quanta = { version = "0.11", default-features = false } readyset-alloc = { path = "../readyset-alloc/" } readyset-client = { path = "../readyset-client/" } diff --git a/readyset-adapter/src/backend.rs b/readyset-adapter/src/backend.rs index c94c00710a..9539e3a8f8 100644 --- a/readyset-adapter/src/backend.rs +++ b/readyset-adapter/src/backend.rs @@ -103,6 +103,7 @@ use tracing::{error, instrument, trace, warn}; use vec1::Vec1; use crate::backend::noria_connector::ExecuteSelectContext; +use crate::metrics_handle::MetricsHandle; use crate::query_handler::SetBehavior; use crate::query_status_cache::QueryStatusCache; pub use crate::upstream_database::UpstreamPrepare; @@ -266,6 +267,7 @@ pub struct BackendBuilder { fallback_recovery_seconds: u64, telemetry_sender: Option, enable_experimental_placeholder_inlining: bool, + metrics_handle: Option, } impl Default for BackendBuilder { @@ -285,6 +287,7 @@ impl Default for BackendBuilder { fallback_recovery_seconds: 0, telemetry_sender: None, enable_experimental_placeholder_inlining: false, + metrics_handle: None, } } } @@ -337,6 +340,7 @@ impl BackendBuilder { }, telemetry_sender: self.telemetry_sender, authority, + metrics_handle: self.metrics_handle, _query_handler: PhantomData, } } @@ -414,6 +418,11 @@ impl BackendBuilder { self.enable_experimental_placeholder_inlining = enable_experimental_placeholder_inlining; self } + + pub fn metrics_handle(mut self, metrics_handle: Option) -> Self { + self.metrics_handle = metrics_handle; + self + } } /// A [`CachedPreparedStatement`] stores the data needed for an immediate @@ -510,6 +519,9 @@ where /// the Controller. authority: Arc, + /// Handle to the [`metrics_exporter_prometheus::PrometheusRecorder`] that runs in the adapter. 
diff --git a/readyset-adapter/src/lib.rs b/readyset-adapter/src/lib.rs
index 57143ce1b8..a49262d810 100644
--- a/readyset-adapter/src/lib.rs
+++ b/readyset-adapter/src/lib.rs
@@ -10,6 +10,7 @@
 pub mod backend;
 pub mod http_router;
+pub mod metrics_handle;
 pub mod migration_handler;
 pub mod proxied_queries_reporter;
 mod query_handler;
diff --git a/readyset-adapter/src/metrics_handle.rs b/readyset-adapter/src/metrics_handle.rs
new file mode 100644
index 0000000000..9f603fb04a
--- /dev/null
+++ b/readyset-adapter/src/metrics_handle.rs
@@ -0,0 +1,98 @@
+use std::collections::HashMap;
+
+use indexmap::IndexMap;
+use metrics_exporter_prometheus::formatting::{sanitize_label_key, sanitize_label_value};
+use metrics_exporter_prometheus::{Distribution, PrometheusHandle};
+use metrics_util::Summary;
+use quanta::Instant;
+use readyset_client_metrics::recorded::QUERY_LOG_EXECUTION_TIME;
+
+#[derive(Clone)]
+pub struct MetricsHandle {
+    inner: PrometheusHandle,
+    snapshot: Option<IndexMap<String, Summary>>,
+}
+
+impl MetricsHandle {
+    /// Create a new [`MetricsHandle`] given a [`PrometheusHandle`]
+    pub fn new(prometheus_handle: PrometheusHandle) -> Self {
+        Self {
+            inner: prometheus_handle,
+            snapshot: None,
+        }
+    }
+
+    /// Returns a snapshot of the counters held by [`Self`]. Optionally filters by `filter`.
+    pub fn counters<F: Fn(&str) -> bool>(
+        &self,
+        filter: Option<F>,
+    ) -> HashMap<String, HashMap<Vec<String>, u64>> {
+        self.inner.counters(filter)
+    }
+
+    /// Returns a snapshot of the gauges held by [`Self`]. Optionally filters by `filter`.
+    pub fn gauges<F: Fn(&str) -> bool>(
+        &self,
+        filter: Option<F>,
+    ) -> HashMap<String, HashMap<Vec<String>, f64>> {
+        self.inner.gauges(filter)
+    }
+
+    /// Returns a snapshot of the histograms and summaries held by [`Self`]. Optionally filters by
+    /// `filter`.
+    pub fn histograms<F: Fn(&str) -> bool>(
+        &self,
+        filter: Option<F>,
+    ) -> HashMap<String, HashMap<Vec<String>, Distribution>> {
+        self.inner.distributions(filter)
+    }
+
+    /// Clone a snapshot of all QUERY_LOG_EXECUTION_TIME histograms.
+    pub fn snapshot_histograms(&mut self) {
+        fn filter(key: &str) -> bool {
+            key == QUERY_LOG_EXECUTION_TIME
+        }
+
+        let histograms = self
+            .histograms(Some(filter))
+            .get(QUERY_LOG_EXECUTION_TIME)
+            .cloned()
+            .map(|h| {
+                h.into_iter()
+                    .filter_map(|(k, dist)| {
+                        k.into_iter()
+                            .find(|k| k.starts_with("query_id"))
+                            .and_then(|k| {
+                                let summary = match dist {
+                                    Distribution::Summary(summary, _, _) => {
+                                        Some(summary.snapshot(Instant::now()))
+                                    }
+                                    _ => None,
+                                };
+                                summary.map(|s| (k, s))
+                            })
+                    })
+                    .collect::<IndexMap<_, _>>()
+            });
+        self.snapshot = histograms;
+    }
+
+    /// Return the 0.5, 0.9 and 0.99 quantiles for the query specified by `query_id`.
+    ///
+    /// NOTE: Quantiles are queried from the last snapshot obtained by calling
+    /// [`Self::snapshot_histograms`]
+    pub fn quantiles(&self, query_id: String) -> Option<(f64, f64, f64)> {
+        let label = format!(
+            "{}=\"{}\"",
+            sanitize_label_key("query_id"),
+            sanitize_label_value(&query_id)
+        );
+        let summary = self.snapshot.as_ref()?.get(&label)?;
+
+        Some((
+            summary.quantile(0.5).unwrap_or_default(),
+            summary.quantile(0.90).unwrap_or_default(),
+            summary.quantile(0.99).unwrap_or_default(),
+        ))
+    }
+}
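Because the snapshot is keyed by the rendered `query_id="…"` label pair, `quantiles` has to rebuild exactly the string that labeled the histogram. A small illustration using the same sanitizers (the query id is a placeholder):

```rust
use metrics_exporter_prometheus::formatting::{sanitize_label_key, sanitize_label_value};

fn main() {
    let query_id = "q_0000000000000000"; // hypothetical query id
    let label = format!(
        "{}=\"{}\"",
        sanitize_label_key("query_id"),
        sanitize_label_value(query_id)
    );
    // Both inputs are already valid, so sanitization leaves them unchanged.
    assert_eq!(label, r#"query_id="q_0000000000000000""#);
}
```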
diff --git a/readyset-client/Cargo.toml b/readyset-client/Cargo.toml
index 27b040cd1e..8610da2770 100644
--- a/readyset-client/Cargo.toml
+++ b/readyset-client/Cargo.toml
@@ -57,6 +57,7 @@ rocksdb.workspace = true
 tokio-postgres = { workspace = true, features = ["with-chrono-0_4", "with-eui48-1", "with-uuid-0_8", "with-serde_json-1", "with-bit-vec-0_6"] }
 metrics = { workspace = true }
 metrics-util = { workspace = true }
+metrics-exporter-prometheus = { workspace = true }
 itertools = "0.10"
 bytes = "1.0.1"
 rust_decimal = { version = "1.26", features = ["db-tokio-postgres", "serde-str"] }
diff --git a/readyset-clustertest/src/lib.rs b/readyset-clustertest/src/lib.rs
index 3e3962ef10..02ded81141 100644
--- a/readyset-clustertest/src/lib.rs
+++ b/readyset-clustertest/src/lib.rs
@@ -457,6 +457,8 @@ pub struct DeploymentBuilder {
     embedded_readers: bool,
     /// Whether to allow fully materialized nodes or not
     allow_full_materialization: bool,
+    /// Whether to allow prometheus metrics
+    prometheus_metrics: bool,
 }

 pub enum FailpointDestination {
@@ -531,6 +533,7 @@
             enable_experimental_placeholder_inlining: false,
             embedded_readers: false,
             allow_full_materialization: false,
+            prometheus_metrics: false,
         }
     }

     /// The number of shards in the graph, `shards` <= 1 disables sharding.
@@ -700,6 +703,12 @@
         self
     }

+    /// Sets whether to enable prometheus metrics in the adapter
+    pub fn prometheus_metrics(mut self, prometheus_metrics: bool) -> Self {
+        self.prometheus_metrics = prometheus_metrics;
+        self
+    }
+
     pub fn adapter_start_params(&self) -> AdapterStartParams {
         let wait_for_failpoint = matches!(
             self.wait_for_failpoint,
@@ -722,6 +731,7 @@
             enable_experimental_placeholder_inlining: self.enable_experimental_placeholder_inlining,
             embedded_readers: self.embedded_readers,
             allow_full_materialization: self.allow_full_materialization,
+            prometheus_metrics: self.prometheus_metrics,
         }
     }
@@ -1538,6 +1548,8 @@ pub struct AdapterStartParams {
     embedded_readers: bool,
     /// Whether or not to allow full materializations
     allow_full_materialization: bool,
+    /// Whether to enable prometheus metrics
+    prometheus_metrics: bool,
 }

 async fn start_server(
@@ -1683,6 +1695,10 @@ async fn start_adapter(
         builder = builder.allow_full_materialization();
     }

+    if params.prometheus_metrics {
+        builder = builder.prometheus_metrics()
+    }
+
     builder.start().await
 }
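The harness changes above only flip the `--prometheus-metrics` flag; the histograms themselves are recorded elsewhere with a `query_id` label. Roughly, the recording side looks like this sketch (the real call site lives in readyset-client-metrics and is not part of this patch; the helper function is hypothetical, and the seconds unit is assumed to match the ms conversion in the SHOW handlers):

```rust
use metrics::histogram;
use readyset_client_metrics::recorded::QUERY_LOG_EXECUTION_TIME;

// Hypothetical recording site: each execution time is recorded against a
// histogram whose series is labeled by the query id, which is what
// MetricsHandle::snapshot_histograms later matches with starts_with("query_id").
fn record_execution_time(query_id: &str, duration_secs: f64) {
    histogram!(
        QUERY_LOG_EXECUTION_TIME,
        duration_secs,
        "query_id" => query_id.to_string()
    );
}

fn main() {
    record_execution_time("q_0000000000000000", 0.00042);
}
```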
diff --git a/readyset-clustertest/src/readyset_mysql.rs b/readyset-clustertest/src/readyset_mysql.rs
index 448ec9c1be..e6a0616455 100644
--- a/readyset-clustertest/src/readyset_mysql.rs
+++ b/readyset-clustertest/src/readyset_mysql.rs
@@ -1782,3 +1782,59 @@ async fn aliased_query_explicitly_cached() {

     deployment.teardown().await.unwrap();
 }
+
+#[clustertest]
+async fn show_query_metrics() {
+    readyset_tracing::init_test_logging();
+    let mut deployment = readyset_mysql("show_query_metrics")
+        .standalone()
+        .prometheus_metrics(true)
+        .start()
+        .await
+        .unwrap();
+    let mut adapter = deployment.first_adapter().await;
+
+    adapter.query_drop("CREATE TABLE t (c INT)").await.unwrap();
+
+    // Proxy a query
+    adapter.query_drop("SELECT c FROM t").await.unwrap();
+    // Run a query against ReadySet
+    deployment.leader_handle().ready().await.unwrap();
+    eventually! {
+        adapter
+            .query_drop("CREATE CACHE FROM SELECT c FROM t WHERE c = ?")
+            .await
+            .is_ok()
+    }
+
+    eventually! {
+        adapter.query_drop("SELECT c FROM t where c = 1").await.unwrap();
+        last_statement_destination(adapter.as_mysql_conn().unwrap()).await == QueryDestination::Readyset
+    }
+
+    // Check `SHOW PROXIED QUERIES`
+    let proxied_result: Vec<(String, String, String, String, String, String)> = adapter
+        .as_mysql_conn()
+        .unwrap()
+        .query(r"SHOW PROXIED QUERIES")
+        .await
+        .unwrap();
+
+    // Assert that we get a non-zero value for the metrics
+    assert!(&proxied_result[0].3 != "0.000");
+    assert!(&proxied_result[0].4 != "0.000");
+    assert!(&proxied_result[0].5 != "0.000");
+
+    // Check `SHOW CACHES`
+    let caches_result: Vec<(String, String, String, String, String, String, String)> = adapter
+        .as_mysql_conn()
+        .unwrap()
+        .query(r"SHOW CACHES")
+        .await
+        .unwrap();
+
+    // Assert that we get a non-zero value for the metrics
+    assert!(&caches_result[0].4 != "0.000");
+    assert!(&caches_result[0].5 != "0.000");
+    assert!(&caches_result[0].6 != "0.000");
+}
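Since the metric columns are fixed-precision strings, the test compares them against "0.000" (the rendering of the zero fallback). A stricter variant, sketched here and not part of the patch, parses the column and requires a strictly positive latency:

```rust
fn main() {
    // Stand-in for proxied_result[0].3 as returned by SHOW PROXIED QUERIES.
    let p50_column = String::from("0.420");
    let p50_ms: f64 = p50_column.parse().expect("p50 column should be a decimal");
    assert!(p50_ms > 0.0, "expected a non-zero p50 latency");
}
```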
diff --git a/readyset-clustertest/src/server.rs b/readyset-clustertest/src/server.rs
index 965c69a5c5..e18c605ffc 100644
--- a/readyset-clustertest/src/server.rs
+++ b/readyset-clustertest/src/server.rs
@@ -349,4 +349,8 @@ impl AdapterBuilder {
     pub fn allow_full_materialization(self) -> Self {
         self.push_arg("--allow-full-materialization")
     }
+
+    pub fn prometheus_metrics(self) -> Self {
+        self.push_arg("--prometheus-metrics")
+    }
 }
diff --git a/readyset/src/lib.rs b/readyset/src/lib.rs
index 2d52b5eeeb..2304decd79 100644
--- a/readyset/src/lib.rs
+++ b/readyset/src/lib.rs
@@ -30,6 +30,7 @@ use nom_sql::Relation;
 use readyset_adapter::backend::noria_connector::{NoriaConnector, ReadBehavior};
 use readyset_adapter::backend::MigrationMode;
 use readyset_adapter::http_router::NoriaAdapterHttpRouter;
+use readyset_adapter::metrics_handle::MetricsHandle;
 use readyset_adapter::migration_handler::MigrationHandler;
 use readyset_adapter::proxied_queries_reporter::ProxiedQueriesReporter;
 use readyset_adapter::query_status_cache::{MigrationStyle, QueryStatusCache};
@@ -744,7 +745,7 @@
         };
         let http_server = NoriaAdapterHttpRouter {
             listen_addr: options.metrics_address,
-            prometheus_handle,
+            prometheus_handle: prometheus_handle.clone(),
             health_reporter: health_reporter.clone(),
             failpoint_channel: tx,
         };
@@ -957,9 +958,8 @@
                 .query_max_failure_seconds(options.query_max_failure_seconds)
                 .telemetry_sender(telemetry_sender.clone())
                 .fallback_recovery_seconds(options.fallback_recovery_seconds)
-                .enable_experimental_placeholder_inlining(
-                    options.experimental_placeholder_inlining,
-                );
+                .enable_experimental_placeholder_inlining(options.experimental_placeholder_inlining)
+                .metrics_handle(prometheus_handle.clone().map(MetricsHandle::new));
             let telemetry_sender = telemetry_sender.clone();

             // Initialize the reader layer for the adapter.
@@ -971,7 +971,6 @@
                 ReadRequestHandler::new(readers.clone(), tx, Duration::from_secs(5))
             });

-            let query_status_cache = query_status_cache;
             let upstream_config = upstream_config.clone();
             let fut = async move {
                 let upstream_res =