Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
229 changes: 92 additions & 137 deletions oximeter/db/src/client/oxql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use slog::Logger;
use slog::debug;
use slog::trace;
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::collections::BTreeSet;
use std::time::Duration;
use std::time::Instant;
use uuid::Uuid;
Expand Down Expand Up @@ -930,120 +930,96 @@ impl Client {
preds: Option<&oxql::ast::table_ops::filter::Filter>,
) -> Result<String, Error> {
// Filter down the fields to those which apply to this timeseries
// itself, and rewrite as a DB-safe WHERE clause.
// itself, and rewrite as a DB-safe clause.
let preds_for_fields = preds
.map(|p| Self::rewrite_predicate_for_fields(schema, p))
.transpose()?
.flatten();
let (already_has_where, mut query) = self.all_fields_query_raw(schema);
let mut query = self.all_fields_query_raw(schema);
if let Some(preds) = preds_for_fields {
// If the raw field has only a single select query, then we've
// already added a "WHERE" clause. Simply tack these predicates onto
// that one.
if already_has_where {
query.push_str(" AND ");
} else {
query.push_str(" WHERE ");
}
query.push_str(" HAVING ");
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it helpful to have a test checking this new syntax element? I always find it hard to build a SQL query programmatically without seeing the final output.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated the existing test to use a predicate so that our expectorated test file includes the HAVING clause.

query.push_str(&preds);
}
Ok(query)
}

// Build a reasonably efficient query to retrieve all fields for a given
// timeseries. Joins in ClickHouse are expensive, so aggregate all relevant
// fields from each relevant fields table in a single subquery, then join
// the results together. This results in n - 1 joins, where n is the number
// of relevant fields tables. Note that we may be able to improve
// performance in future ClickHouse versions, which have better support for
// Variant types, better support for the merge() table function, and faster
// joins.
fn all_fields_query_raw(
&self,
schema: &TimeseriesSchema,
) -> (bool, String) {
// Build a query to retrieve all fields for a given timeseries. In
// ClickHouse, JOINs are slow, so we construct a query that doesn't use
// them. We identify all relevant field tables, and for each table, we
// select the relevant field values. We also select NULL for each other
// field type so that we can UNION ALL the results of our per-table
// subqueries. Then we pivot the unioned results to wide rows using anyIf.
fn all_fields_query_raw(&self, schema: &TimeseriesSchema) -> String {
match schema.field_schema.len() {
0 => unreachable!(),
_ => {
// Build a vector of top-level select expressions, as well as a
// map from fields to lists of subquery select expressions.
let mut top_selects: Vec<String> = Vec::new();
let mut select_map: HashMap<oximeter::FieldType, Vec<String>> =
HashMap::new();
for field_schema in schema.field_schema.iter() {
select_map
.entry(field_schema.field_type)
.or_insert_with(|| vec![String::from("timeseries_key")])
.push(format!(
"anyIf(field_value, field_name = '{}') AS {}",
field_schema.name, field_schema.name
));
top_selects.push(format!(
"{}_pivot.{} AS {}",
field_table_name(field_schema.field_type),
field_schema.name,
field_schema.name
));
}
let field_types: BTreeSet<oximeter::FieldType> =
schema.field_schema.iter().map(|f| f.field_type).collect();

// Build top-level SELECT columns. For each field, we use
// anyIf to extract the value from the appropriate type column.
// We can use anyIf to take the first matching value because a
// given timeseries key is always associated with the same set
// of fields, so all rows with a given (timeseries_key,
// field_name) will have the same field_value.
//
// We wrap the result in assumeNotNull() because the UNION
// ALL with NULL placeholders causes anyIf to return Nullable
// types, but we know fields will always have values for
// matching keys.
let mut top_selects: Vec<String> = schema
.field_schema
.iter()
.map(|f| {
format!(
"assumeNotNull(anyIf({}, field_name = '{}')) AS \"{}\"",
field_table_name(f.field_type),
f.name,
f.name
)
})
.collect();
top_selects.push(String::from("timeseries_key"));

// Sort field tables by number of columns, descending.
// ClickHouse recommends joining larger tables to smaller
// tables, and doesn't currently reorder joins automatically.
let mut field_types: Vec<oximeter::FieldType> =
select_map.keys().cloned().collect();
field_types.sort_by(|a, b| {
select_map[b]
.len()
.cmp(&select_map[a].len())
.then(field_table_name(*a).cmp(&field_table_name(*b)))
});
// Build UNION ALL subqueries, one per field type. Each
// subquery selects timeseries_key, field_name, and a value
// column for each relevant type (field_value for its own
// type, NULL for others). We emit NULLs so that we can UNION
// ALL the resulting subqueries.

// Build a map from field type to pivot subquery. We filter by
// timeseries_name, group by timeseries_key, and use anyIf to
// pivot fields to a wide table. We can use anyIf to take the
// first matching value because a given timeseries key is
// always associated with the same set of fields, so all rows
// with a given (timeseries_key, field_name) will have the same
// field_value.
let mut query_map: HashMap<oximeter::FieldType, String> =
HashMap::new();
for field_type in field_types.clone() {
let selects = &select_map[&field_type];
let query = format!(
"(
SELECT
{select}
FROM {db_name}.{from}
WHERE timeseries_name = '{timeseries_name}'
GROUP BY timeseries_key
) AS {subquery_name}_pivot",
select = selects.join(", "),
db_name = crate::DATABASE_NAME,
from = field_table_name(field_type),
timeseries_name = schema.timeseries_name,
subquery_name = field_table_name(field_type),
);
query_map.insert(field_type, query);
}
let null_cols: Vec<String> = field_types
.iter()
.map(|&field_type| {
format!("NULL AS {}", field_table_name(field_type))
})
.collect();
let union_parts: Vec<String> = field_types
.iter()
.enumerate()
.map(|(idx, &field_type)| {
let mut cols = null_cols.clone();
cols[idx] = format!(
"field_value AS {}",
field_table_name(field_type)
);
format!(
"SELECT timeseries_key, field_name, {} \
FROM {}.{} \
WHERE timeseries_name = '{}'",
cols.join(", "),
crate::DATABASE_NAME,
field_table_name(field_type),
schema.timeseries_name,
)
})
.collect();

// Assemble the final query.
let mut from = query_map[&field_types[0]].clone();
for field_type in field_types.iter().skip(1) {
from = format!(
"{from} JOIN {query} ON {source}_pivot.timeseries_key = {dest}_pivot.timeseries_key",
from = from,
query = query_map[field_type],
source = field_table_name(field_types[0]),
dest = field_table_name(*field_type),
);
}
top_selects.push(format!(
"{}_pivot.timeseries_key AS timeseries_key",
field_table_name(field_types[0])
));
let query =
format!("SELECT {} FROM {}", top_selects.join(", "), from);
(false, query)
// Assemble final query.
format!(
"SELECT {} FROM ({}) GROUP BY timeseries_key",
top_selects.join(", "),
union_parts.join(" UNION ALL "),
)
}
}
}
Expand Down Expand Up @@ -1189,6 +1165,7 @@ mod tests {
QueryAuthzScope, chunk_consistent_key_groups_impl,
};
use crate::oxql::ast::grammar::query_parser;
use crate::oxql::ast::table_ops::filter::Filter;
use crate::{Client, DATABASE_TIMESTAMP_FORMAT, DbWrite};
use crate::{Metric, Target};
use chrono::{DateTime, NaiveDate, Utc};
Expand Down Expand Up @@ -1352,42 +1329,20 @@ mod tests {
.await
.unwrap()
.unwrap();
let query = ctx.client.all_fields_query(&schema, None).unwrap();
let want = "SELECT
fields_i32_pivot.foo AS foo,
fields_u32_pivot.index AS index,
fields_string_pivot.name AS name,
fields_i32_pivot.timeseries_key AS timeseries_key
FROM
(
SELECT
timeseries_key,
anyIf(field_value, field_name = 'foo') AS foo
FROM oximeter.fields_i32
WHERE timeseries_name = 'some_target:some_metric'
GROUP BY timeseries_key
) AS fields_i32_pivot
JOIN
(
SELECT
timeseries_key,
anyIf(field_value, field_name = 'name') AS name
FROM oximeter.fields_string
WHERE timeseries_name = 'some_target:some_metric'
GROUP BY timeseries_key
) AS fields_string_pivot ON fields_i32_pivot.timeseries_key = fields_string_pivot.timeseries_key
JOIN
(
SELECT
timeseries_key,
anyIf(field_value, field_name = 'index') AS index
FROM oximeter.fields_u32
WHERE timeseries_name = 'some_target:some_metric'
GROUP BY timeseries_key
) AS fields_u32_pivot ON fields_i32_pivot.timeseries_key = fields_u32_pivot.timeseries_key";
assert_eq!(
want.split_whitespace().collect::<Vec<&str>>().join(" "),
query.split_whitespace().collect::<Vec<&str>>().join(" ")
let filter: Filter = "index == 0".parse().unwrap();
let query =
ctx.client.all_fields_query(&schema, Some(&filter)).unwrap();
let formatted = sqlformat::format(
&query,
&sqlformat::QueryParams::None,
&sqlformat::FormatOptions {
uppercase: Some(true),
..Default::default()
},
);
expectorate::assert_contents(
"test-output/all-fields-query.sql",
&formatted,
);

ctx.cleanup_successful().await;
Expand Down Expand Up @@ -1876,7 +1831,7 @@ mod tests {
let rewritten = Client::rewrite_predicate_for_fields(&schema, &filt)
.unwrap()
.expect("Should have rewritten the field predicate");
assert_eq!(rewritten, "NOT equals(f0, 0)");
assert_eq!(rewritten, "NOT equals(\"f0\", 0)");
logctx.cleanup_successful();
}

Expand All @@ -1900,7 +1855,7 @@ mod tests {
assert_eq!(
rewritten,
format!(
"NOT greater(timestamp, '{}')",
"NOT greater(\"timestamp\", '{}')",
now.format(DATABASE_TIMESTAMP_FORMAT)
)
);
Expand Down
2 changes: 1 addition & 1 deletion oximeter/db/src/oxql/ast/table_ops/filter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -947,7 +947,7 @@ impl SimpleFilter {
pub(crate) fn as_db_safe_string(&self) -> String {
let expr = self.value.as_db_safe_string();
let fn_name = self.cmp.as_db_function_name();
format!("{}({}, {})", fn_name, self.ident, expr)
format!("{}(\"{}\", {})", fn_name, self.ident, expr)
}

// Returns an array of bools, where true indicates the point should be kept.
Expand Down
46 changes: 46 additions & 0 deletions oximeter/db/test-output/all-fields-query.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
SELECT
assumeNotNull(anyIf(fields_i32, field_name = 'foo')) AS "foo",
assumeNotNull(anyIf(fields_u32, field_name = 'index')) AS "index",
assumeNotNull(anyIf(fields_string, field_name = 'name')) AS "name",
timeseries_key
FROM
(
SELECT
timeseries_key,
field_name,
field_value AS fields_string,
NULL AS fields_i32,
NULL AS fields_u32
FROM
oximeter.fields_string
WHERE
timeseries_name = 'some_target:some_metric'
UNION
ALL
SELECT
timeseries_key,
field_name,
NULL AS fields_string,
field_value AS fields_i32,
NULL AS fields_u32
FROM
oximeter.fields_i32
WHERE
timeseries_name = 'some_target:some_metric'
UNION
ALL
SELECT
timeseries_key,
field_name,
NULL AS fields_string,
NULL AS fields_i32,
field_value AS fields_u32
FROM
oximeter.fields_u32
WHERE
timeseries_name = 'some_target:some_metric'
)
GROUP BY
timeseries_key
HAVING
equals("index", 0)
Loading