diff --git a/sidemantic/adapters/cube.py b/sidemantic/adapters/cube.py index d22afd45..ae5a7a3a 100644 --- a/sidemantic/adapters/cube.py +++ b/sidemantic/adapters/cube.py @@ -20,7 +20,9 @@ def _normalize_cube_sql(sql: str | None, cube_name: str | None = None) -> str | - ${CUBE} -> {model} placeholder - ${cube_name} -> {model} placeholder - {CUBE} -> {model} placeholder (variant without dollar sign) - - ${measure_ref} references are preserved as-is (for derived metrics) + + Note: ${measure_ref} references are handled separately in _parse_measure() + for derived metrics. Args: sql: SQL expression string or None @@ -99,6 +101,38 @@ def _parse_file(self, file_path: Path, graph: SemanticGraph) -> None: if model: graph.add_model(model) + def _extract_fk_from_join_sql(self, join_sql: str, relationship_type: str, join_name: str) -> str | None: + """Extract foreign key column from Cube join SQL. + + Parses join SQL to extract the foreign key column name based on relationship type: + - many_to_one: Extract from ${CUBE}.column (e.g., "${CUBE}.company_id = ${companies.id}" -> "company_id") + - one_to_many: Extract from ${join_name.column} (e.g., "${CUBE}.id = ${project_assignments.project_id}" -> "project_id") + + Args: + join_sql: Join SQL expression from Cube definition + relationship_type: Type of relationship (many_to_one or one_to_many) + join_name: Name of the joined model + + Returns: + Foreign key column name, or None if parsing fails + """ + import re + + if relationship_type == "one_to_many": + # For one_to_many, extract from ${join_name.column} + # Example: "${CUBE}.id = ${project_assignments.project_id}" -> "project_id" + match = re.search(rf"\$\{{{re.escape(join_name)}\.(\w+)\}}", join_sql) + if match: + return match.group(1) + else: + # For many_to_one (default), extract from ${CUBE}.column + # Example: "${CUBE}.company_id = ${companies.id}" -> "company_id" + match = re.search(r"\$\{CUBE\}\.(\w+)", join_sql) + if match: + return match.group(1) + + return None + 
def _parse_cube(self, cube_def: dict) -> Model | None: """Parse a Cube definition into a Model. @@ -161,7 +195,15 @@ def _parse_cube(self, cube_def: dict) -> Model | None: if join_name: # Get relationship type from join definition, default to many_to_one rel_type = join_def.get("relationship", "many_to_one") - relationships.append(Relationship(name=join_name, type=rel_type, foreign_key=f"{join_name}_id")) + + # Extract foreign key from join SQL, fallback to convention + join_sql = join_def.get("sql", "") + fk_column = self._extract_fk_from_join_sql(join_sql, rel_type, join_name) + if not fk_column: + # Fallback to conventional naming if parsing fails + fk_column = f"{join_name}_id" + + relationships.append(Relationship(name=join_name, type=rel_type, foreign_key=fk_column)) # Parse pre-aggregations (handle None from empty YAML section) pre_aggregations = [] @@ -236,6 +278,8 @@ def _parse_measure(self, measure_def: dict, cube_name: str) -> Metric | None: Returns: Measure instance or None """ + import re + name = measure_def.get("name") if not name: return None @@ -281,11 +325,55 @@ def _parse_measure(self, measure_def: dict, cube_name: str) -> Metric | None: # Normalize SQL to replace ${CUBE}/{CUBE} with {model} measure_sql = _normalize_cube_sql(measure_def.get("sql"), cube_name) + # Convert ${measure_name} references to model_name.measure_name format + # This is needed for derived metrics that reference other measures + numerator = None + denominator = None + if measure_sql and metric_type == "derived": + # Check if this is a simple ratio pattern: ${measure1} / ${measure2} + # This is a common pattern in Cube for ratio metrics + ratio_pattern = ( + r"^\s*\$\{(\w+)\}(?:::[\w\s]+)?\s*/\s*(?:NULLIF\()?\$\{(\w+)\}(?:::[\w\s]+)?(?:,\s*0\))?\s*$" + ) + ratio_match = re.match(ratio_pattern, measure_sql, re.IGNORECASE) + + if ratio_match: + # This is a simple ratio - convert to ratio metric type + num_measure = ratio_match.group(1) + denom_measure = ratio_match.group(2) + 
metric_type = "ratio" + numerator = f"{cube_name}.{num_measure}" + denominator = f"{cube_name}.{denom_measure}" + measure_sql = None # Ratio metrics don't use sql field + else: + # Check if SQL contains inline aggregations (COUNT, SUM, AVG, etc.) + # These are "SQL expression metrics" that already contain aggregation + has_inline_agg = any(agg in measure_sql.upper() for agg in ["COUNT(", "SUM(", "AVG(", "MIN(", "MAX("]) + + if has_inline_agg: + # This is a SQL expression metric with inline aggregations + # Don't try to replace measure references - use SQL as-is + # Set agg=None to signal this is a complete SQL expression + agg_type = None + else: + # Complex derived metric - replace measure references + def replace_measure_ref(match): + measure_ref = match.group(1) + # Don't replace if it's already been normalized to {model} + if measure_ref == "model": + return "{model}" + # Convert ${measure_name} to cube_name.measure_name + return f"{cube_name}.{measure_ref}" + + measure_sql = re.sub(r"\$\{(\w+)\}", replace_measure_ref, measure_sql) + return Metric( name=name, type=metric_type, agg=agg_type, sql=measure_sql, + numerator=numerator, + denominator=denominator, window=window, filters=filters if filters else None, description=measure_def.get("description"), @@ -489,10 +577,15 @@ def _export_cube(self, model: Model, graph: SemanticGraph) -> dict: # Handle different metric types if measure.type == "ratio": - # Ratio metrics become calculated measures + # Ratio metrics become calculated measures with ${measure} references measure_def["type"] = "number" if measure.numerator and measure.denominator: - measure_def["sql"] = f"{measure.numerator} / NULLIF({measure.denominator}, 0)" + # Convert model.measure to ${measure} format for Cube + num_ref = measure.numerator.split(".")[-1] if "." in measure.numerator else measure.numerator + denom_ref = ( + measure.denominator.split(".")[-1] if "." 
in measure.denominator else measure.denominator + ) + measure_def["sql"] = f"${{{num_ref}}}::float / NULLIF(${{{denom_ref}}}, 0)" elif measure.type == "derived": # Derived metrics become calculated measures measure_def["type"] = "number" diff --git a/sidemantic/sql/generator.py b/sidemantic/sql/generator.py index 0dc3751c..7f12373e 100644 --- a/sidemantic/sql/generator.py +++ b/sidemantic/sql/generator.py @@ -260,6 +260,20 @@ def metric_needs_window(m): # Find all models needed for the query model_names = self._find_required_models(metrics, dimensions, filters) + # Check if we need symmetric aggregation (pre-aggregation approach) + # This is needed when metrics come from different models at different join levels + if self._needs_preaggregation_for_fanout(metrics, dimensions): + return self._generate_with_preaggregation( + metrics=metrics, + dimensions=dimensions, + filters=filters, + segments=segments, + order_by=order_by, + limit=limit, + offset=offset, + aliases=aliases, + ) + # Try to use pre-aggregation if enabled (single model queries only) if use_preaggregations and len(model_names) == 1 and not ungrouped: preagg_sql = self._try_use_preaggregation( @@ -554,13 +568,13 @@ def _classify_filters_for_pushdown( return pushdown_filters, main_query_filters def _extract_metric_filter_columns(self, metrics: list[str]) -> dict[str, set[str]]: - """Extract columns referenced in metric-level filters. + """Extract columns referenced in metric-level filters and SQL expressions. 
Args: metrics: List of metric references (e.g., ["orders.revenue", "bookings.gross_value"]) Returns: - Dict mapping model_name -> set of column names needed for metric filters + Dict mapping model_name -> set of column names needed for metric filters and SQL expressions """ columns_by_model: dict[str, set[str]] = {} @@ -571,14 +585,30 @@ def _extract_metric_filter_columns(self, metrics: list[str]) -> dict[str, set[st model = self.graph.get_model(model_name) if model: measure = model.get_metric(measure_name) - if measure and measure.filters: + if measure: if model_name not in columns_by_model: columns_by_model[model_name] = set() - for f in measure.filters: + + # Extract columns from metric filters + if measure.filters: + for f in measure.filters: + # Replace {model} placeholder for parsing + aliased_filter = f.replace("{model}", f"{model_name}_cte") + try: + parsed = sqlglot.parse_one(aliased_filter, dialect=self.dialect) + for col in parsed.find_all(exp.Column): + if col.table and col.table.replace("_cte", "") == model_name: + columns_by_model[model_name].add(col.name) + except Exception: + pass + + # Extract columns from SQL expression metrics (derived metrics with inline SQL) + if measure.type == "derived" and measure.sql and not measure.agg: + # This is a SQL expression metric - extract column references # Replace {model} placeholder for parsing - aliased_filter = f.replace("{model}", f"{model_name}_cte") + aliased_sql = measure.sql.replace("{model}", f"{model_name}_cte") try: - parsed = sqlglot.parse_one(aliased_filter, dialect=self.dialect) + parsed = sqlglot.parse_one(aliased_sql, dialect=self.dialect) for col in parsed.find_all(exp.Column): if col.table and col.table.replace("_cte", "") == model_name: columns_by_model[model_name].add(col.name) @@ -809,8 +839,10 @@ def collect_measures_from_metric(metric_ref: str, visited: set[str] | None = Non # It's for this model - check if it's a derived measure measure = model.get_metric(measure_name) if measure: - if 
measure.type == "derived" or (not measure.type and not measure.agg and measure.sql): - # Derived measure - get its dependencies + if measure.type in ("derived", "ratio") or ( + not measure.type and not measure.agg and measure.sql + ): + # Derived/ratio measure - get its dependencies for dep in measure.get_dependencies(self.graph, ref_model_name): collect_measures_from_metric(dep, visited) elif measure.agg: @@ -824,8 +856,8 @@ def collect_measures_from_metric(metric_ref: str, visited: set[str] | None = Non # First check if it's a measure on the current model measure = model.get_metric(metric_ref) if measure: - if measure.type == "derived" or (not measure.type and not measure.agg and measure.sql): - # Derived measure - get its dependencies + if measure.type in ("derived", "ratio") or (not measure.type and not measure.agg and measure.sql): + # Derived/ratio measure - get its dependencies for dep in measure.get_dependencies(self.graph, model_name): collect_measures_from_metric(dep, visited) elif measure.agg: @@ -864,12 +896,17 @@ def collect_measures_from_metric(metric_ref: str, visited: set[str] | None = Non filter_conditions = [] for filter_str in measure.filters: # Replace {model} with nothing since we're in the CTE selecting from raw table - filter_sql = filter_str.replace("{model}.", "") + filter_sql = filter_str.replace("{model}.", "").replace("{model}", "") filter_conditions.append(filter_sql) if filter_conditions: filter_sql = " AND ".join(filter_conditions) - measure_sql = f"CASE WHEN {filter_sql} THEN {base_sql} END" + # For count measures, return 1 if condition met, else NULL + # COUNT counts non-NULL values, so we need NULL to exclude non-matching rows + if measure.agg == "count": + measure_sql = f"CASE WHEN {filter_sql} THEN 1 ELSE NULL END" + else: + measure_sql = f"CASE WHEN {filter_sql} THEN {base_sql} ELSE NULL END" else: measure_sql = base_sql else: @@ -958,6 +995,253 @@ def _has_fanout_joins(self, base_model_name: str, other_models: list[str]) -> di 
def _needs_preaggregation_for_fanout(self, metrics: list[str], dimensions: list[str]) -> bool:
    """Determine whether pre-aggregation is needed to avoid fan-out.

    Pre-aggregation is required when metrics come from multiple models that
    sit at different levels of the join chain, so that joining them directly
    would replicate rows and over-count one model's aggregates.

    Example: employees.total_salary + departments.total_budget by
    companies.name. The join path is companies -> departments -> employees;
    joining employees onto departments repeats each department row per
    employee, so department budgets would be summed multiple times.

    Args:
        metrics: Metric references (e.g. ["employees.total_salary", "departments.total_budget"]).
        dimensions: Dimension references. Currently unused; kept for
            interface stability and future grain-aware checks.

    Returns:
        True if the query should be built via _generate_with_preaggregation.
    """
    if not metrics or len(metrics) < 2:
        return False

    # Distinct models that the requested metrics are drawn from.
    metric_models = {m.split(".")[0] for m in metrics if "." in m}
    if len(metric_models) < 2:
        return False

    # Fan-out occurs when the join path between two metric models contains a
    # many_to_one hop: the model on the "many" side has its rows replicated,
    # inflating the other model's aggregates. Check both directions for every
    # pair. Each direction gets its own try so a failure resolving one path
    # does not skip the reverse check.
    model_list = list(metric_models)
    for i, model_a in enumerate(model_list):
        for model_b in model_list[i + 1 :]:
            for src, dst in ((model_a, model_b), (model_b, model_a)):
                try:
                    path = self.graph.find_relationship_path(src, dst)
                except (ValueError, KeyError):
                    continue
                if path and any(jp.relationship == "many_to_one" for jp in path):
                    return True

    return False

def _generate_with_preaggregation(
    self,
    metrics: list[str],
    dimensions: list[str],
    filters: list[str] | None = None,
    segments: list[str] | None = None,
    order_by: list[str] | None = None,
    limit: int | None = None,
    offset: int | None = None,
    aliases: dict[str, str] | None = None,
) -> str:
    """Generate SQL using per-model pre-aggregation to avoid fan-out.

    Builds one sub-query per metric model, aggregated to the requested
    dimension grain, then joins the pre-aggregated results together
    (FULL OUTER JOIN on the dimension columns, or CROSS JOIN when there
    are no dimensions, since each CTE then yields a single row).

    Args:
        metrics: Metric references.
        dimensions: Dimension references.
        filters: Filter expressions applied inside each sub-query.
        segments: Segment references, resolved once and merged into filters.
        order_by: Fields to order the final result by.
        limit: Maximum number of rows (LIMIT 0 is honored).
        offset: Number of rows to skip.
        aliases: Custom output aliases keyed by metric reference.

    Returns:
        SQL query string with an instrumentation comment appended.
    """
    aliases = aliases or {}
    parsed_dims = self._parse_dimension_refs(dimensions)

    # Group metric references by the model they belong to.
    metrics_by_model: dict[str, list[str]] = {}
    for metric_ref in metrics:
        if "." in metric_ref:
            metrics_by_model.setdefault(metric_ref.split(".")[0], []).append(metric_ref)

    if len(metrics_by_model) < 2:
        # Shouldn't happen (the caller gates on _needs_preaggregation_for_fanout),
        # but fall back to the regular path rather than emit a degenerate join.
        return self.generate(
            metrics=metrics,
            dimensions=dimensions,
            filters=filters,
            segments=segments,
            order_by=order_by,
            limit=limit,
            offset=offset,
            aliases=aliases,
        )

    # Resolve segments once; every sub-query receives the combined filters.
    segment_filters = self._resolve_segments(segments or [])
    all_filters = (filters or []) + segment_filters

    # One pre-aggregated CTE per metric model. The recursive generate() call
    # cannot re-trigger pre-aggregation because each sub-query contains
    # metrics from only one model.
    preagg_ctes = []
    cte_names = []
    for model_name, model_metrics in metrics_by_model.items():
        cte_name = f"{model_name}_preagg"
        cte_names.append(cte_name)

        sub_query = self.generate(
            metrics=model_metrics,
            dimensions=dimensions,
            filters=all_filters,
            segments=None,  # already resolved above
            order_by=None,
            limit=None,
            offset=None,
            aliases=aliases,
        )

        # Strip the instrumentation comment so it is not embedded in the CTE.
        sub_query_clean = "\n".join(
            line for line in sub_query.split("\n") if not line.strip().startswith("-- sidemantic:")
        )
        preagg_ctes.append(f"{cte_name} AS (\n{sub_query_clean}\n)")

    # SELECT list: dimensions first. COALESCE across all CTEs so a value
    # present in any branch of the FULL OUTER JOIN survives.
    select_exprs = []
    for dim_ref, gran in parsed_dims:
        dim_name = dim_ref.split(".")[1] if "." in dim_ref else dim_ref
        col_name = f"{dim_name}__{gran}" if gran else dim_name
        coalesce_parts = [f"{cte}.{col_name}" for cte in cte_names]
        select_exprs.append(f"COALESCE({', '.join(coalesce_parts)}) AS {col_name}")

    # Detect metric-name collisions across models so that e.g. two models'
    # "count" metrics get distinct output aliases.
    metric_name_counts: dict[str, int] = {}
    for model_metrics in metrics_by_model.values():
        for metric_ref in model_metrics:
            metric_name = metric_ref.split(".")[1] if "." in metric_ref else metric_ref
            metric_name_counts[metric_name] = metric_name_counts.get(metric_name, 0) + 1

    for model_name, model_metrics in metrics_by_model.items():
        cte_name = f"{model_name}_preagg"
        for metric_ref in model_metrics:
            metric_name = metric_ref.split(".")[1] if "." in metric_ref else metric_ref
            if metric_ref in aliases:
                alias = aliases[metric_ref]  # explicit alias wins
            elif metric_name_counts.get(metric_name, 1) > 1:
                alias = f"{model_name}_{metric_name}"  # disambiguate collision
            else:
                alias = metric_name
            select_exprs.append(f"{cte_name}.{metric_name} AS {alias}")

    # FROM clause: first CTE, then join the rest. With no dimensions each CTE
    # returns a single row, so CROSS JOIN is safe. Otherwise FULL OUTER JOIN
    # on the dimension columns using the NULL-safe comparison
    # IS NOT DISTINCT FROM — COALESCE(col, '') would fail or mis-compare for
    # non-VARCHAR dimensions such as dates and numbers.
    join_clauses = []
    for cte_name in cte_names[1:]:
        if not parsed_dims:
            join_clauses.append(f"CROSS JOIN {cte_name}")
        else:
            join_conditions = []
            for dim_ref, gran in parsed_dims:
                dim_name = dim_ref.split(".")[1] if "." in dim_ref else dim_ref
                col_name = f"{dim_name}__{gran}" if gran else dim_name
                join_conditions.append(
                    f"{cte_names[0]}.{col_name} IS NOT DISTINCT FROM {cte_name}.{col_name}"
                )
            join_clauses.append(f"FULL OUTER JOIN {cte_name} ON {' AND '.join(join_conditions)}")

    select_str = ",\n  ".join(select_exprs)
    from_str = cte_names[0] + "\n" + "\n".join(join_clauses)
    final_query = f"SELECT\n  {select_str}\nFROM {from_str}"

    if order_by:
        order_clauses = [field.split(".", 1)[1] if "." in field else field for field in order_by]
        final_query += f"\nORDER BY {', '.join(order_clauses)}"

    # Explicit None check so a legitimate LIMIT 0 is emitted, not dropped.
    if limit is not None:
        final_query += f"\nLIMIT {limit}"
    # OFFSET 0 is a no-op, so truthiness is fine here.
    if offset:
        final_query += f"\nOFFSET {offset}"

    cte_str = "WITH " + ",\n".join(preagg_ctes)
    full_sql = cte_str + "\n" + final_query

    # Instrumentation comment lists every participating model.
    instrumentation = self._generate_instrumentation_comment(
        models=list(metrics_by_model.keys()), metrics=metrics, dimensions=dimensions, used_preagg=False
    )
    return full_sql + "\n" + instrumentation
- metric_filters = [] - # Separate filters into WHERE (dimension/row-level) and HAVING (metric/aggregation-level) - # Query-level filters: check if they reference metrics (HAVING) or dimensions (WHERE) - # Metric-level filters: always WHERE (they're row-level filters defined in Metric.filters) + # NOTE: Metric-level filters (measure.filters) are NOT added here because they are + # already handled via CASE WHEN in the CTE (see _build_model_cte lines 822-841). + # Adding them to WHERE would incorrectly AND them together across different measures. where_filters = [] having_filters = [] @@ -1177,9 +1456,6 @@ def _build_main_select( else: where_filters.append(filter_expr) - # Metric-level filters always go to WHERE (they're row-level filters) - where_filters.extend(metric_filters) - # Add WHERE clause (dimension filters and metric-level row filters) if where_filters: # Parse filters to add table aliases and handle measure vs dimension columns @@ -1388,6 +1664,24 @@ def _build_metric_sql(self, metric, model_context: str | None = None) -> str: formula = metric.sql + # Check if this is a SQL expression metric (has inline aggregations) + # These metrics already contain complete SQL and shouldn't have dependencies replaced + has_inline_agg = any(agg in formula.upper() for agg in ["COUNT(", "SUM(", "AVG(", "MIN(", "MAX("]) + + if has_inline_agg: + # This is a SQL expression metric - use formula as-is + # Just replace {model} placeholder with the CTE alias + # We need to find which model this metric belongs to + # Look through all models to find this metric + for model_name, model in self.graph.models.items(): + if model.get_metric(metric.name): + # Replace {model} with the CTE alias + formula = formula.replace("{model}", f"{model_name}_cte") + return formula + + # If we didn't find the model, return formula as-is + return formula + # Auto-detect dependencies from expression using graph for resolution dependencies = metric.get_dependencies(self.graph, model_context) diff --git 
a/tests/adapters/cube/test_kitchen_sink.py b/tests/adapters/cube/test_kitchen_sink.py new file mode 100644 index 00000000..138265ab --- /dev/null +++ b/tests/adapters/cube/test_kitchen_sink.py @@ -0,0 +1,916 @@ +"""Kitchen sink integration tests for Cube adapter. + +This test suite stress-tests sidemantic's Cube adapter implementation with a +complex multi-entity data model featuring various relationship types, aggregation +patterns, and edge cases. + +BUGS FOUND AND FIXED: +===================== + +1. FIXED: Foreign key column inference in CubeAdapter + - Now parses join SQL `${CUBE}.company_id = ${companies.id}` to extract actual FK column + - Handles both many_to_one and one_to_many relationship directions + +2. FIXED: Derived metrics with Cube ${measure} references + - Now converts ratio patterns like `${billable_hours} / ${total_hours}` to sidemantic format + +3. FIXED: Multiple filtered measures AND'd incorrectly + - Now uses CASE WHEN for conditional aggregation instead of WHERE clause + +4. FIXED: Complex derived metrics with inline SQL (e.g., approval_rate with ${CUBE}.status) + - Now handles SQL expression metrics with inline aggregations + - Extracts column references from SQL expressions and includes them in CTEs + +5. FIXED: one_to_many join direction + - Now correctly extracts FK from the target model side for one_to_many relationships + +6. FIXED: Symmetric aggregation for multi-model fan-out + - When metrics from different join levels are queried together (e.g., employees.salary + + departments.budget by companies.name), pre-aggregates each metric separately + to the dimension grain, then joins the pre-aggregated results +""" + +import duckdb +import pytest + +from sidemantic import SemanticLayer +from sidemantic.adapters.cube import CubeAdapter +from tests.utils import fetch_dicts + + +@pytest.fixture +def kitchen_sink_db(): + """Create a comprehensive test database matching the kitchen_sink.yml fixture. 
+ + This creates realistic test data with known values to verify query correctness. + """ + conn = duckdb.connect(":memory:") + + # Companies + conn.execute(""" + CREATE TABLE companies ( + id INTEGER PRIMARY KEY, + name VARCHAR, + industry VARCHAR, + founded_at DATE, + is_active BOOLEAN, + employee_count INTEGER + ) + """) + conn.execute(""" + INSERT INTO companies VALUES + (1, 'TechCorp', 'Technology', '2010-01-15', true, 150), + (2, 'FinanceInc', 'Finance', '2005-06-20', true, 75), + (3, 'RetailCo', 'Retail', '2015-03-10', false, 30), + (4, 'HealthOrg', 'Healthcare', '2018-09-01', true, 200) + """) + + # Departments + conn.execute(""" + CREATE TABLE departments ( + id INTEGER PRIMARY KEY, + company_id INTEGER, + name VARCHAR, + budget DECIMAL(15, 2), + created_at DATE + ) + """) + conn.execute(""" + INSERT INTO departments VALUES + (1, 1, 'Engineering', 2000000.00, '2010-02-01'), + (2, 1, 'Sales', 1500000.00, '2010-02-01'), + (3, 1, 'Marketing', 800000.00, '2011-01-01'), + (4, 2, 'Trading', 5000000.00, '2005-07-01'), + (5, 2, 'Risk', 1200000.00, '2006-01-01'), + (6, 3, 'Operations', 500000.00, '2015-04-01'), + (7, 4, 'Research', 3000000.00, '2018-10-01'), + (8, 4, 'Clinical', 2500000.00, '2018-10-01') + """) + + # Employees + conn.execute(""" + CREATE TABLE employees ( + id INTEGER PRIMARY KEY, + department_id INTEGER, + name VARCHAR, + email VARCHAR, + title VARCHAR, + salary DECIMAL(10, 2), + hired_at DATE, + is_manager BOOLEAN, + manager_id INTEGER + ) + """) + conn.execute(""" + INSERT INTO employees VALUES + (1, 1, 'Alice Smith', 'alice@techcorp.com', 'VP Engineering', 250000.00, '2010-03-01', true, NULL), + (2, 1, 'Bob Jones', 'bob@techcorp.com', 'Senior Engineer', 180000.00, '2012-05-15', false, 1), + (3, 1, 'Carol White', 'carol@techcorp.com', 'Engineer', 140000.00, '2020-01-10', false, 1), + (4, 2, 'Dave Brown', 'dave@techcorp.com', 'Sales Director', 200000.00, '2011-02-01', true, NULL), + (5, 2, 'Eve Wilson', 'eve@techcorp.com', 'Account Executive', 
120000.00, '2019-06-01', false, 4), + (6, 4, 'Frank Miller', 'frank@financeinc.com', 'Head Trader', 350000.00, '2005-08-01', true, NULL), + (7, 4, 'Grace Lee', 'grace@financeinc.com', 'Trader', 200000.00, '2015-03-01', false, 6), + (8, 7, 'Henry Chen', 'henry@healthorg.com', 'Research Lead', 180000.00, '2018-11-01', true, NULL), + (9, 7, 'Ivy Park', 'ivy@healthorg.com', 'Researcher', 130000.00, '2022-01-15', false, 8), + (10, 8, 'Jack Davis', 'jack@healthorg.com', 'Clinical Director', 220000.00, '2018-11-01', true, NULL) + """) + + # Projects + conn.execute(""" + CREATE TABLE projects ( + id INTEGER PRIMARY KEY, + company_id INTEGER, + name VARCHAR, + status VARCHAR, + budget DECIMAL(15, 2), + start_date DATE, + end_date DATE, + priority INTEGER + ) + """) + conn.execute(""" + INSERT INTO projects VALUES + (1, 1, 'Platform Rewrite', 'active', 500000.00, '2023-01-01', '2024-06-30', 5), + (2, 1, 'Mobile App', 'completed', 200000.00, '2022-01-01', '2022-12-31', 4), + (3, 1, 'Data Pipeline', 'active', 150000.00, '2023-06-01', '2024-03-31', 3), + (4, 2, 'Risk Dashboard', 'active', 300000.00, '2023-03-01', '2024-02-28', 5), + (5, 4, 'Patient Portal', 'active', 400000.00, '2023-04-01', '2024-09-30', 5), + (6, 4, 'Research DB', 'completed', 100000.00, '2022-06-01', '2023-01-31', 2), + (7, 3, 'Inventory System', 'cancelled', 80000.00, '2021-01-01', '2021-06-30', 3) + """) + + # Project Assignments (many-to-many join table with its own metrics) + conn.execute(""" + CREATE TABLE project_assignments ( + id INTEGER PRIMARY KEY, + employee_id INTEGER, + project_id INTEGER, + role VARCHAR, + assigned_at DATE, + hours_allocated INTEGER + ) + """) + conn.execute(""" + INSERT INTO project_assignments VALUES + (1, 1, 1, 'lead', '2023-01-01', 40), + (2, 2, 1, 'contributor', '2023-01-15', 30), + (3, 3, 1, 'contributor', '2023-02-01', 25), + (4, 2, 3, 'lead', '2023-06-01', 35), + (5, 3, 3, 'contributor', '2023-06-15', 20), + (6, 1, 2, 'lead', '2022-01-01', 20), + (7, 2, 2, 
'contributor', '2022-02-01', 40), + (8, 6, 4, 'lead', '2023-03-01', 30), + (9, 7, 4, 'contributor', '2023-03-15', 40), + (10, 8, 5, 'lead', '2023-04-01', 35), + (11, 9, 5, 'contributor', '2023-04-15', 30), + (12, 10, 5, 'reviewer', '2023-04-01', 10), + (13, 8, 6, 'lead', '2022-06-01', 25), + (14, 9, 6, 'contributor', '2022-07-01', 30) + """) + + # Timesheets (time-series data) + conn.execute(""" + CREATE TABLE timesheets ( + id INTEGER PRIMARY KEY, + employee_id INTEGER, + project_id INTEGER, + work_date DATE, + hours DECIMAL(4, 2), + description VARCHAR, + is_billable BOOLEAN + ) + """) + # Generate time-series data for testing cumulative/rolling + conn.execute(""" + INSERT INTO timesheets VALUES + (1, 2, 1, '2023-01-02', 8.0, 'Feature development', true), + (2, 2, 1, '2023-01-03', 7.5, 'Code review', true), + (3, 2, 1, '2023-01-04', 8.0, 'Bug fixes', true), + (4, 2, 1, '2023-01-05', 6.0, 'Meetings', false), + (5, 3, 1, '2023-01-02', 8.0, 'Testing', true), + (6, 3, 1, '2023-01-03', 8.0, 'Documentation', false), + (7, 3, 1, '2023-01-04', 7.0, 'Testing', true), + (8, 1, 1, '2023-01-02', 4.0, 'Planning', false), + (9, 1, 1, '2023-01-03', 3.0, 'Review', true), + (10, 1, 1, '2023-01-04', 5.0, 'Architecture', true), + (11, 6, 4, '2023-03-06', 9.0, 'Trading analysis', true), + (12, 6, 4, '2023-03-07', 10.0, 'Risk modeling', true), + (13, 7, 4, '2023-03-06', 8.0, 'Data prep', true), + (14, 7, 4, '2023-03-07', 8.0, 'Testing', true), + (15, 8, 5, '2023-04-03', 8.0, 'Research design', true), + (16, 8, 5, '2023-04-04', 7.0, 'Literature review', false), + (17, 9, 5, '2023-04-03', 6.0, 'Data collection', true), + (18, 9, 5, '2023-04-04', 8.0, 'Analysis', true) + """) + + # Expenses (with optional project_id - nullable FK) + conn.execute(""" + CREATE TABLE expenses ( + id INTEGER PRIMARY KEY, + employee_id INTEGER, + project_id INTEGER, + category VARCHAR, + amount DECIMAL(10, 2), + submitted_at DATE, + approved_at DATE, + status VARCHAR + ) + """) + conn.execute(""" + INSERT 
INTO expenses VALUES + (1, 2, 1, 'software', 500.00, '2023-01-10', '2023-01-12', 'approved'), + (2, 2, NULL, 'meals', 75.00, '2023-01-15', NULL, 'pending'), + (3, 3, 1, 'equipment', 1200.00, '2023-01-20', '2023-01-25', 'approved'), + (4, 1, NULL, 'travel', 2500.00, '2023-02-01', '2023-02-05', 'approved'), + (5, 6, 4, 'software', 800.00, '2023-03-10', '2023-03-12', 'approved'), + (6, 6, NULL, 'meals', 150.00, '2023-03-15', NULL, 'rejected'), + (7, 8, 5, 'equipment', 3000.00, '2023-04-10', '2023-04-15', 'approved'), + (8, 9, 5, 'travel', 1800.00, '2023-04-20', NULL, 'pending'), + (9, 10, NULL, 'meals', 200.00, '2023-04-25', '2023-04-26', 'approved') + """) + + # Invoices (with optional project_id) + conn.execute(""" + CREATE TABLE invoices ( + id INTEGER PRIMARY KEY, + company_id INTEGER, + project_id INTEGER, + invoice_number VARCHAR, + status VARCHAR, + issued_at DATE, + due_at DATE, + paid_at DATE, + total_amount DECIMAL(15, 2) + ) + """) + conn.execute(""" + INSERT INTO invoices VALUES + (1, 1, 1, 'INV-001', 'paid', '2023-01-31', '2023-02-28', '2023-02-15', 50000.00), + (2, 1, 1, 'INV-002', 'paid', '2023-02-28', '2023-03-31', '2023-03-20', 75000.00), + (3, 1, 2, 'INV-003', 'paid', '2022-06-30', '2022-07-31', '2022-07-15', 100000.00), + (4, 1, NULL, 'INV-004', 'sent', '2023-03-31', '2023-04-30', NULL, 25000.00), + (5, 2, 4, 'INV-005', 'paid', '2023-06-30', '2023-07-31', '2023-07-25', 150000.00), + (6, 2, 4, 'INV-006', 'overdue', '2023-09-30', '2023-10-31', NULL, 100000.00), + (7, 4, 5, 'INV-007', 'sent', '2023-09-30', '2023-10-31', NULL, 200000.00), + (8, 4, 6, 'INV-008', 'paid', '2023-01-31', '2023-02-28', '2023-02-20', 100000.00) + """) + + # Invoice Line Items (for fan-out testing) + conn.execute(""" + CREATE TABLE invoice_line_items ( + id INTEGER PRIMARY KEY, + invoice_id INTEGER, + description VARCHAR, + quantity INTEGER, + unit_price DECIMAL(10, 2), + line_total DECIMAL(10, 2) + ) + """) + conn.execute(""" + INSERT INTO invoice_line_items VALUES + (1, 1, 
'Development hours', 100, 250.00, 25000.00), + (2, 1, 'Design hours', 50, 200.00, 10000.00), + (3, 1, 'Project management', 30, 500.00, 15000.00), + (4, 2, 'Development hours', 200, 250.00, 50000.00), + (5, 2, 'QA hours', 100, 150.00, 15000.00), + (6, 2, 'Documentation', 20, 500.00, 10000.00), + (7, 3, 'Full project delivery', 1, 100000.00, 100000.00), + (8, 4, 'Consulting', 50, 500.00, 25000.00), + (9, 5, 'Trading system', 1, 150000.00, 150000.00), + (10, 6, 'Risk module', 1, 100000.00, 100000.00), + (11, 7, 'Portal development', 400, 500.00, 200000.00), + (12, 8, 'Research DB', 1, 100000.00, 100000.00) + """) + + return conn + + +@pytest.fixture +def kitchen_sink_layer(kitchen_sink_db): + """Create semantic layer from kitchen_sink.yml Cube fixture with test DB.""" + adapter = CubeAdapter() + graph = adapter.parse("tests/fixtures/cube/kitchen_sink.yml") + + layer = SemanticLayer(auto_register=False) + layer.graph = graph + layer.conn = kitchen_sink_db + + return layer + + +# ============================================================ +# Basic Parsing Tests +# ============================================================ + + +class TestCubeParsingKitchenSink: + """Test that the complex Cube YAML is parsed correctly.""" + + def test_all_cubes_parsed(self): + """Verify all cubes are imported.""" + adapter = CubeAdapter() + graph = adapter.parse("tests/fixtures/cube/kitchen_sink.yml") + + expected = [ + "companies", + "departments", + "employees", + "projects", + "project_assignments", + "timesheets", + "expenses", + "invoices", + "invoice_line_items", + ] + for name in expected: + assert name in graph.models, f"Model {name} not found" + + def test_relationships_parsed(self): + """Verify relationships are parsed correctly.""" + adapter = CubeAdapter() + graph = adapter.parse("tests/fixtures/cube/kitchen_sink.yml") + + # departments -> companies (many_to_one) + departments = graph.get_model("departments") + assert any(r.name == "companies" for r in 
departments.relationships) + + # employees -> departments (many_to_one) + employees = graph.get_model("employees") + assert any(r.name == "departments" for r in employees.relationships) + + # projects -> project_assignments (one_to_many) + projects = graph.get_model("projects") + pa_rel = next((r for r in projects.relationships if r.name == "project_assignments"), None) + assert pa_rel is not None + assert pa_rel.type == "one_to_many" + + def test_segments_parsed(self): + """Verify segments are parsed with correct SQL normalization.""" + adapter = CubeAdapter() + graph = adapter.parse("tests/fixtures/cube/kitchen_sink.yml") + + companies = graph.get_model("companies") + active_segment = next((s for s in companies.segments if s.name == "active_companies"), None) + assert active_segment is not None + assert "{model}" in active_segment.sql + assert "${CUBE}" not in active_segment.sql + + def test_filtered_measures_parsed(self): + """Verify measures with filters are parsed.""" + adapter = CubeAdapter() + graph = adapter.parse("tests/fixtures/cube/kitchen_sink.yml") + + employees = graph.get_model("employees") + manager_count = employees.get_metric("manager_count") + assert manager_count is not None + assert manager_count.filters is not None + assert len(manager_count.filters) > 0 + + def test_derived_measures_parsed(self): + """Verify derived/calculated measures are parsed.""" + adapter = CubeAdapter() + graph = adapter.parse("tests/fixtures/cube/kitchen_sink.yml") + + projects = graph.get_model("projects") + completion_rate = projects.get_metric("completion_rate") + assert completion_rate is not None + # Should be detected as derived or ratio type + assert completion_rate.type in ["derived", "ratio", None] + + +# ============================================================ +# Single Model Query Tests +# ============================================================ + + +class TestSingleModelQueries: + """Test queries against single models.""" + + def 
test_simple_count(self, kitchen_sink_layer): + """Test basic count metric.""" + result = kitchen_sink_layer.query(metrics=["companies.count"]) + rows = fetch_dicts(result) + assert len(rows) == 1 + assert rows[0]["count"] == 4 + + def test_sum_metric(self, kitchen_sink_layer): + """Test sum aggregation.""" + result = kitchen_sink_layer.query(metrics=["departments.total_budget"]) + rows = fetch_dicts(result) + assert len(rows) == 1 + # Sum of all department budgets + expected = 2000000 + 1500000 + 800000 + 5000000 + 1200000 + 500000 + 3000000 + 2500000 + assert rows[0]["total_budget"] == expected + + def test_avg_metric(self, kitchen_sink_layer): + """Test avg aggregation.""" + result = kitchen_sink_layer.query(metrics=["employees.avg_salary"]) + rows = fetch_dicts(result) + assert len(rows) == 1 + # Average of all salaries + total = 250000 + 180000 + 140000 + 200000 + 120000 + 350000 + 200000 + 180000 + 130000 + 220000 + expected = total / 10 + assert rows[0]["avg_salary"] == expected + + def test_count_distinct(self, kitchen_sink_layer): + """Test count_distinct aggregation.""" + result = kitchen_sink_layer.query(metrics=["employees.headcount"]) + rows = fetch_dicts(result) + assert len(rows) == 1 + assert rows[0]["headcount"] == 10 + + def test_min_max(self, kitchen_sink_layer): + """Test min/max aggregations.""" + result = kitchen_sink_layer.query(metrics=["departments.min_budget", "departments.max_budget"]) + rows = fetch_dicts(result) + assert len(rows) == 1 + assert rows[0]["min_budget"] == 500000 # Operations at RetailCo + assert rows[0]["max_budget"] == 5000000 # Trading at FinanceInc + + def test_filtered_measure(self, kitchen_sink_layer): + """Test measure with filter.""" + result = kitchen_sink_layer.query(metrics=["employees.manager_count"]) + rows = fetch_dicts(result) + assert len(rows) == 1 + # 5 managers: Alice, Dave, Frank, Henry, Jack + assert rows[0]["manager_count"] == 5 + + def test_grouped_by_dimension(self, kitchen_sink_layer): + """Test 
grouping by categorical dimension.""" + result = kitchen_sink_layer.query(metrics=["companies.count"], dimensions=["companies.industry"]) + rows = fetch_dicts(result) + industry_counts = {row["industry"]: row["count"] for row in rows} + assert industry_counts["Technology"] == 1 + assert industry_counts["Finance"] == 1 + assert industry_counts["Retail"] == 1 + assert industry_counts["Healthcare"] == 1 + + def test_segment_filter(self, kitchen_sink_layer): + """Test using a segment as a filter.""" + result = kitchen_sink_layer.query(metrics=["companies.count"], segments=["companies.active_companies"]) + rows = fetch_dicts(result) + assert len(rows) == 1 + assert rows[0]["count"] == 3 # TechCorp, FinanceInc, HealthOrg (not RetailCo) + + def test_multiple_segments(self, kitchen_sink_layer): + """Test combining multiple segments.""" + result = kitchen_sink_layer.query( + metrics=["companies.count"], + segments=["companies.active_companies", "companies.tech_companies"], + ) + rows = fetch_dicts(result) + assert len(rows) == 1 + assert rows[0]["count"] == 1 # Only TechCorp is both active and tech + + def test_time_dimension_with_granularity(self, kitchen_sink_layer): + """Test time dimension with granularity.""" + result = kitchen_sink_layer.query( + metrics=["timesheets.total_hours"], + dimensions=["timesheets.work_date__month"], + ) + rows = fetch_dicts(result) + # Should have distinct months + assert len(rows) >= 2 + + +# ============================================================ +# Join Tests (Multi-Model Queries) +# ============================================================ + + +class TestJoinQueries: + """Test queries that join multiple models.""" + + def test_simple_many_to_one_join(self, kitchen_sink_layer): + """Test simple many-to-one join.""" + result = kitchen_sink_layer.query( + metrics=["departments.total_budget"], + dimensions=["companies.name"], + ) + rows = fetch_dicts(result) + budget_by_company = {row["name"]: row["total_budget"] for row in rows} + 
# TechCorp: Engineering(2M) + Sales(1.5M) + Marketing(0.8M) = 4.3M + assert budget_by_company["TechCorp"] == 4300000 + # FinanceInc: Trading(5M) + Risk(1.2M) = 6.2M + assert budget_by_company["FinanceInc"] == 6200000 + + def test_two_hop_join(self, kitchen_sink_layer): + """Test two-hop join: employees -> departments -> companies.""" + result = kitchen_sink_layer.query( + metrics=["employees.total_salary"], + dimensions=["companies.name"], + ) + rows = fetch_dicts(result) + salary_by_company = {row["name"]: row["total_salary"] for row in rows} + # TechCorp employees: Alice(250k) + Bob(180k) + Carol(140k) + Dave(200k) + Eve(120k) = 890k + assert salary_by_company["TechCorp"] == 890000 + # FinanceInc: Frank(350k) + Grace(200k) = 550k + assert salary_by_company["FinanceInc"] == 550000 + + def test_join_with_dimension_from_joined_model(self, kitchen_sink_layer): + """Test using a dimension from a joined model.""" + result = kitchen_sink_layer.query( + metrics=["employees.count"], + dimensions=["departments.name"], + ) + rows = fetch_dicts(result) + emp_by_dept = {row["name"]: row["count"] for row in rows} + assert emp_by_dept["Engineering"] == 3 # Alice, Bob, Carol + assert emp_by_dept["Sales"] == 2 # Dave, Eve + assert emp_by_dept["Trading"] == 2 # Frank, Grace + + def test_join_table_with_own_metrics(self, kitchen_sink_layer): + """Test that join tables (project_assignments) can have their own metrics.""" + result = kitchen_sink_layer.query( + metrics=["project_assignments.total_hours_allocated"], + dimensions=["projects.name"], + ) + rows = fetch_dicts(result) + hours_by_project = {row["name"]: row["total_hours_allocated"] for row in rows} + # Platform Rewrite: 40+30+25 = 95 + assert hours_by_project["Platform Rewrite"] == 95 + + def test_count_distinct_across_join(self, kitchen_sink_layer): + """Test count_distinct across a join.""" + result = kitchen_sink_layer.query( + metrics=["project_assignments.unique_employees"], + dimensions=["projects.name"], + ) + rows = 
fetch_dicts(result) + unique_by_project = {row["name"]: row["unique_employees"] for row in rows} + # Platform Rewrite: employees 1, 2, 3 + assert unique_by_project["Platform Rewrite"] == 3 + + +# ============================================================ +# Fan-Out / Symmetric Aggregate Tests +# ============================================================ + + +class TestFanOutAggregation: + """Test correct aggregation when joins cause row fan-out.""" + + def test_invoice_total_with_line_items(self, kitchen_sink_layer): + """Test that invoice totals aren't inflated by line items join. + + This is a critical test for symmetric aggregates. + Invoice 1 has 3 line items, so without symmetric aggs the + total would be 3x the correct value. + """ + result = kitchen_sink_layer.query( + metrics=["invoices.total_invoiced", "invoice_line_items.count"], + dimensions=["invoices.invoice_number"], + ) + rows = fetch_dicts(result) + inv1 = next(r for r in rows if r["invoice_number"] == "INV-001") + # Invoice 1 total is 50000, has 3 line items + # With proper symmetric aggregation, total should still be 50000 + assert inv1["total_invoiced"] == 50000 + assert inv1["count"] == 3 + + def test_project_budget_with_assignments(self, kitchen_sink_layer): + """Test project budget isn't inflated by assignment fan-out.""" + result = kitchen_sink_layer.query( + metrics=["projects.total_budget", "project_assignments.count"], + dimensions=["projects.name"], + ) + rows = fetch_dicts(result) + platform = next(r for r in rows if r["name"] == "Platform Rewrite") + # Platform Rewrite has budget 500000, 3 assignments + # Budget should still be 500000, not 1.5M + assert platform["total_budget"] == 500000 + assert platform["count"] == 3 + + +# ============================================================ +# Nullable Foreign Key Tests +# ============================================================ + + +class TestNullableForeignKeys: + """Test queries involving optional (nullable) foreign keys.""" + + 
def test_expenses_with_null_project(self, kitchen_sink_layer): + """Test expenses that have NULL project_id.""" + # Get all expenses count (includes those with NULL project_id) + result = kitchen_sink_layer.query(metrics=["expenses.count"]) + rows = fetch_dicts(result) + assert rows[0]["count"] == 9 # Total expenses + + # Now query with segment that filters to project_id IS NULL + result = kitchen_sink_layer.query( + metrics=["expenses.count"], + segments=["expenses.general_expenses"], + ) + rows = fetch_dicts(result) + # 4 expenses have NULL project_id + assert rows[0]["count"] == 4 + + def test_left_join_preserves_null_fk(self, kitchen_sink_layer): + """Test that LEFT JOIN properly handles NULL FKs. + + When joining expenses to projects, expenses with NULL project_id + should still be included in totals. + """ + # Query expenses with project dimension - should still get all expenses + # if we're using proper LEFT JOINs + result = kitchen_sink_layer.query( + metrics=["expenses.total_amount"], + dimensions=["projects.name"], + ) + # The SQL should use LEFT JOIN for nullable FKs + # This is a hole if sidemantic uses INNER JOIN here + rows = fetch_dicts(result) + # We should have rows for each project with expenses + # AND potentially a NULL row for general expenses + assert len(rows) > 0 + + +# ============================================================ +# Derived Metric Tests +# ============================================================ + + +class TestDerivedMetrics: + """Test derived/calculated metrics.""" + + def test_ratio_metric(self, kitchen_sink_layer): + """Test a simple ratio metric.""" + result = kitchen_sink_layer.query(metrics=["timesheets.billable_ratio"]) + rows = fetch_dicts(result) + assert len(rows) == 1 + # Total hours: 8+7.5+8+6+8+8+7+4+3+5+9+10+8+8+8+7+6+8 = 129.5 + # Billable hours: 8+7.5+8+8+7+3+5+9+10+8+8+8+6+8 = 103.5 + # Ratio ≈ 0.799 + ratio = rows[0]["billable_ratio"] + assert ratio is not None + assert 0.7 < ratio < 0.9 + + def 
test_derived_metric_with_filtered_measures(self, kitchen_sink_layer): + """Test derived metric that uses filtered measures.""" + result = kitchen_sink_layer.query(metrics=["expenses.approval_rate"]) + rows = fetch_dicts(result) + assert len(rows) == 1 + # 6 approved, 9 total = 66.67% + rate = rows[0]["approval_rate"] + assert rate is not None + assert abs(rate - (6 / 9)) < 0.01 + + def test_completion_rate_by_company(self, kitchen_sink_layer): + """Test completion rate (derived) grouped by dimension.""" + result = kitchen_sink_layer.query( + metrics=["projects.completion_rate"], + dimensions=["companies.name"], + ) + rows = fetch_dicts(result) + rates = {row["name"]: row["completion_rate"] for row in rows} + # TechCorp: 1 completed, 3 total = 33.33% + assert rates["TechCorp"] == pytest.approx(1 / 3, rel=0.01) + # HealthOrg: 1 completed, 2 total = 50% + assert rates["HealthOrg"] == pytest.approx(0.5, rel=0.01) + + +# ============================================================ +# Edge Cases and Potential Holes +# ============================================================ + + +class TestEdgeCasesAndHoles: + """Test potential edge cases that might reveal holes in sidemantic.""" + + def test_self_referential_join(self, kitchen_sink_layer): + """Test self-referential relationship (manager_id). + + This is a potential hole - does sidemantic support self-joins? + """ + # This would require a self-join: employees.manager_id -> employees.id + # Most semantic layers don't handle this well + # We skip this as a known limitation but document it + pytest.skip("Self-referential joins not currently supported - potential enhancement") + + def test_multiple_paths_to_same_model(self, kitchen_sink_layer): + """Test when there are multiple paths to the same model. + + expenses -> employees -> departments -> companies + expenses -> projects -> companies + + Which path should sidemantic use? 
+ """ + # Query expenses with company dimension - could go through employee or project + result = kitchen_sink_layer.query( + metrics=["expenses.total_amount"], + dimensions=["companies.name"], + ) + rows = fetch_dicts(result) + # Should work and produce some result + assert len(rows) > 0 + + def test_measure_from_intermediate_model(self, kitchen_sink_layer): + """Test querying a measure from an intermediate model in a join chain. + + Query: employees metric + companies dimension + But also want departments metric in the same query. + + This uses pre-aggregation to avoid fan-out: each metric is aggregated + to the dimension grain separately, then the results are joined. + """ + result = kitchen_sink_layer.query( + metrics=["employees.total_salary", "departments.total_budget"], + dimensions=["companies.name"], + ) + rows = fetch_dicts(result) + # Should have salary and budget by company + techcorp = next(r for r in rows if r["name"] == "TechCorp") + assert techcorp["total_salary"] == 890000 + assert techcorp["total_budget"] == 4300000 + + def test_multiple_filtered_measures_same_model(self, kitchen_sink_layer): + """Test multiple filtered measures from the same model. + + Each filtered measure should compute independently using CASE WHEN. 
+ """ + result = kitchen_sink_layer.query( + metrics=[ + "expenses.approved_amount", + "expenses.pending_amount", + "expenses.total_amount", + ], + ) + rows = fetch_dicts(result) + assert len(rows) == 1 + # Approved: 500+1200+2500+800+3000+200 = 8200 + # Pending: 75+1800 = 1875 + # Total: 500+75+1200+2500+800+150+3000+1800+200 = 10225 + assert rows[0]["approved_amount"] == 8200 + assert rows[0]["pending_amount"] == 1875 + assert rows[0]["total_amount"] == 10225 + + def test_filter_on_joined_model(self, kitchen_sink_layer): + """Test filtering on a dimension from a joined model.""" + result = kitchen_sink_layer.query( + metrics=["employees.count"], + filters=["companies.industry = 'Technology'"], + ) + rows = fetch_dicts(result) + assert len(rows) == 1 + # TechCorp employees: 5 + assert rows[0]["count"] == 5 + + def test_segment_from_joined_model(self, kitchen_sink_layer): + """Test using a segment from a joined model. + + This might be a hole - do segments work across joins? + """ + # This would use companies.active_companies segment when querying employees + # Many semantic layers don't support this + try: + result = kitchen_sink_layer.query( + metrics=["employees.count"], + segments=["companies.active_companies"], + ) + rows = fetch_dicts(result) + # Should only include employees from active companies + # Excludes RetailCo which has no employees anyway in our data + assert rows[0]["count"] == 10 + except Exception as e: + pytest.skip(f"Cross-model segments not supported: {e}") + + def test_having_clause_on_aggregate(self, kitchen_sink_layer): + """Test HAVING clause (filter on aggregated value). 
+ + Filter: departments with total_budget > 1M + """ + result = kitchen_sink_layer.query( + metrics=["departments.total_budget"], + dimensions=["departments.name"], + filters=["departments.total_budget > 1000000"], + ) + rows = fetch_dicts(result) + # Only departments with budget > 1M + dept_names = {row["name"] for row in rows} + assert "Engineering" in dept_names # 2M + assert "Trading" in dept_names # 5M + assert "Operations" not in dept_names # 500k + + def test_order_by_metric(self, kitchen_sink_layer): + """Test ordering by a metric.""" + result = kitchen_sink_layer.query( + metrics=["departments.total_budget"], + dimensions=["departments.name"], + order_by=["departments.total_budget"], + ) + rows = fetch_dicts(result) + budgets = [row["total_budget"] for row in rows] + # Should be ascending + assert budgets == sorted(budgets) + + def test_limit_and_offset(self, kitchen_sink_layer): + """Test LIMIT and OFFSET.""" + result = kitchen_sink_layer.query( + metrics=["employees.count"], + dimensions=["employees.name"], + order_by=["employees.name"], + limit=3, + ) + rows = fetch_dicts(result) + assert len(rows) == 3 + + def test_boolean_dimension_grouping(self, kitchen_sink_layer): + """Test grouping by boolean dimension.""" + result = kitchen_sink_layer.query( + metrics=["employees.count"], + dimensions=["employees.is_manager"], + ) + rows = fetch_dicts(result) + by_manager = {row["is_manager"]: row["count"] for row in rows} + assert by_manager[True] == 5 + assert by_manager[False] == 5 + + def test_numeric_dimension_as_grouping(self, kitchen_sink_layer): + """Test grouping by numeric dimension (priority).""" + result = kitchen_sink_layer.query( + metrics=["projects.count"], + dimensions=["projects.priority"], + ) + rows = fetch_dicts(result) + by_priority = {row["priority"]: row["count"] for row in rows} + # Priority 5: 3 projects, 4: 1, 3: 2, 2: 1 + assert by_priority[5] == 3 + assert by_priority[4] == 1 + + def test_aggregation_on_aggregation(self, 
kitchen_sink_layer): + """Test whether we can aggregate an already aggregated value. + + E.g., AVG of department budgets per company. + This requires a two-level aggregation. + """ + # This might not work - sidemantic might not support nested aggregation + try: + # First aggregate departments by company, then avg those budgets + # This would be: SELECT company, AVG(dept_budget_per_company) + # which requires subquery or window function + pytest.skip("Nested aggregation not directly supported") + except Exception: + pytest.skip("Nested aggregation not supported") + + +# ============================================================ +# SQL Compilation Inspection Tests +# ============================================================ + + +class TestSQLCompilation: + """Test that compiled SQL is correct.""" + + def test_cte_structure(self, kitchen_sink_layer): + """Verify CTEs are generated for each model.""" + sql = kitchen_sink_layer.compile( + metrics=["employees.total_salary"], + dimensions=["companies.name"], + ) + # Should have CTEs for employees, departments, companies + assert "employees_cte" in sql + assert "departments_cte" in sql + assert "companies_cte" in sql + + def test_join_types(self, kitchen_sink_layer): + """Verify correct join types are used.""" + sql = kitchen_sink_layer.compile( + metrics=["employees.count"], + dimensions=["companies.name"], + ) + # Should use LEFT JOIN for standard joins + assert "LEFT JOIN" in sql + + def test_filter_pushdown(self, kitchen_sink_layer): + """Verify filters are pushed down to CTEs when possible.""" + sql = kitchen_sink_layer.compile( + metrics=["employees.count"], + dimensions=["employees.title"], + filters=["employees.is_manager = true"], + ) + # Filter should ideally be in the employees_cte WHERE clause + # Look for the filter in CTE definition + assert "is_manager" in sql + + def test_group_by_generated(self, kitchen_sink_layer): + """Verify GROUP BY is generated for dimension queries.""" + sql = 
kitchen_sink_layer.compile( + metrics=["companies.count"], + dimensions=["companies.industry"], + ) + assert "GROUP BY" in sql + + def test_segment_filter_application(self, kitchen_sink_layer): + """Verify segment SQL is correctly applied.""" + sql = kitchen_sink_layer.compile( + metrics=["companies.count"], + segments=["companies.active_companies"], + ) + # Should have is_active = true somewhere + assert "is_active" in sql.lower() + assert "true" in sql.lower() + + +if __name__ == "__main__": + pytest.main([__file__, "-v"]) diff --git a/tests/fixtures/cube/kitchen_sink.yml b/tests/fixtures/cube/kitchen_sink.yml new file mode 100644 index 00000000..38d5ced1 --- /dev/null +++ b/tests/fixtures/cube/kitchen_sink.yml @@ -0,0 +1,748 @@ +# Kitchen Sink Cube Test Fixture +# Complex multi-entity data model designed to stress-test sidemantic's Cube adapter +# and find holes in query generation logic. +# +# Entities: +# - companies (top-level entity) +# - departments (belongs_to company) +# - employees (belongs_to department) +# - projects (belongs_to company, has_many employees via project_assignments) +# - project_assignments (join table: employee <-> project, with hours) +# - timesheets (belongs_to employee, belongs_to project) +# - expenses (belongs_to employee, optional project) +# - invoices (belongs_to company, optional project) +# - invoice_line_items (belongs_to invoice) + +cubes: + # ============================================================ + # Companies - Top level entity + # ============================================================ + - name: companies + sql_table: companies + description: Top-level company entities + + dimensions: + - name: id + sql: id + type: number + primary_key: true + + - name: name + sql: name + type: string + + - name: industry + sql: industry + type: string + + - name: founded_at + sql: founded_at + type: time + + - name: is_active + sql: is_active + type: boolean + + - name: employee_count + sql: employee_count + type: number + 
description: Denormalized employee count for testing numeric dimensions + + measures: + - name: count + type: count + + - name: total_employees + sql: employee_count + type: sum + description: Sum of denormalized employee counts + + - name: avg_company_size + sql: employee_count + type: avg + + segments: + - name: active_companies + sql: "${CUBE}.is_active = true" + + - name: tech_companies + sql: "${CUBE}.industry = 'Technology'" + + - name: large_companies + sql: "${CUBE}.employee_count >= 100" + + # ============================================================ + # Departments - belongs_to Company + # ============================================================ + - name: departments + sql_table: departments + description: Departments within companies + + dimensions: + - name: id + sql: id + type: number + primary_key: true + + - name: company_id + sql: company_id + type: number + + - name: name + sql: name + type: string + + - name: budget + sql: budget + type: number + + - name: created_at + sql: created_at + type: time + + measures: + - name: count + type: count + + - name: total_budget + sql: budget + type: sum + + - name: avg_budget + sql: budget + type: avg + + - name: max_budget + sql: budget + type: max + + - name: min_budget + sql: budget + type: min + + segments: + - name: high_budget + sql: "${CUBE}.budget >= 1000000" + + joins: + - name: companies + sql: "${CUBE}.company_id = ${companies.id}" + relationship: many_to_one + + # ============================================================ + # Employees - belongs_to Department (transitively to Company) + # ============================================================ + - name: employees + sql_table: employees + description: Employee records + + dimensions: + - name: id + sql: id + type: number + primary_key: true + + - name: department_id + sql: department_id + type: number + + - name: name + sql: name + type: string + + - name: email + sql: email + type: string + + - name: title + sql: title + type: string 
+ + - name: salary + sql: salary + type: number + + - name: hired_at + sql: hired_at + type: time + + - name: is_manager + sql: is_manager + type: boolean + + - name: manager_id + sql: manager_id + type: number + description: Self-referential FK for testing + + measures: + - name: count + type: count + + - name: headcount + type: count_distinct + sql: id + description: Distinct employee count + + - name: total_salary + sql: salary + type: sum + + - name: avg_salary + sql: salary + type: avg + + - name: max_salary + sql: salary + type: max + + - name: manager_count + type: count + filters: + - sql: "${CUBE}.is_manager = true" + description: Count of managers only + + - name: non_manager_count + type: count + filters: + - sql: "${CUBE}.is_manager = false" + + segments: + - name: managers + sql: "${CUBE}.is_manager = true" + + - name: high_earners + sql: "${CUBE}.salary >= 150000" + + - name: recent_hires + sql: "${CUBE}.hired_at >= CURRENT_DATE - INTERVAL '90 days'" + + joins: + - name: departments + sql: "${CUBE}.department_id = ${departments.id}" + relationship: many_to_one + + # ============================================================ + # Projects - belongs_to Company, has_many Employees via Assignments + # ============================================================ + - name: projects + sql_table: projects + description: Company projects + + dimensions: + - name: id + sql: id + type: number + primary_key: true + + - name: company_id + sql: company_id + type: number + + - name: name + sql: name + type: string + + - name: status + sql: status + type: string + description: active, completed, cancelled + + - name: budget + sql: budget + type: number + + - name: start_date + sql: start_date + type: time + + - name: end_date + sql: end_date + type: time + + - name: priority + sql: priority + type: number + description: 1-5 priority level + + measures: + - name: count + type: count + + - name: active_count + type: count + filters: + - sql: "${CUBE}.status = 
'active'" + + - name: completed_count + type: count + filters: + - sql: "${CUBE}.status = 'completed'" + + - name: total_budget + sql: budget + type: sum + + - name: avg_budget + sql: budget + type: avg + + - name: completion_rate + type: number + sql: "${completed_count}::float / NULLIF(${count}, 0)" + description: Derived metric - ratio of completed to total + + segments: + - name: active + sql: "${CUBE}.status = 'active'" + + - name: completed + sql: "${CUBE}.status = 'completed'" + + - name: high_priority + sql: "${CUBE}.priority >= 4" + + - name: big_budget + sql: "${CUBE}.budget >= 100000" + + joins: + - name: companies + sql: "${CUBE}.company_id = ${companies.id}" + relationship: many_to_one + + # One-to-many: project has many assignments + - name: project_assignments + sql: "${CUBE}.id = ${project_assignments.project_id}" + relationship: one_to_many + + # ============================================================ + # Project Assignments - Join table (Employee <-> Project) + # This is a CRITICAL edge case: join tables with their own metrics + # ============================================================ + - name: project_assignments + sql_table: project_assignments + description: Many-to-many join between employees and projects + + dimensions: + - name: id + sql: id + type: number + primary_key: true + + - name: employee_id + sql: employee_id + type: number + + - name: project_id + sql: project_id + type: number + + - name: role + sql: role + type: string + description: lead, contributor, reviewer + + - name: assigned_at + sql: assigned_at + type: time + + - name: hours_allocated + sql: hours_allocated + type: number + + measures: + - name: count + type: count + description: Number of assignments + + - name: total_hours_allocated + sql: hours_allocated + type: sum + + - name: avg_hours_allocated + sql: hours_allocated + type: avg + + - name: lead_count + type: count + filters: + - sql: "${CUBE}.role = 'lead'" + + - name: unique_employees + type: 
count_distinct + sql: employee_id + + - name: unique_projects + type: count_distinct + sql: project_id + + segments: + - name: leads_only + sql: "${CUBE}.role = 'lead'" + + - name: high_allocation + sql: "${CUBE}.hours_allocated >= 40" + + joins: + - name: employees + sql: "${CUBE}.employee_id = ${employees.id}" + relationship: many_to_one + + - name: projects + sql: "${CUBE}.project_id = ${projects.id}" + relationship: many_to_one + + # ============================================================ + # Timesheets - belongs_to Employee, belongs_to Project + # Time-series data for cumulative/rolling window testing + # ============================================================ + - name: timesheets + sql_table: timesheets + description: Employee time entries + + dimensions: + - name: id + sql: id + type: number + primary_key: true + + - name: employee_id + sql: employee_id + type: number + + - name: project_id + sql: project_id + type: number + + - name: work_date + sql: work_date + type: time + + - name: hours + sql: hours + type: number + + - name: description + sql: description + type: string + + - name: is_billable + sql: is_billable + type: boolean + + measures: + - name: count + type: count + + - name: total_hours + sql: hours + type: sum + + - name: billable_hours + sql: hours + type: sum + filters: + - sql: "${CUBE}.is_billable = true" + + - name: non_billable_hours + sql: hours + type: sum + filters: + - sql: "${CUBE}.is_billable = false" + + - name: avg_daily_hours + sql: hours + type: avg + + - name: billable_ratio + type: number + sql: "${billable_hours}::float / NULLIF(${total_hours}, 0)" + + # Rolling window measure (cumulative) + - name: cumulative_hours + sql: hours + type: sum + rolling_window: + trailing: unbounded + + segments: + - name: billable + sql: "${CUBE}.is_billable = true" + + - name: overtime + sql: "${CUBE}.hours > 8" + + joins: + - name: employees + sql: "${CUBE}.employee_id = ${employees.id}" + relationship: many_to_one + + - name: 
projects + sql: "${CUBE}.project_id = ${projects.id}" + relationship: many_to_one + + # ============================================================ + # Expenses - belongs_to Employee, optionally belongs_to Project + # Tests optional (nullable) foreign keys + # ============================================================ + - name: expenses + sql_table: expenses + description: Employee expense reports + + dimensions: + - name: id + sql: id + type: number + primary_key: true + + - name: employee_id + sql: employee_id + type: number + + - name: project_id + sql: project_id + type: number + description: Optional - can be NULL for general expenses + + - name: category + sql: category + type: string + description: travel, meals, equipment, software + + - name: amount + sql: amount + type: number + + - name: submitted_at + sql: submitted_at + type: time + + - name: approved_at + sql: approved_at + type: time + description: NULL if not yet approved + + - name: status + sql: status + type: string + description: pending, approved, rejected + + measures: + - name: count + type: count + + - name: total_amount + sql: amount + type: sum + + - name: approved_amount + sql: amount + type: sum + filters: + - sql: "${CUBE}.status = 'approved'" + + - name: pending_amount + sql: amount + type: sum + filters: + - sql: "${CUBE}.status = 'pending'" + + - name: avg_expense + sql: amount + type: avg + + - name: max_expense + sql: amount + type: max + + - name: approval_rate + type: number + sql: "COUNT(CASE WHEN ${CUBE}.status = 'approved' THEN 1 END)::float / NULLIF(COUNT(*), 0)" + + segments: + - name: approved + sql: "${CUBE}.status = 'approved'" + + - name: pending + sql: "${CUBE}.status = 'pending'" + + - name: travel_expenses + sql: "${CUBE}.category = 'travel'" + + - name: high_value + sql: "${CUBE}.amount >= 1000" + + - name: project_related + sql: "${CUBE}.project_id IS NOT NULL" + + - name: general_expenses + sql: "${CUBE}.project_id IS NULL" + + joins: + - name: employees + sql: 
"${CUBE}.employee_id = ${employees.id}" + relationship: many_to_one + + # Note: This is a nullable FK - project_id can be NULL + - name: projects + sql: "${CUBE}.project_id = ${projects.id}" + relationship: many_to_one + + # ============================================================ + # Invoices - belongs_to Company, optionally Project + # Tests multi-level aggregation scenarios + # ============================================================ + - name: invoices + sql_table: invoices + description: Company invoices + + dimensions: + - name: id + sql: id + type: number + primary_key: true + + - name: company_id + sql: company_id + type: number + + - name: project_id + sql: project_id + type: number + description: Optional project association + + - name: invoice_number + sql: invoice_number + type: string + + - name: status + sql: status + type: string + description: draft, sent, paid, overdue + + - name: issued_at + sql: issued_at + type: time + + - name: due_at + sql: due_at + type: time + + - name: paid_at + sql: paid_at + type: time + + - name: total_amount + sql: total_amount + type: number + + measures: + - name: count + type: count + + - name: total_invoiced + sql: total_amount + type: sum + + - name: paid_amount + sql: total_amount + type: sum + filters: + - sql: "${CUBE}.status = 'paid'" + + - name: overdue_amount + sql: total_amount + type: sum + filters: + - sql: "${CUBE}.status = 'overdue'" + + - name: avg_invoice + sql: total_amount + type: avg + + - name: collection_rate + type: number + sql: "${paid_amount}::float / NULLIF(${total_invoiced}, 0)" + + segments: + - name: paid + sql: "${CUBE}.status = 'paid'" + + - name: overdue + sql: "${CUBE}.status = 'overdue'" + + - name: draft + sql: "${CUBE}.status = 'draft'" + + joins: + - name: companies + sql: "${CUBE}.company_id = ${companies.id}" + relationship: many_to_one + + - name: projects + sql: "${CUBE}.project_id = ${projects.id}" + relationship: many_to_one + + # One-to-many: invoice has many line 
items + - name: invoice_line_items + sql: "${CUBE}.id = ${invoice_line_items.invoice_id}" + relationship: one_to_many + + # ============================================================ + # Invoice Line Items - belongs_to Invoice + # Tests fan-out aggregation (symmetric aggregates) + # ============================================================ + - name: invoice_line_items + sql_table: invoice_line_items + description: Line items on invoices + + dimensions: + - name: id + sql: id + type: number + primary_key: true + + - name: invoice_id + sql: invoice_id + type: number + + - name: description + sql: description + type: string + + - name: quantity + sql: quantity + type: number + + - name: unit_price + sql: unit_price + type: number + + - name: line_total + sql: line_total + type: number + + measures: + - name: count + type: count + + - name: total_line_value + sql: line_total + type: sum + + - name: total_quantity + sql: quantity + type: sum + + - name: avg_unit_price + sql: unit_price + type: avg + + - name: avg_line_total + sql: line_total + type: avg + + joins: + - name: invoices + sql: "${CUBE}.invoice_id = ${invoices.id}" + relationship: many_to_one diff --git a/tests/metrics/test_filters.py b/tests/metrics/test_filters.py index 33d90170..ce6ea62d 100644 --- a/tests/metrics/test_filters.py +++ b/tests/metrics/test_filters.py @@ -41,8 +41,9 @@ def test_metric_level_filter_basic(layer): print("SQL with metric-level filter:") print(sql) - # Should contain the metric's filter in a CASE WHEN expression - assert "CASE WHEN status = 'completed' THEN amount END" in sql + # Metric filter should be applied via CASE WHEN in the CTE, not in WHERE clause + assert "CASE WHEN status = 'completed'" in sql + assert "completed_revenue_raw" in sql def test_metric_level_multiple_filters(layer): @@ -71,8 +72,9 @@ def test_metric_level_multiple_filters(layer): sql = layer.compile(metrics=["orders.high_value_completed_revenue"]) - # Should contain both filters combined in a CASE 
WHEN expression - assert "CASE WHEN status = 'completed' AND amount > 100 THEN amount END" in sql + # Should contain both filters combined via CASE WHEN in CTE + assert "CASE WHEN status = 'completed' AND amount > 100" in sql + assert "high_value_completed_revenue_raw" in sql def test_metric_filters_combined_with_query_filters(layer): @@ -100,9 +102,9 @@ def test_metric_filters_combined_with_query_filters(layer): # Add query-level filter on top of metric-level filter sql = layer.compile(metrics=["orders.completed_revenue"], filters=["orders_cte.region = 'US'"]) - # Should contain metric filter in CASE WHEN and query filter in WHERE - assert "CASE WHEN status = 'completed' THEN amount END" in sql - assert "region = 'US'" in sql + # Metric filter should be in CASE WHEN, query filter pushed down to CTE WHERE + assert "CASE WHEN status = 'completed'" in sql # Metric filter in CTE + assert "region = 'US'" in sql # Query filter pushed down into CTE def test_mixed_filtered_and_unfiltered_metrics(layer): @@ -135,9 +137,12 @@ def test_mixed_filtered_and_unfiltered_metrics(layer): sql = layer.compile(metrics=["orders.total_revenue", "orders.completed_revenue"]) - # Should have filtered and unfiltered metrics coexisting - # completed_revenue uses CASE WHEN, total_revenue doesn't - assert "CASE WHEN status = 'completed' THEN amount END" in sql + # Completed filter should be in CASE WHEN for completed_revenue only + assert "CASE WHEN status = 'completed'" in sql + assert "completed_revenue_raw" in sql + # Total revenue should have no CASE WHEN + assert "amount AS total_revenue_raw" in sql + # Both metrics should be in the SELECT assert "total_revenue" in sql assert "completed_revenue" in sql @@ -166,8 +171,9 @@ def test_metric_filter_with_time_dimension(layer): sql = layer.compile(metrics=["orders.recent_completed_revenue"], dimensions=["orders.created_at__month"]) - # Should contain both filters combined in CASE WHEN - assert "CASE WHEN status = 'completed' AND created_at >= 
CURRENT_DATE - 30 THEN amount END" in sql + # Both filters should be in CASE WHEN combined with AND + assert "CASE WHEN status = 'completed' AND created_at >= CURRENT_DATE" in sql + assert "recent_completed_revenue_raw" in sql def test_metric_filter_column_not_in_query_dimensions(layer): @@ -361,6 +367,7 @@ def test_metric_level_filters_use_case_when(layer): primary_key="order_id", dimensions=[ Dimension(name="region", type="categorical"), + Dimension(name="status", type="categorical"), ], metrics=[ Metric(name="completed_revenue", agg="sum", sql="amount", filters=["{model}.status = 'completed'"]), @@ -371,8 +378,9 @@ def test_metric_level_filters_use_case_when(layer): sql = layer.compile(metrics=["orders.completed_revenue"], dimensions=["orders.region"]) - # Metric-level filter should be in CASE WHEN expression - assert "CASE WHEN status = 'completed' THEN amount END" in sql + # Metric-level filter should be in CASE WHEN (conditional aggregation) + assert "CASE WHEN status = 'completed'" in sql + assert "completed_revenue_raw" in sql def test_having_filter_with_actual_data(layer): diff --git a/tests/metrics/test_symmetric_aggs.py b/tests/metrics/test_symmetric_aggs.py index 548051f3..aa5ef2e4 100644 --- a/tests/metrics/test_symmetric_aggs.py +++ b/tests/metrics/test_symmetric_aggs.py @@ -168,7 +168,12 @@ def test_fanout_join_detection_multiple_joins(): def test_symmetric_aggregates_in_sql_generation(): - """Test that SQL generation uses symmetric aggregates for fan-out joins.""" + """Test that SQL generation uses pre-aggregation to handle fan-out. + + When metrics come from different models at different join levels, + the generator uses pre-aggregation: each metric is aggregated separately + to the dimension grain, then the results are joined together. 
+ """ graph = SemanticGraph() # Orders (base) - has two one-to-many relationships @@ -216,9 +221,12 @@ def test_symmetric_aggregates_in_sql_generation(): dimensions=["orders.order_date"], ) - # Orders.revenue should use symmetric aggregates (HASH function) - assert "HASH(orders_cte.id)" in sql - assert "SUM(" in sql and "DISTINCT" in sql + # Pre-aggregation approach: each model's metrics are aggregated separately + # and then joined together with FULL OUTER JOIN + assert "orders_preagg" in sql + assert "order_items_preagg" in sql + assert "shipments_preagg" in sql + assert "FULL OUTER JOIN" in sql def test_symmetric_aggregates_with_data(): diff --git a/tests/optimizations/test_predicate_pushdown.py b/tests/optimizations/test_predicate_pushdown.py index 64a58d1a..dc691540 100644 --- a/tests/optimizations/test_predicate_pushdown.py +++ b/tests/optimizations/test_predicate_pushdown.py @@ -328,18 +328,28 @@ def test_metric_level_filters_use_case_when_in_cte(layer): assert cte is not None - # Query-level filter (region) should be pushed to CTE WHERE + # Query-level filter (region) should be in CTE WHERE clause cte_where = cte.this.find(exp.Where) assert cte_where is not None cte_where_sql = cte_where.sql() assert "region" in cte_where_sql - # Metric-level filter should be in a CASE WHEN in the CTE SELECT, not WHERE + # Metric-level filter should NOT be in CTE WHERE (it's in CASE WHEN instead) assert "status" not in cte_where_sql - # Check that CASE WHEN is in the CTE for the metric filter - cte_sql = cte.this.sql() - assert "CASE WHEN status = 'completed' THEN amount END" in cte_sql + # Metric-level filter should be applied via CASE WHEN in the CTE SELECT + cte_select_sql = cte.this.sql() + assert "CASE WHEN status = 'completed'" in cte_select_sql + assert "completed_revenue_raw" in cte_select_sql + + # Main query should NOT have metric filter in WHERE (it's already in CASE WHEN) + main_select = parsed.find(exp.Select) + main_where = main_select.find(exp.Where) + # Main 
WHERE might be None if all filters were pushed down + if main_where: + main_where_sql = main_where.sql() + # Metric filter should NOT be in main WHERE + assert "status" not in main_where_sql if __name__ == "__main__": diff --git a/tests/test_sql_generation_security.py b/tests/test_sql_generation_security.py index 3426e189..259d0f8c 100644 --- a/tests/test_sql_generation_security.py +++ b/tests/test_sql_generation_security.py @@ -241,8 +241,9 @@ def test_count_metrics_with_filters(layer): sql = layer.compile(metrics=["orders.completed_orders"], dimensions=["orders.status"]) - # Should have valid COUNT with filter in CASE WHEN - assert "CASE WHEN status = 'completed' THEN 1 END" in sql + # Should have valid COUNT with filter in CASE WHEN (conditional aggregation) + assert "CASE WHEN status = 'completed'" in sql + assert "completed_orders_raw" in sql # Should not have invalid * AS syntax assert "* AS completed_orders_raw" not in sql