Skip to content

Commit

Permalink
adapter: Display metrics in SHOW statements
Browse files Browse the repository at this point in the history
Displays query latency quantiles in SHOW PROXIED QUERIES and SHOW CACHES
if `--prometheus-metrics` is enabled.

Introduces a `MetricsHandle` in the adapter, which wraps a
`PrometheusHandle` and provides ReadySet-specific convenience
functions.

The `MetricsHandle` can be used to query quantiles for a particular
query id. The quantiles are returned from the snapshot of histograms
that was taken at the last call to `snapshot_histograms`. This is done
because the `PrometheusHandle` only supports returning all histograms
for a particular tag.

Release-Note-Core: ReadySet will display latency quantiles in SHOW
  PROXIED QUERIES and SHOW CACHES if `--prometheus-metrics` is enabled.
Change-Id: I30d50aff79a087ad497c26db8b9a03d285b6a690
Reviewed-on: https://gerrit.readyset.name/c/readyset/+/5979
Tested-by: Buildkite CI
Reviewed-by: Luke Osborne <luke@readyset.io>
  • Loading branch information
Dan Wilbanks authored and lukoktonos committed Sep 11, 2023
1 parent db272cc commit 3d6800b
Show file tree
Hide file tree
Showing 11 changed files with 257 additions and 19 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 3 additions & 6 deletions Cargo.toml
Expand Up @@ -4,9 +4,6 @@ eui48 = { git = "https://github.com/readysettech/eui48.git", branch = "master" }
opentelemetry = { git = "https://github.com/open-telemetry/opentelemetry-rust" }
opentelemetry-otlp = { git = "https://github.com/open-telemetry/opentelemetry-rust" }
opentelemetry-semantic-conventions = { git = "https://github.com/open-telemetry/opentelemetry-rust" }
metrics-exporter-prometheus = { git = "https://github.com/readysettech/metrics.git" }
metrics = { git = "https://github.com/readysettech/metrics.git" }
metrics-util = { git = "https://github.com/readysettech/metrics.git" }

[workspace]
members = [
Expand Down Expand Up @@ -78,9 +75,9 @@ postgres-types = { git = "https://github.com/readysettech/rust-postgres.git"}
tokio-postgres = { git = "https://github.com/readysettech/rust-postgres.git"}
tokio = { version = "1.32", features = ["full"] }
rocksdb = { git = "https://github.com/readysettech/rust-rocksdb.git", default-features = false, features = ["lz4", "jemalloc"] }
metrics-exporter-prometheus = { version = "0.12.1" }
metrics = { version = "0.21.1" }
metrics-util = { version = "0.15.1"}
metrics-exporter-prometheus = { git = "https://github.com/readysettech/metrics.git" }
metrics = { git = "https://github.com/readysettech/metrics.git" }
metrics-util = { git = "https://github.com/readysettech/metrics.git" }

[profile.release]
debug=true
3 changes: 3 additions & 0 deletions readyset-adapter/Cargo.toml
Expand Up @@ -30,6 +30,7 @@ thiserror = "1.0.26"
readyset-util = { path = "../readyset-util" }
metrics = { workspace = true }
metrics-exporter-prometheus = { workspace = true }
metrics-util = { workspace = true }
futures = "0.3"
serde = "1.0.130"
serde_json = "1.0.67"
Expand All @@ -45,6 +46,8 @@ mysql_common = "0.29"
bincode = "1.3.3"
parking_lot = "0.11.2"
sqlformat = "0.2.1"
indexmap = { version = "1", default-features = false }
quanta = { version = "0.11", default-features = false }

readyset-alloc = { path = "../readyset-alloc/" }
readyset-client = { path = "../readyset-client/" }
Expand Down
75 changes: 67 additions & 8 deletions readyset-adapter/src/backend.rs
Expand Up @@ -103,6 +103,7 @@ use tracing::{error, instrument, trace, warn};
use vec1::Vec1;

use crate::backend::noria_connector::ExecuteSelectContext;
use crate::metrics_handle::MetricsHandle;
use crate::query_handler::SetBehavior;
use crate::query_status_cache::QueryStatusCache;
pub use crate::upstream_database::UpstreamPrepare;
Expand Down Expand Up @@ -266,6 +267,7 @@ pub struct BackendBuilder {
fallback_recovery_seconds: u64,
telemetry_sender: Option<TelemetrySender>,
enable_experimental_placeholder_inlining: bool,
metrics_handle: Option<MetricsHandle>,
}

impl Default for BackendBuilder {
Expand All @@ -285,6 +287,7 @@ impl Default for BackendBuilder {
fallback_recovery_seconds: 0,
telemetry_sender: None,
enable_experimental_placeholder_inlining: false,
metrics_handle: None,
}
}
}
Expand Down Expand Up @@ -337,6 +340,7 @@ impl BackendBuilder {
},
telemetry_sender: self.telemetry_sender,
authority,
metrics_handle: self.metrics_handle,
_query_handler: PhantomData,
}
}
Expand Down Expand Up @@ -414,6 +418,11 @@ impl BackendBuilder {
self.enable_experimental_placeholder_inlining = enable_experimental_placeholder_inlining;
self
}

pub fn metrics_handle(mut self, metrics_handle: Option<MetricsHandle>) -> Self {
self.metrics_handle = metrics_handle;
self
}
}

/// A [`CachedPreparedStatement`] stores the data needed for an immediate
Expand Down Expand Up @@ -510,6 +519,9 @@ where
/// the Controller.
authority: Arc<Authority>,

/// Handle to the [`metrics_exporter_prometheus::PrometheusRecorder`] that runs in the adapter.
metrics_handle: Option<MetricsHandle>,

_query_handler: PhantomData<Handler>,
}

Expand Down Expand Up @@ -1731,7 +1743,20 @@ where
queries.retain(|q| q.status.migration_state.is_supported());
}

let select_schema = create_dummy_schema!("query id", "proxied query", "readyset supported");
let select_schema = if let Some(handle) = self.metrics_handle.as_mut() {
// Must snapshot histograms to get the latest metrics
handle.snapshot_histograms();
create_dummy_schema!(
"query id",
"proxied query",
"readyset supported",
"p50 (ms)",
"p90 (ms)",
"p99 (ms)"
)
} else {
create_dummy_schema!("query id", "proxied query", "readyset supported")
};

let mut data = queries
.into_iter()
Expand All @@ -1745,13 +1770,24 @@ where
}
.to_string();

vec![
let mut row = vec![
DfValue::from(id.to_string()),
DfValue::from(Self::format_query_text(
query.display(DB::SQL_DIALECT).to_string(),
)),
DfValue::from(s),
]
];

// Append metrics if we have them
if let Some(handle) = self.metrics_handle.as_ref() {
let (p50, p90, p99) =
handle.quantiles(id.to_string()).unwrap_or((0.0, 0.0, 0.0));
row.push(DfValue::from(format!("{:.3}", 1000.0 * p50)));
row.push(DfValue::from(format!("{:.3}", 1000.0 * p90)));
row.push(DfValue::from(format!("{:.3}", 1000.0 * p99)));
}

row
})
.collect::<Vec<_>>();

Expand Down Expand Up @@ -1783,10 +1819,26 @@ where
queries.retain(|(id, _, _)| id.to_string() == *q_id);
}

let select_schema = if let Some(handle) = self.metrics_handle.as_mut() {
// Must snapshot histograms to get the latest metrics
handle.snapshot_histograms();
create_dummy_schema!(
"query id",
"cache name",
"query text",
"fallback behavior",
"p50 (ms)",
"p90 (ms)",
"p99 (ms)"
)
} else {
create_dummy_schema!("query id", "cache name", "query text", "fallback behavior")
};

// Get the cache name for each query from the view cache
let mut results: Vec<Vec<DfValue>> = vec![];
for (id, view, status) in queries {
results.push(vec![
let mut row = vec![
id.to_string().into(),
self.noria
.get_view(&view.statement, false, false, None)
Expand All @@ -1800,11 +1852,18 @@ where
} else {
"fallback allowed".into()
},
]);
}
];

// Append metrics if we have them
if let Some(handle) = self.metrics_handle.as_ref() {
let (p50, p90, p99) = handle.quantiles(id.to_string()).unwrap_or((0.0, 0.0, 0.0));
row.push(DfValue::from(format!("{:.3}", 1000.0 * p50)));
row.push(DfValue::from(format!("{:.3}", 1000.0 * p90)));
row.push(DfValue::from(format!("{:.3}", 1000.0 * p99)));
}

let select_schema =
create_dummy_schema!("query id", "cache name", "query text", "fallback behavior");
results.push(row);
}

Ok(noria_connector::QueryResult::from_owned(
select_schema,
Expand Down
1 change: 1 addition & 0 deletions readyset-adapter/src/lib.rs
Expand Up @@ -10,6 +10,7 @@

pub mod backend;
pub mod http_router;
pub mod metrics_handle;
pub mod migration_handler;
pub mod proxied_queries_reporter;
mod query_handler;
Expand Down
98 changes: 98 additions & 0 deletions readyset-adapter/src/metrics_handle.rs
@@ -0,0 +1,98 @@
use std::collections::HashMap;

use indexmap::IndexMap;
use metrics_exporter_prometheus::formatting::{sanitize_label_key, sanitize_label_value};
use metrics_exporter_prometheus::{Distribution, PrometheusHandle};
use metrics_util::Summary;
use quanta::Instant;
use readyset_client_metrics::recorded::QUERY_LOG_EXECUTION_TIME;

/// Wrapper around a [`PrometheusHandle`] providing ReadySet-specific
/// convenience functions, notably querying per-query-id latency quantiles
/// from a point-in-time snapshot of the recorder's histograms.
#[derive(Clone)]
pub struct MetricsHandle {
    // The underlying Prometheus recorder handle; all counter/gauge/histogram
    // reads delegate to it.
    inner: PrometheusHandle,
    // Histograms captured by the last call to `snapshot_histograms`, keyed by
    // the `query_id="..."` label string. `None` until a snapshot is taken.
    snapshot: Option<HashMap<String, Summary>>,
}

impl MetricsHandle {
/// Create a new [`MetricsHandle`] given a [`PrometheusHandle`]
pub fn new(prometheus_handle: PrometheusHandle) -> Self {
Self {
inner: prometheus_handle,
snapshot: None,
}
}

/// Returns a snapshot of the counters held by [`Self`]. Optionally filters by `filter`.
pub fn counters<F: Fn(&str) -> bool>(
&self,
filter: Option<F>,
) -> HashMap<String, HashMap<Vec<String>, u64>> {
self.inner.counters(filter)
}

/// Returns a snapshot of the gauges held by [`Self`]. Optionally filters by `filter`.
pub fn gauges<F: Fn(&str) -> bool>(
&self,
filter: Option<F>,
) -> HashMap<String, HashMap<Vec<String>, f64>> {
self.inner.gauges(filter)
}

/// Returns a snapshot of the histograms and summaries held by [`Self`]. Optionall filters by
/// `filter`.
pub fn histograms<F: Fn(&str) -> bool>(
&self,
filter: Option<F>,
) -> HashMap<String, IndexMap<Vec<String>, Distribution>> {
self.inner.distributions(filter)
}

/// Clone a snapshot of all QUERY_LOG_EXECUTION_TIME histograms.
pub fn snapshot_histograms(&mut self) {
fn filter(key: &str) -> bool {
key == QUERY_LOG_EXECUTION_TIME
}

let histograms = self
.histograms(Some(filter))
.get(QUERY_LOG_EXECUTION_TIME)
.cloned()
.map(|h| {
h.into_iter()
.filter_map(|(k, dist)| {
k.into_iter()
.find(|k| k.starts_with("query_id"))
.and_then(|k| {
let summary = match dist {
Distribution::Summary(summary, _, _) => {
Some(summary.snapshot(Instant::now()))
}
_ => None,
};
summary.map(|s| (k, s))
})
})
.collect::<HashMap<_, _>>()
});
self.snapshot = histograms;
}

/// Return the 0.5, 0.9 and 0.99 quantiles for the query specified by `query_id`.
///
/// NOTE: Quantiles are queried from the last snapshot obtained by calling
/// [`Self::snapshot_histograms`]
pub fn quantiles(&self, query_id: String) -> Option<(f64, f64, f64)> {
let label = format!(
"{}=\"{}\"",
sanitize_label_key("query_id"),
sanitize_label_value(&query_id)
);
let summary = self.snapshot.as_ref()?.get(&label)?;

Some((
summary.quantile(0.5).unwrap_or_default(),
summary.quantile(0.90).unwrap_or_default(),
summary.quantile(0.99).unwrap_or_default(),
))
}
}
1 change: 1 addition & 0 deletions readyset-client/Cargo.toml
Expand Up @@ -57,6 +57,7 @@ rocksdb.workspace = true
tokio-postgres = { workspace = true, features = ["with-chrono-0_4", "with-eui48-1", "with-uuid-0_8", "with-serde_json-1", "with-bit-vec-0_6"] }
metrics = { workspace = true }
metrics-util = { workspace = true }
metrics-exporter-prometheus = { workspace = true }
itertools = "0.10"
bytes = "1.0.1"
rust_decimal = { version = "1.26", features = ["db-tokio-postgres", "serde-str"] }
Expand Down
16 changes: 16 additions & 0 deletions readyset-clustertest/src/lib.rs
Expand Up @@ -457,6 +457,8 @@ pub struct DeploymentBuilder {
embedded_readers: bool,
/// Whether to allow fully materialized nodes or not
allow_full_materialization: bool,
/// Whether to allow prometheus metrics
prometheus_metrics: bool,
}

pub enum FailpointDestination {
Expand Down Expand Up @@ -531,6 +533,7 @@ impl DeploymentBuilder {
enable_experimental_placeholder_inlining: false,
embedded_readers: false,
allow_full_materialization: false,
prometheus_metrics: false,
}
}
/// The number of shards in the graph, `shards` <= 1 disables sharding.
Expand Down Expand Up @@ -700,6 +703,12 @@ impl DeploymentBuilder {
self
}

/// Sets whether to enable prometheus metrics in the adapter
pub fn prometheus_metrics(mut self, prometheus_metrics: bool) -> Self {
self.prometheus_metrics = prometheus_metrics;
self
}

pub fn adapter_start_params(&self) -> AdapterStartParams {
let wait_for_failpoint = matches!(
self.wait_for_failpoint,
Expand All @@ -722,6 +731,7 @@ impl DeploymentBuilder {
enable_experimental_placeholder_inlining: self.enable_experimental_placeholder_inlining,
embedded_readers: self.embedded_readers,
allow_full_materialization: self.allow_full_materialization,
prometheus_metrics: self.prometheus_metrics,
}
}

Expand Down Expand Up @@ -1538,6 +1548,8 @@ pub struct AdapterStartParams {
embedded_readers: bool,
/// Whether or not to allow full materializations
allow_full_materialization: bool,
/// Whether to enable prometheus metrics
prometheus_metrics: bool,
}

async fn start_server(
Expand Down Expand Up @@ -1683,6 +1695,10 @@ async fn start_adapter(
builder = builder.allow_full_materialization();
}

if params.prometheus_metrics {
builder = builder.prometheus_metrics()
}

builder.start().await
}

Expand Down

0 comments on commit 3d6800b

Please sign in to comment.