diff --git a/oximeter/db/src/client/oxql.rs b/oximeter/db/src/client/oxql.rs index 296f9a35196..665e5fb51a4 100644 --- a/oximeter/db/src/client/oxql.rs +++ b/oximeter/db/src/client/oxql.rs @@ -28,7 +28,7 @@ use slog::Logger; use slog::debug; use slog::trace; use std::collections::BTreeMap; -use std::collections::HashMap; +use std::collections::BTreeSet; use std::time::Duration; use std::time::Instant; use uuid::Uuid; @@ -930,120 +930,96 @@ impl Client { preds: Option<&oxql::ast::table_ops::filter::Filter>, ) -> Result { // Filter down the fields to those which apply to this timeseries - // itself, and rewrite as a DB-safe WHERE clause. + // itself, and rewrite as a DB-safe clause. let preds_for_fields = preds .map(|p| Self::rewrite_predicate_for_fields(schema, p)) .transpose()? .flatten(); - let (already_has_where, mut query) = self.all_fields_query_raw(schema); + let mut query = self.all_fields_query_raw(schema); if let Some(preds) = preds_for_fields { - // If the raw field has only a single select query, then we've - // already added a "WHERE" clause. Simply tack these predicates onto - // that one. - if already_has_where { - query.push_str(" AND "); - } else { - query.push_str(" WHERE "); - } + query.push_str(" HAVING "); query.push_str(&preds); } Ok(query) } - // Build a reasonably efficient query to retrieve all fields for a given - // timeseries. Joins in ClickHouse are expensive, so aggregate all relevant - // fields from each relevant fields table in a single subquery, then join - // the results together. This results in n - 1 joins, where n is the number - // of relevant fields tables. Note that we may be able to improve - // performance in future ClickHouse versions, which have better support for - // Variant types, better support for the merge() table function, and faster - // joins. - fn all_fields_query_raw( - &self, - schema: &TimeseriesSchema, - ) -> (bool, String) { + // Build a query to retrieve all fields for a given timeseries. In + // ClickHouse, JOINs are slow, so we construct a query that doesn't use + // them. We identify all relevant field tables, and for each table, we + // select the relevant field values. We also select NULL for each other + // field type so that we can UNION ALL the results of our per-table + // subqueries. Then we pivot the unioned results to wide rows using anyIf. + fn all_fields_query_raw(&self, schema: &TimeseriesSchema) -> String { match schema.field_schema.len() { 0 => unreachable!(), _ => { - // Build a vector of top-level select expressions, as well as a - // map from fields to lists of subquery select expressions. - let mut top_selects: Vec = Vec::new(); - let mut select_map: HashMap> = - HashMap::new(); - for field_schema in schema.field_schema.iter() { - select_map - .entry(field_schema.field_type) - .or_insert_with(|| vec![String::from("timeseries_key")]) - .push(format!( - "anyIf(field_value, field_name = '{}') AS {}", - field_schema.name, field_schema.name - )); - top_selects.push(format!( - "{}_pivot.{} AS {}", - field_table_name(field_schema.field_type), - field_schema.name, - field_schema.name - )); - } + let field_types: BTreeSet = + schema.field_schema.iter().map(|f| f.field_type).collect(); + + // Build top-level SELECT columns. For each field, we use + // anyIf to extract the value from the appropriate type column. + // We can use anyIf to take the first matching value because a + // given timeseries key is always associated with the same set + // of fields, so all rows with a given (timeseries_key, + // field_name) will have the same field_value. + // + // We wrap the result in assumeNotNull() because the UNION + // ALL with NULL placeholders causes anyIf to return Nullable + // types, but we know fields will always have values for + // matching keys. + let mut top_selects: Vec = schema + .field_schema + .iter() + .map(|f| { + format!( + "assumeNotNull(anyIf({}, field_name = '{}')) AS \"{}\"", + field_table_name(f.field_type), + f.name, + f.name + ) + }) + .collect(); + top_selects.push(String::from("timeseries_key")); - // Sort field tables by number of columns, descending. - // ClickHouse recommends joining larger tables to smaller - // tables, and doesn't currently reorder joins automatically. - let mut field_types: Vec = - select_map.keys().cloned().collect(); - field_types.sort_by(|a, b| { - select_map[b] - .len() - .cmp(&select_map[a].len()) - .then(field_table_name(*a).cmp(&field_table_name(*b))) - }); + // Build UNION ALL subqueries, one per field type. Each + // subquery selects timeseries_key, field_name, and a value + // column for each relevant type (field_value for its own + // type, NULL for others). We emit NULLs so that we can UNION + // ALL the resulting subqueries. - // Build a map from field type to pivot subquery. We filter by - // timeseries_name, group by timeseries_key, and use anyIf to - // pivot fields to a wide table. We can use anyIf to take the - // first matching value because a given timeseries key is - // always associated with the same set of fields, so all rows - // with a given (timeseries_key, field_name) will have the same - // field_value. - let mut query_map: HashMap = - HashMap::new(); - for field_type in field_types.clone() { - let selects = &select_map[&field_type]; - let query = format!( - "( - SELECT - {select} - FROM {db_name}.{from} - WHERE timeseries_name = '{timeseries_name}' - GROUP BY timeseries_key - ) AS {subquery_name}_pivot", - select = selects.join(", "), - db_name = crate::DATABASE_NAME, - from = field_table_name(field_type), - timeseries_name = schema.timeseries_name, - subquery_name = field_table_name(field_type), - ); - query_map.insert(field_type, query); - } + let null_cols: Vec = field_types + .iter() + .map(|&field_type| { + format!("NULL AS {}", field_table_name(field_type)) + }) + .collect(); + let union_parts: Vec = field_types + .iter() + .enumerate() + .map(|(idx, &field_type)| { + let mut cols = null_cols.clone(); + cols[idx] = format!( + "field_value AS {}", + field_table_name(field_type) + ); + format!( + "SELECT timeseries_key, field_name, {} \ + FROM {}.{} \ + WHERE timeseries_name = '{}'", + cols.join(", "), + crate::DATABASE_NAME, + field_table_name(field_type), + schema.timeseries_name, + ) + }) + .collect(); - // Assemble the final query. - let mut from = query_map[&field_types[0]].clone(); - for field_type in field_types.iter().skip(1) { - from = format!( - "{from} JOIN {query} ON {source}_pivot.timeseries_key = {dest}_pivot.timeseries_key", - from = from, - query = query_map[field_type], - source = field_table_name(field_types[0]), - dest = field_table_name(*field_type), - ); - } - top_selects.push(format!( - "{}_pivot.timeseries_key AS timeseries_key", - field_table_name(field_types[0]) - )); - let query = - format!("SELECT {} FROM {}", top_selects.join(", "), from); - (false, query) + // Assemble final query. + format!( + "SELECT {} FROM ({}) GROUP BY timeseries_key", + top_selects.join(", "), + union_parts.join(" UNION ALL "), + ) } } } @@ -1189,6 +1165,7 @@ mod tests { QueryAuthzScope, chunk_consistent_key_groups_impl, }; use crate::oxql::ast::grammar::query_parser; + use crate::oxql::ast::table_ops::filter::Filter; use crate::{Client, DATABASE_TIMESTAMP_FORMAT, DbWrite}; use crate::{Metric, Target}; use chrono::{DateTime, NaiveDate, Utc}; @@ -1352,42 +1329,20 @@ mod tests { .await .unwrap() .unwrap(); - let query = ctx.client.all_fields_query(&schema, None).unwrap(); - let want = "SELECT - fields_i32_pivot.foo AS foo, - fields_u32_pivot.index AS index, - fields_string_pivot.name AS name, - fields_i32_pivot.timeseries_key AS timeseries_key - FROM - ( - SELECT - timeseries_key, - anyIf(field_value, field_name = 'foo') AS foo - FROM oximeter.fields_i32 - WHERE timeseries_name = 'some_target:some_metric' - GROUP BY timeseries_key - ) AS fields_i32_pivot - JOIN - ( - SELECT - timeseries_key, - anyIf(field_value, field_name = 'name') AS name - FROM oximeter.fields_string - WHERE timeseries_name = 'some_target:some_metric' - GROUP BY timeseries_key - ) AS fields_string_pivot ON fields_i32_pivot.timeseries_key = fields_string_pivot.timeseries_key - JOIN - ( - SELECT - timeseries_key, - anyIf(field_value, field_name = 'index') AS index - FROM oximeter.fields_u32 - WHERE timeseries_name = 'some_target:some_metric' - GROUP BY timeseries_key - ) AS fields_u32_pivot ON fields_i32_pivot.timeseries_key = fields_u32_pivot.timeseries_key"; - assert_eq!( - want.split_whitespace().collect::>().join(" "), - query.split_whitespace().collect::>().join(" ") + let filter: Filter = "index == 0".parse().unwrap(); + let query = + ctx.client.all_fields_query(&schema, Some(&filter)).unwrap(); + let formatted = sqlformat::format( + &query, + &sqlformat::QueryParams::None, + &sqlformat::FormatOptions { + uppercase: Some(true), + ..Default::default() + }, + ); + expectorate::assert_contents( + "test-output/all-fields-query.sql", + &formatted, ); ctx.cleanup_successful().await; @@ -1876,7 +1831,7 @@ mod tests { let rewritten = Client::rewrite_predicate_for_fields(&schema, &filt) .unwrap() .expect("Should have rewritten the field predicate"); - assert_eq!(rewritten, "NOT equals(f0, 0)"); + assert_eq!(rewritten, "NOT equals(\"f0\", 0)"); logctx.cleanup_successful(); } @@ -1900,7 +1855,7 @@ mod tests { assert_eq!( rewritten, format!( - "NOT greater(timestamp, '{}')", + "NOT greater(\"timestamp\", '{}')", now.format(DATABASE_TIMESTAMP_FORMAT) ) ); diff --git a/oximeter/db/src/oxql/ast/table_ops/filter/mod.rs b/oximeter/db/src/oxql/ast/table_ops/filter/mod.rs index aab59e39095..27281053a75 100644 --- a/oximeter/db/src/oxql/ast/table_ops/filter/mod.rs +++ b/oximeter/db/src/oxql/ast/table_ops/filter/mod.rs @@ -947,7 +947,7 @@ impl SimpleFilter { pub(crate) fn as_db_safe_string(&self) -> String { let expr = self.value.as_db_safe_string(); let fn_name = self.cmp.as_db_function_name(); - format!("{}({}, {})", fn_name, self.ident, expr) + format!("{}(\"{}\", {})", fn_name, self.ident, expr) } // Returns an array of bools, where true indicates the point should be kept. diff --git a/oximeter/db/test-output/all-fields-query.sql b/oximeter/db/test-output/all-fields-query.sql new file mode 100644 index 00000000000..6c91ef1e6de --- /dev/null +++ b/oximeter/db/test-output/all-fields-query.sql @@ -0,0 +1,46 @@ +SELECT + assumeNotNull(anyIf(fields_i32, field_name = 'foo')) AS "foo", + assumeNotNull(anyIf(fields_u32, field_name = 'index')) AS "index", + assumeNotNull(anyIf(fields_string, field_name = 'name')) AS "name", + timeseries_key +FROM + ( + SELECT + timeseries_key, + field_name, + field_value AS fields_string, + NULL AS fields_i32, + NULL AS fields_u32 + FROM + oximeter.fields_string + WHERE + timeseries_name = 'some_target:some_metric' + UNION + ALL + SELECT + timeseries_key, + field_name, + NULL AS fields_string, + field_value AS fields_i32, + NULL AS fields_u32 + FROM + oximeter.fields_i32 + WHERE + timeseries_name = 'some_target:some_metric' + UNION + ALL + SELECT + timeseries_key, + field_name, + NULL AS fields_string, + NULL AS fields_i32, + field_value AS fields_u32 + FROM + oximeter.fields_u32 + WHERE + timeseries_name = 'some_target:some_metric' + ) +GROUP BY + timeseries_key +HAVING + equals("index", 0) \ No newline at end of file