adapter: Replace client command's latencies with counts
SHOW PROXIED QUERIES and SHOW CACHES were recently updated to output
latency percentiles for the queries executed. Because we piggyback on
our Prometheus metrics for capture and in-memory storage, we are bound
by the implementation details of Prometheus histograms (Distributions).
Those Distributions only report data for the last 60 seconds, by
default. The impact to users: if they execute a query, run SHOW CACHES,
and see some latency information, then simply wait more than 60 seconds
and issue SHOW CACHES again, the latencies get "reset to zero".

As we use this latency information in SHOW CACHES mostly as a Day 1
experience (getting a user to try out and experience ReadySet), the
current "resetting" is confusing. Hence, this patch replaces the
latency percentiles with a simple incrementing counter, which gives
users enough information to see that ReadySet is indeed working and
handling queries.

Fixes: REA-3540
Release-Note-Core: Replace latency histograms in SHOW PROXIED QUERIES
  and SHOW CACHES with a simple counter.
Change-Id: Ic9623ba08f1503491ed283766a0492b2b6e58e37
Reviewed-on: https://gerrit.readyset.name/c/readyset/+/6183
Tested-by: Buildkite CI
Reviewed-by: Luke Osborne <luke@readyset.io>
jasobrown-rs committed Oct 10, 2023
1 parent 132dafe commit 4ee8dba
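
Background for the change, as a minimal standalone Rust sketch (illustrative only, not part of this diff): a rolling-window summary such as a Prometheus Distribution forgets samples older than its window, so any count or percentile derived from it drops back toward zero once traffic stops, while a plain monotonically increasing counter does not.

use std::collections::VecDeque;
use std::time::{Duration, Instant};

// Toy stand-in for a windowed Distribution: it only "remembers" samples
// recorded within the last `window`.
struct RollingSummary {
    window: Duration,
    samples: VecDeque<Instant>,
}

impl RollingSummary {
    fn record(&mut self, at: Instant) {
        self.samples.push_back(at);
    }

    fn count(&mut self, now: Instant) -> usize {
        // Samples older than the window are dropped, which is why the old
        // SHOW CACHES latency columns appeared to "reset to zero".
        while matches!(self.samples.front(), Some(&t) if now.duration_since(t) > self.window) {
            self.samples.pop_front();
        }
        self.samples.len()
    }
}

// Toy stand-in for the counter this patch switches to.
struct MonotonicCounter(u64);

impl MonotonicCounter {
    fn increment(&mut self) {
        self.0 += 1; // never resets, no matter how long ago the query ran
    }
}

fn main() {
    let start = Instant::now();
    let mut summary = RollingSummary {
        window: Duration::from_secs(60),
        samples: VecDeque::new(),
    };
    let mut counter = MonotonicCounter(0);

    // One query is executed.
    summary.record(start);
    counter.increment();
    assert_eq!(summary.count(start), 1);
    assert_eq!(counter.0, 1);

    // "65 seconds later": the windowed summary has forgotten the sample,
    // but the counter still reports it.
    let later = start + Duration::from_secs(65);
    assert_eq!(summary.count(later), 0);
    assert_eq!(counter.0, 1);
}
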
Showing 5 changed files with 96 additions and 88 deletions.
43 changes: 9 additions & 34 deletions readyset-adapter/src/backend.rs
@@ -1748,17 +1748,9 @@ where
}

let select_schema = if let Some(handle) = self.metrics_handle.as_mut() {
// Must snapshot histograms to get the latest metrics
handle.snapshot_histograms();
create_dummy_schema!(
"query id",
"proxied query",
"readyset supported",
"count",
"p50 (ms)",
"p90 (ms)",
"p99 (ms)"
)
// Must snapshot to get the latest metrics
handle.snapshot_counters(readyset_client_metrics::DatabaseType::MySql);
create_dummy_schema!("query id", "proxied query", "readyset supported", "count")
} else {
create_dummy_schema!("query id", "proxied query", "readyset supported")
};
@@ -1785,16 +1777,9 @@

// Append metrics if we have them
if let Some(handle) = self.metrics_handle.as_ref() {
let MetricsSummary {
sample_count,
p50_us,
p90_us,
p99_us,
} = handle.metrics_summary(id.to_string()).unwrap_or_default();
let MetricsSummary { sample_count } =
handle.metrics_summary(id.to_string()).unwrap_or_default();
row.push(DfValue::from(format!("{sample_count}")));
row.push(DfValue::from(format!("{:.3}", 1000.0 * p50_us)));
row.push(DfValue::from(format!("{:.3}", 1000.0 * p90_us)));
row.push(DfValue::from(format!("{:.3}", 1000.0 * p99_us)));
}

row
@@ -1831,16 +1816,13 @@ where

let select_schema = if let Some(handle) = self.metrics_handle.as_mut() {
// Must snapshot histograms to get the latest metrics
handle.snapshot_histograms();
handle.snapshot_counters(readyset_client_metrics::DatabaseType::ReadySet);
create_dummy_schema!(
"query id",
"cache name",
"query text",
"fallback behavior",
"count",
"p50 (ms)",
"p90 (ms)",
"p99 (ms)"
"count"
)
} else {
create_dummy_schema!("query id", "cache name", "query text", "fallback behavior")
@@ -1867,16 +1849,9 @@

// Append metrics if we have them
if let Some(handle) = self.metrics_handle.as_ref() {
let MetricsSummary {
sample_count,
p50_us,
p90_us,
p99_us,
} = handle.metrics_summary(id.to_string()).unwrap_or_default();
let MetricsSummary { sample_count } =
handle.metrics_summary(id.to_string()).unwrap_or_default();
row.push(DfValue::from(format!("{sample_count}")));
row.push(DfValue::from(format!("{:.3}", 1000.0 * p50_us)));
row.push(DfValue::from(format!("{:.3}", 1000.0 * p90_us)));
row.push(DfValue::from(format!("{:.3}", 1000.0 * p99_us)));
}

results.push(row);
67 changes: 30 additions & 37 deletions readyset-adapter/src/metrics_handle.rs
@@ -1,28 +1,24 @@
use std::collections::HashMap;

use indexmap::IndexMap;
use metrics::SharedString;
use metrics_exporter_prometheus::formatting::{sanitize_label_key, sanitize_label_value};
use metrics_exporter_prometheus::{Distribution, PrometheusHandle};
use metrics_util::Summary;
use quanta::Instant;
// adding an alias to disambiguate vs readyset_client_metrics::recorded
use readyset_client::metrics::recorded as client_recorded;
use readyset_client_metrics::recorded::QUERY_LOG_EXECUTION_TIME;
use readyset_client_metrics::recorded::QUERY_LOG_EXECUTION_COUNT;
use readyset_client_metrics::DatabaseType;
use readyset_data::TimestampTz;

#[derive(Debug, Default, Clone)]
pub struct MetricsSummary {
// i64 because Postgres doesn't have unsigned ints
pub sample_count: i64,
pub p50_us: f64,
pub p90_us: f64,
pub p99_us: f64,
pub sample_count: u64,
}

#[derive(Clone)]
pub struct MetricsHandle {
inner: PrometheusHandle,
snapshot: Option<HashMap<String, Summary>>,
snapshot: Option<HashMap<String, u64>>,
}

impl MetricsHandle {
@@ -59,54 +55,51 @@ impl MetricsHandle {
self.inner.distributions(filter)
}

/// Clone a snapshot of all QUERY_LOG_EXECUTION_TIME histograms.
pub fn snapshot_histograms(&mut self) {
/// Clone a snapshot of all QUERY_LOG_EXECUTION_COUNT counters.
pub fn snapshot_counters(&mut self, database_type: DatabaseType) {
fn filter(key: &str) -> bool {
key == QUERY_LOG_EXECUTION_TIME
key == QUERY_LOG_EXECUTION_COUNT
}

let histograms = self
.histograms(Some(filter))
.get(QUERY_LOG_EXECUTION_TIME)
let db_type = SharedString::from(database_type).to_string();

let counters = self
.counters(Some(filter))
.get(QUERY_LOG_EXECUTION_COUNT)
.cloned()
.map(|h| {
h.into_iter()
.filter_map(|(k, dist)| {
k.into_iter()
.find(|k| k.starts_with("query_id"))
.and_then(|k| {
let summary = match dist {
Distribution::Summary(summary, _, _) => {
Some(summary.snapshot(Instant::now()))
}
_ => None,
};
summary.map(|s| (k, s))
})
.filter_map(|(k, count)| {
let mut query_id_tag = None;
for tag in k {
if tag.starts_with("query_id") {
query_id_tag = Some(tag);
} else if tag.starts_with("database_type") && !tag.contains(&db_type) {
return None;
}
}

query_id_tag.map(|k| (k, count))
})
.collect::<HashMap<_, _>>()
});
self.snapshot = histograms;
self.snapshot = counters;
}

/// Return the count (number of samples), 0.5, 0.9 and 0.99 quantiles for the query specified by
/// `query_id`.
/// Returns the execution count for the query specified by `query_id`.
///
/// NOTE: Quantiles are queried from the last snapshot obtained by calling
/// [`Self::snapshot_histograms`]
/// NOTE: Values are queried from the last snapshot obtained by calling
/// [`Self::snapshot_counters`]
pub fn metrics_summary(&self, query_id: String) -> Option<MetricsSummary> {
let label = format!(
"{}=\"{}\"",
sanitize_label_key("query_id"),
sanitize_label_value(&query_id)
);
let summary = self.snapshot.as_ref()?.get(&label)?;
let summary = self.snapshot.as_ref()?.get(&label).or(Some(&0))?;

Some(MetricsSummary {
sample_count: summary.count().try_into().unwrap_or_default(),
p50_us: summary.quantile(0.5).unwrap_or_default(),
p90_us: summary.quantile(0.90).unwrap_or_default(),
p99_us: summary.quantile(0.99).unwrap_or_default(),
sample_count: *summary,
})
}
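
For reference, a rough usage sketch of the counter path above (it mirrors the backend.rs hunks earlier in this commit; `handle`, `query_id`, and the surrounding row-building code are assumed to be in scope):

// Hypothetical caller; names follow this diff.
handle.snapshot_counters(readyset_client_metrics::DatabaseType::ReadySet);
let MetricsSummary { sample_count } = handle
    .metrics_summary(query_id.to_string())
    .unwrap_or_default();
// `sample_count` is the cumulative number of executions recorded for this
// query, not a rolling 60-second window.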

1 change: 1 addition & 0 deletions readyset-client-metrics/src/recorded.rs
@@ -12,6 +12,7 @@
///
/// [`DatabaseType`]: crate::DatabaseType
pub const QUERY_LOG_EXECUTION_TIME: &str = "readyset_query_log_execution_time";
pub const QUERY_LOG_EXECUTION_COUNT: &str = "readyset_query_log_execution_count";

/// Histogram: The time in seconds that the database spent executing a
/// query.
19 changes: 2 additions & 17 deletions readyset-clustertest/src/readyset_mysql.rs
@@ -1776,7 +1776,7 @@ async fn show_query_metrics() {
}

// Check `SHOW PROXIED QUERIES`
let proxied_result: Vec<(String, String, String, String, String, String, String)> = adapter
let proxied_result: Vec<(String, String, String, String)> = adapter
.as_mysql_conn()
.unwrap()
.query(r"SHOW PROXIED QUERIES")
@@ -1785,22 +1785,10 @@

// Assert that we get a non-zero value for the metrics
assert!(&proxied_result[0].3 != "0");
assert!(&proxied_result[0].4 != "0.0");
assert!(&proxied_result[0].5 != "0.0");
assert!(&proxied_result[0].6 != "0.0");

// Check `SHOW CACHES`
#[allow(clippy::type_complexity)]
let caches_result: Vec<(
String,
String,
String,
String,
String,
String,
String,
String,
)> = adapter
let caches_result: Vec<(String, String, String, String, String)> = adapter
.as_mysql_conn()
.unwrap()
.query(r"SHOW CACHES")
@@ -1809,7 +1797,4 @@

// Assert that we get a non-zero value for the metrics
assert!(&caches_result[0].3 != "0");
assert!(&caches_result[0].4 != "0.0");
assert!(&caches_result[0].5 != "0.0");
assert!(&caches_result[0].6 != "0.0");
}
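
As a sketch of the motivating guarantee (not part of this change; it reuses the test's `adapter` handle and tuple shape and assumes the same async client calls), repeated reads of the new count column should never decrease, unlike the old windowed latencies:

let first: Vec<(String, String, String, String)> = adapter
    .as_mysql_conn()
    .unwrap()
    .query(r"SHOW PROXIED QUERIES")
    .await
    .unwrap();
let later: Vec<(String, String, String, String)> = adapter
    .as_mysql_conn()
    .unwrap()
    .query(r"SHOW PROXIED QUERIES")
    .await
    .unwrap();

// Counts only ever increment, so a later read can never be smaller.
assert!(later[0].3.parse::<u64>().unwrap() >= first[0].3.parse::<u64>().unwrap());
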
54 changes: 54 additions & 0 deletions readyset/src/query_logger.rs
@@ -29,6 +29,7 @@ struct QueryMetrics {
cache_misses: Counter,
cache_keys_missed: Counter,
histograms: BTreeMap<(EventType, SqlQueryType), QueryHistograms>,
counters: BTreeMap<(EventType, SqlQueryType), QueryCounters>,
}

#[derive(Default)]
@@ -38,6 +39,13 @@ struct QueryHistograms {
readyset_exe_time: Option<Histogram>,
}

// this counter is for use in Day 1, demo mode
#[derive(Default)]
struct QueryCounters {
upstream_exe_count: Option<Counter>,
readyset_exe_count: Option<Counter>,
}

impl QueryMetrics {
fn parse_histogram(&mut self, kind: (EventType, SqlQueryType)) -> &mut Histogram {
self.histograms
@@ -100,6 +108,48 @@ impl QueryMetrics {
register_histogram!(recorded::QUERY_LOG_EXECUTION_TIME, &labels)
})
}

fn readyset_counter(&mut self, kind: (EventType, SqlQueryType)) -> &mut Counter {
self.counters
.entry(kind)
.or_default()
.readyset_exe_count
.get_or_insert_with(|| {
let mut labels = vec![
("query", self.query.clone()),
("event_type", SharedString::from(kind.0)),
("query_type", SharedString::from(kind.1)),
("database_type", SharedString::from(DatabaseType::ReadySet)),
];

if let Some(id) = &self.query_id {
labels.push(("query_id", id.clone()));
}

register_counter!(recorded::QUERY_LOG_EXECUTION_COUNT, &labels)
})
}

fn upstream_counter(&mut self, kind: (EventType, SqlQueryType)) -> &mut Counter {
self.counters
.entry(kind)
.or_default()
.upstream_exe_count
.get_or_insert_with(|| {
let mut labels = vec![
("query", self.query.clone()),
("event_type", SharedString::from(kind.0)),
("query_type", SharedString::from(kind.1)),
("database_type", SharedString::from(DatabaseType::MySql)),
];

if let Some(id) = &self.query_id {
labels.push(("query_id", id.clone()));
}

register_counter!(recorded::QUERY_LOG_EXECUTION_COUNT, &labels)
})
}
}

impl QueryLogger {
@@ -143,6 +193,7 @@ impl QueryLogger {
query: query_string,
query_id: Some(query_id),
histograms: BTreeMap::new(),
counters: BTreeMap::new(),
}
})
}
@@ -169,6 +220,7 @@
query: query_string,
query_id: None,
histograms: BTreeMap::new(),
counters: BTreeMap::new(),
}
})
}
@@ -262,12 +314,14 @@ impl QueryLogger {
metrics
.readyset_histogram((event.event, event.sql_type))
.record(duration);
metrics.readyset_counter((event.event, event.sql_type)).increment(1);
}

if let Some(duration) = event.upstream_duration {
metrics
.upstream_histogram((event.event, event.sql_type))
.record(duration);
metrics.upstream_counter((event.event, event.sql_type)).increment(1);
}
}
}
