From a435519e329d3771c56bc9d5dd65e6f5ca5a5333 Mon Sep 17 00:00:00 2001 From: Josh Carp Date: Fri, 13 Mar 2026 17:47:39 +0000 Subject: [PATCH] Further query perf updates for oximeter field lookups. Joins are expensive in clickhouse. In #9262, we dropped the number of joins in the field lookup query from the number of fields to the number of relevant field tables. However, this still leaves us with up to six field tables to join. In this patch, we do away with joins for the field lookup query entirely. Instead, we select the relevant rows from each field table, then combine them with UNION ALL, and use anyIf and GROUP BY to pivot from long format to wide. This speeds up the field query with indentical results, with greater speedups for timeseries that use more field tables. A timeseries whose labels are all strings will see no difference in performance; a series that has string, uuid, u8, u16, u32, and i16 labels (like the switch_port_control_data_link metrics) will return results much faster. --- oximeter/db/src/client/oxql.rs | 229 +++++++----------- .../db/src/oxql/ast/table_ops/filter/mod.rs | 2 +- oximeter/db/test-output/all-fields-query.sql | 46 ++++ 3 files changed, 139 insertions(+), 138 deletions(-) create mode 100644 oximeter/db/test-output/all-fields-query.sql diff --git a/oximeter/db/src/client/oxql.rs b/oximeter/db/src/client/oxql.rs index 296f9a35196..665e5fb51a4 100644 --- a/oximeter/db/src/client/oxql.rs +++ b/oximeter/db/src/client/oxql.rs @@ -28,7 +28,7 @@ use slog::Logger; use slog::debug; use slog::trace; use std::collections::BTreeMap; -use std::collections::HashMap; +use std::collections::BTreeSet; use std::time::Duration; use std::time::Instant; use uuid::Uuid; @@ -930,120 +930,96 @@ impl Client { preds: Option<&oxql::ast::table_ops::filter::Filter>, ) -> Result { // Filter down the fields to those which apply to this timeseries - // itself, and rewrite as a DB-safe WHERE clause. + // itself, and rewrite as a DB-safe clause. let preds_for_fields = preds .map(|p| Self::rewrite_predicate_for_fields(schema, p)) .transpose()? .flatten(); - let (already_has_where, mut query) = self.all_fields_query_raw(schema); + let mut query = self.all_fields_query_raw(schema); if let Some(preds) = preds_for_fields { - // If the raw field has only a single select query, then we've - // already added a "WHERE" clause. Simply tack these predicates onto - // that one. - if already_has_where { - query.push_str(" AND "); - } else { - query.push_str(" WHERE "); - } + query.push_str(" HAVING "); query.push_str(&preds); } Ok(query) } - // Build a reasonably efficient query to retrieve all fields for a given - // timeseries. Joins in ClickHouse are expensive, so aggregate all relevant - // fields from each relevant fields table in a single subquery, then join - // the results together. This results in n - 1 joins, where n is the number - // of relevant fields tables. Note that we may be able to improve - // performance in future ClickHouse versions, which have better support for - // Variant types, better support for the merge() table function, and faster - // joins. - fn all_fields_query_raw( - &self, - schema: &TimeseriesSchema, - ) -> (bool, String) { + // Build a query to retrieve all fields for a given timeseries. In + // ClickHouse, JOINs are slow, so we construct a query that doesn't use + // them. We identify all relevant field tables, and for each table, we + // select the relevant field values. We also select NULL for each other + // field type so that we can UNION ALL the results of our per-table + // subqueries. Then we pivot the unioned results to wide rows using anyIf. + fn all_fields_query_raw(&self, schema: &TimeseriesSchema) -> String { match schema.field_schema.len() { 0 => unreachable!(), _ => { - // Build a vector of top-level select expressions, as well as a - // map from fields to lists of subquery select expressions. - let mut top_selects: Vec = Vec::new(); - let mut select_map: HashMap> = - HashMap::new(); - for field_schema in schema.field_schema.iter() { - select_map - .entry(field_schema.field_type) - .or_insert_with(|| vec![String::from("timeseries_key")]) - .push(format!( - "anyIf(field_value, field_name = '{}') AS {}", - field_schema.name, field_schema.name - )); - top_selects.push(format!( - "{}_pivot.{} AS {}", - field_table_name(field_schema.field_type), - field_schema.name, - field_schema.name - )); - } + let field_types: BTreeSet = + schema.field_schema.iter().map(|f| f.field_type).collect(); + + // Build top-level SELECT columns. For each field, we use + // anyIf to extract the value from the appropriate type column. + // We can use anyIf to take the first matching value because a + // given timeseries key is always associated with the same set + // of fields, so all rows with a given (timeseries_key, + // field_name) will have the same field_value. + // + // We wrap the result in assumeNotNull() because the UNION + // ALL with NULL placeholders causes anyIf to return Nullable + // types, but we know fields will always have values for + // matching keys. + let mut top_selects: Vec = schema + .field_schema + .iter() + .map(|f| { + format!( + "assumeNotNull(anyIf({}, field_name = '{}')) AS \"{}\"", + field_table_name(f.field_type), + f.name, + f.name + ) + }) + .collect(); + top_selects.push(String::from("timeseries_key")); - // Sort field tables by number of columns, descending. - // ClickHouse recommends joining larger tables to smaller - // tables, and doesn't currently reorder joins automatically. - let mut field_types: Vec = - select_map.keys().cloned().collect(); - field_types.sort_by(|a, b| { - select_map[b] - .len() - .cmp(&select_map[a].len()) - .then(field_table_name(*a).cmp(&field_table_name(*b))) - }); + // Build UNION ALL subqueries, one per field type. Each + // subquery selects timeseries_key, field_name, and a value + // column for each relevant type (field_value for its own + // type, NULL for others). We emit NULLs so that we can UNION + // ALL the resulting subqueries. - // Build a map from field type to pivot subquery. We filter by - // timeseries_name, group by timeseries_key, and use anyIf to - // pivot fields to a wide table. We can use anyIf to take the - // first matching value because a given timeseries key is - // always associated with the same set of fields, so all rows - // with a given (timeseries_key, field_name) will have the same - // field_value. - let mut query_map: HashMap = - HashMap::new(); - for field_type in field_types.clone() { - let selects = &select_map[&field_type]; - let query = format!( - "( - SELECT - {select} - FROM {db_name}.{from} - WHERE timeseries_name = '{timeseries_name}' - GROUP BY timeseries_key - ) AS {subquery_name}_pivot", - select = selects.join(", "), - db_name = crate::DATABASE_NAME, - from = field_table_name(field_type), - timeseries_name = schema.timeseries_name, - subquery_name = field_table_name(field_type), - ); - query_map.insert(field_type, query); - } + let null_cols: Vec = field_types + .iter() + .map(|&field_type| { + format!("NULL AS {}", field_table_name(field_type)) + }) + .collect(); + let union_parts: Vec = field_types + .iter() + .enumerate() + .map(|(idx, &field_type)| { + let mut cols = null_cols.clone(); + cols[idx] = format!( + "field_value AS {}", + field_table_name(field_type) + ); + format!( + "SELECT timeseries_key, field_name, {} \ + FROM {}.{} \ + WHERE timeseries_name = '{}'", + cols.join(", "), + crate::DATABASE_NAME, + field_table_name(field_type), + schema.timeseries_name, + ) + }) + .collect(); - // Assemble the final query. - let mut from = query_map[&field_types[0]].clone(); - for field_type in field_types.iter().skip(1) { - from = format!( - "{from} JOIN {query} ON {source}_pivot.timeseries_key = {dest}_pivot.timeseries_key", - from = from, - query = query_map[field_type], - source = field_table_name(field_types[0]), - dest = field_table_name(*field_type), - ); - } - top_selects.push(format!( - "{}_pivot.timeseries_key AS timeseries_key", - field_table_name(field_types[0]) - )); - let query = - format!("SELECT {} FROM {}", top_selects.join(", "), from); - (false, query) + // Assemble final query. + format!( + "SELECT {} FROM ({}) GROUP BY timeseries_key", + top_selects.join(", "), + union_parts.join(" UNION ALL "), + ) } } } @@ -1189,6 +1165,7 @@ mod tests { QueryAuthzScope, chunk_consistent_key_groups_impl, }; use crate::oxql::ast::grammar::query_parser; + use crate::oxql::ast::table_ops::filter::Filter; use crate::{Client, DATABASE_TIMESTAMP_FORMAT, DbWrite}; use crate::{Metric, Target}; use chrono::{DateTime, NaiveDate, Utc}; @@ -1352,42 +1329,20 @@ mod tests { .await .unwrap() .unwrap(); - let query = ctx.client.all_fields_query(&schema, None).unwrap(); - let want = "SELECT - fields_i32_pivot.foo AS foo, - fields_u32_pivot.index AS index, - fields_string_pivot.name AS name, - fields_i32_pivot.timeseries_key AS timeseries_key - FROM - ( - SELECT - timeseries_key, - anyIf(field_value, field_name = 'foo') AS foo - FROM oximeter.fields_i32 - WHERE timeseries_name = 'some_target:some_metric' - GROUP BY timeseries_key - ) AS fields_i32_pivot - JOIN - ( - SELECT - timeseries_key, - anyIf(field_value, field_name = 'name') AS name - FROM oximeter.fields_string - WHERE timeseries_name = 'some_target:some_metric' - GROUP BY timeseries_key - ) AS fields_string_pivot ON fields_i32_pivot.timeseries_key = fields_string_pivot.timeseries_key - JOIN - ( - SELECT - timeseries_key, - anyIf(field_value, field_name = 'index') AS index - FROM oximeter.fields_u32 - WHERE timeseries_name = 'some_target:some_metric' - GROUP BY timeseries_key - ) AS fields_u32_pivot ON fields_i32_pivot.timeseries_key = fields_u32_pivot.timeseries_key"; - assert_eq!( - want.split_whitespace().collect::>().join(" "), - query.split_whitespace().collect::>().join(" ") + let filter: Filter = "index == 0".parse().unwrap(); + let query = + ctx.client.all_fields_query(&schema, Some(&filter)).unwrap(); + let formatted = sqlformat::format( + &query, + &sqlformat::QueryParams::None, + &sqlformat::FormatOptions { + uppercase: Some(true), + ..Default::default() + }, + ); + expectorate::assert_contents( + "test-output/all-fields-query.sql", + &formatted, ); ctx.cleanup_successful().await; @@ -1876,7 +1831,7 @@ mod tests { let rewritten = Client::rewrite_predicate_for_fields(&schema, &filt) .unwrap() .expect("Should have rewritten the field predicate"); - assert_eq!(rewritten, "NOT equals(f0, 0)"); + assert_eq!(rewritten, "NOT equals(\"f0\", 0)"); logctx.cleanup_successful(); } @@ -1900,7 +1855,7 @@ mod tests { assert_eq!( rewritten, format!( - "NOT greater(timestamp, '{}')", + "NOT greater(\"timestamp\", '{}')", now.format(DATABASE_TIMESTAMP_FORMAT) ) ); diff --git a/oximeter/db/src/oxql/ast/table_ops/filter/mod.rs b/oximeter/db/src/oxql/ast/table_ops/filter/mod.rs index aab59e39095..27281053a75 100644 --- a/oximeter/db/src/oxql/ast/table_ops/filter/mod.rs +++ b/oximeter/db/src/oxql/ast/table_ops/filter/mod.rs @@ -947,7 +947,7 @@ impl SimpleFilter { pub(crate) fn as_db_safe_string(&self) -> String { let expr = self.value.as_db_safe_string(); let fn_name = self.cmp.as_db_function_name(); - format!("{}({}, {})", fn_name, self.ident, expr) + format!("{}(\"{}\", {})", fn_name, self.ident, expr) } // Returns an array of bools, where true indicates the point should be kept. diff --git a/oximeter/db/test-output/all-fields-query.sql b/oximeter/db/test-output/all-fields-query.sql new file mode 100644 index 00000000000..6c91ef1e6de --- /dev/null +++ b/oximeter/db/test-output/all-fields-query.sql @@ -0,0 +1,46 @@ +SELECT + assumeNotNull(anyIf(fields_i32, field_name = 'foo')) AS "foo", + assumeNotNull(anyIf(fields_u32, field_name = 'index')) AS "index", + assumeNotNull(anyIf(fields_string, field_name = 'name')) AS "name", + timeseries_key +FROM + ( + SELECT + timeseries_key, + field_name, + field_value AS fields_string, + NULL AS fields_i32, + NULL AS fields_u32 + FROM + oximeter.fields_string + WHERE + timeseries_name = 'some_target:some_metric' + UNION + ALL + SELECT + timeseries_key, + field_name, + NULL AS fields_string, + field_value AS fields_i32, + NULL AS fields_u32 + FROM + oximeter.fields_i32 + WHERE + timeseries_name = 'some_target:some_metric' + UNION + ALL + SELECT + timeseries_key, + field_name, + NULL AS fields_string, + NULL AS fields_i32, + field_value AS fields_u32 + FROM + oximeter.fields_u32 + WHERE + timeseries_name = 'some_target:some_metric' + ) +GROUP BY + timeseries_key +HAVING + equals("index", 0) \ No newline at end of file