# CTE Regression Analysis - DuckDB Demo

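This notebook demonstrates the CTE regression analysis tools using DuckDB as the backend. Each CTE of two versions of a query is registered as a view, and the matching views are compared row by row.
DuckDB is lightweight and does not require a Java or Spark installation.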

## Requirements

```bash
pip install duckdb pandas
```

In [None]:
# Setup DuckDB
import duckdb
import pandas as pd
from typing import List, Dict, Any, Optional, Tuple
from dataclasses import dataclass
import re

# One shared in-memory DuckDB database (":memory:" -- nothing is written to disk);
# every table and view created below lives on this connection.
con = duckdb.connect(":memory:")

duckdb_version = duckdb.__version__
print(f"DuckDB version: {duckdb_version}")
print("Connection established")

## Create Demo Tables

In [None]:
# Customer data (synthetic). CREATE OR REPLACE keeps the cell idempotent:
# re-running it rebuilds the table instead of failing because it already exists.
# signup_date values are plain string literals, not DATE values.
con.execute("""
CREATE OR REPLACE TABLE demo_customers AS
SELECT * FROM (
    VALUES
    (1, 'Alice Johnson', 'alice@email.com', '2020-01-15', 'GOLD', 'US'),
    (2, 'Bob Smith', 'bob@email.com', '2020-03-22', 'SILVER', 'US'),
    (3, 'Carol White', 'carol@email.com', '2021-06-10', 'BRONZE', 'UK'),
    (4, 'David Brown', 'david@email.com', '2021-08-05', 'GOLD', 'UK'),
    (5, 'Eve Davis', 'eve@email.com', '2022-01-20', 'SILVER', 'CA'),
    (6, 'Frank Miller', 'frank@email.com', '2022-04-12', 'BRONZE', 'CA'),
    (7, 'Grace Wilson', 'grace@email.com', '2023-02-28', 'GOLD', 'US'),
    (8, 'Henry Taylor', 'henry@email.com', '2023-05-15', 'SILVER', 'UK'),
    (9, 'Ivy Anderson', 'ivy@email.com', '2023-07-01', 'BRONZE', 'US'),
    (10, 'Jack Thomas', 'jack@email.com', '2023-09-10', 'GOLD', 'CA')
) AS t(customer_id, customer_name, email, signup_date, tier, country)
""")

print("Created: demo_customers")
con.execute("SELECT * FROM demo_customers").df()

In [None]:
# Product data (synthetic). CREATE OR REPLACE keeps the cell idempotent:
# re-running it rebuilds the table instead of failing because it already exists.
# Note: the demo queries below take unit_price from demo_order_items and do not
# reference this table.
con.execute("""
CREATE OR REPLACE TABLE demo_products AS
SELECT * FROM (
    VALUES
    (101, 'Laptop Pro', 'Electronics', 1299.99, 0.15),
    (102, 'Wireless Mouse', 'Electronics', 49.99, 0.10),
    (103, 'USB-C Hub', 'Electronics', 79.99, 0.12),
    (104, 'Desk Chair', 'Furniture', 299.99, 0.08),
    (105, 'Standing Desk', 'Furniture', 599.99, 0.05),
    (106, 'Monitor 27"', 'Electronics', 449.99, 0.10),
    (107, 'Keyboard', 'Electronics', 129.99, 0.15),
    (108, 'Webcam HD', 'Electronics', 89.99, 0.20),
    (109, 'Desk Lamp', 'Furniture', 59.99, 0.10),
    (110, 'Cable Organizer', 'Accessories', 19.99, 0.25)
) AS t(product_id, product_name, category, unit_price, discount_rate)
""")

print("Created: demo_products")
con.execute("SELECT * FROM demo_products").df()

In [None]:
# Order data (synthetic). CREATE OR REPLACE keeps the cell idempotent:
# re-running it rebuilds the table instead of failing because it already exists.
# order_date values are plain string literals; status is one of
# COMPLETED / SHIPPED / CANCELLED / PENDING.
con.execute("""
CREATE OR REPLACE TABLE demo_orders AS
SELECT * FROM (
    VALUES
    (1001, 1, '2024-01-05', 'COMPLETED', 'CREDIT_CARD'),
    (1002, 2, '2024-01-08', 'COMPLETED', 'PAYPAL'),
    (1003, 1, '2024-01-12', 'COMPLETED', 'CREDIT_CARD'),
    (1004, 3, '2024-01-15', 'SHIPPED', 'CREDIT_CARD'),
    (1005, 4, '2024-01-18', 'COMPLETED', 'BANK_TRANSFER'),
    (1006, 5, '2024-01-22', 'COMPLETED', 'PAYPAL'),
    (1007, 2, '2024-01-25', 'CANCELLED', 'CREDIT_CARD'),
    (1008, 6, '2024-01-28', 'COMPLETED', 'CREDIT_CARD'),
    (1009, 7, '2024-02-01', 'COMPLETED', 'PAYPAL'),
    (1010, 8, '2024-02-05', 'SHIPPED', 'CREDIT_CARD'),
    (1011, 1, '2024-02-08', 'COMPLETED', 'CREDIT_CARD'),
    (1012, 9, '2024-02-12', 'COMPLETED', 'BANK_TRANSFER'),
    (1013, 10, '2024-02-15', 'PENDING', 'PAYPAL'),
    (1014, 3, '2024-02-18', 'COMPLETED', 'CREDIT_CARD'),
    (1015, 4, '2024-02-22', 'COMPLETED', 'CREDIT_CARD')
) AS t(order_id, customer_id, order_date, status, payment_method)
""")

print("Created: demo_orders")
con.execute("SELECT * FROM demo_orders").df()

In [None]:
# Order items data (synthetic). CREATE OR REPLACE keeps the cell idempotent:
# re-running it rebuilds the table instead of failing because it already exists.
# Each row is one order line; line value = quantity * unit_price.
con.execute("""
CREATE OR REPLACE TABLE demo_order_items AS
SELECT * FROM (
    VALUES
    (1, 1001, 101, 1, 1299.99),
    (2, 1001, 102, 2, 49.99),
    (3, 1002, 104, 1, 299.99),
    (4, 1002, 109, 2, 59.99),
    (5, 1003, 106, 1, 449.99),
    (6, 1003, 107, 1, 129.99),
    (7, 1004, 101, 1, 1299.99),
    (8, 1004, 103, 1, 79.99),
    (9, 1005, 105, 1, 599.99),
    (10, 1005, 104, 1, 299.99),
    (11, 1006, 102, 3, 49.99),
    (12, 1006, 110, 5, 19.99),
    (13, 1007, 108, 1, 89.99),
    (14, 1008, 106, 2, 449.99),
    (15, 1008, 107, 1, 129.99),
    (16, 1009, 101, 1, 1299.99),
    (17, 1009, 102, 1, 49.99),
    (18, 1010, 105, 1, 599.99),
    (19, 1011, 103, 2, 79.99),
    (20, 1011, 110, 3, 19.99),
    (21, 1012, 104, 1, 299.99),
    (22, 1013, 101, 1, 1299.99),
    (23, 1014, 108, 2, 89.99),
    (24, 1014, 109, 1, 59.99),
    (25, 1015, 106, 1, 449.99)
) AS t(item_id, order_id, product_id, quantity, unit_price)
""")

print("Created: demo_order_items")
con.execute("SELECT * FROM demo_order_items").df()

## Define Demo Queries

Two versions of a customer order summary query:
- **QUERY_LEGACY**: Original implementation
- **QUERY_NEW**: Refactored version with three intentional differences, each marked with a `-- DIFFERENCE` comment

In [None]:
# Legacy Query - Original implementation (the reference for the regression test).
# CTE pipeline:
#   order_totals     -- one row per order, excluding CANCELLED and PENDING orders
#   customer_metrics -- per-customer order count, total, average and date range
#   final            -- metrics joined to demo_customers plus a spend-based segment
# The INNER JOIN in `final` drops customers with no qualifying orders.
QUERY_LEGACY = """
WITH order_totals AS (
    SELECT
        o.order_id,
        o.customer_id,
        o.order_date,
        o.status,
        SUM(oi.quantity * oi.unit_price) AS order_total
    FROM demo_orders o
    JOIN demo_order_items oi ON o.order_id = oi.order_id
    WHERE o.status NOT IN ('CANCELLED', 'PENDING')
    GROUP BY o.order_id, o.customer_id, o.order_date, o.status
),

customer_metrics AS (
    SELECT
        customer_id,
        COUNT(DISTINCT order_id) AS order_count,
        ROUND(SUM(order_total), 2) AS total_amount,
        ROUND(AVG(order_total), 2) AS avg_order_value,
        MIN(order_date) AS first_order_date,
        MAX(order_date) AS last_order_date
    FROM order_totals
    GROUP BY customer_id
),

final AS (
    SELECT
        c.customer_id,
        c.customer_name,
        c.tier,
        c.country,
        cm.order_count,
        cm.total_amount,
        cm.avg_order_value,
        cm.first_order_date,
        cm.last_order_date,
        CASE
            WHEN cm.total_amount >= 2000 THEN 'Premium'
            WHEN cm.total_amount >= 500 THEN 'High Value'
            WHEN cm.total_amount >= 100 THEN 'Standard'
            ELSE 'Low Value'
        END AS customer_segment
    FROM demo_customers c
    JOIN customer_metrics cm ON c.customer_id = cm.customer_id
)

SELECT * FROM final
ORDER BY customer_id
"""

print("QUERY_LEGACY defined")

In [None]:
# New Query - Refactored with intentional differences (marked "-- DIFFERENCE n").
# Same CTE names as QUERY_LEGACY, so every CTE can be compared one-to-one.
# NOTE(review): order_totals has one row per order, so
# SUM(order_total) / COUNT(DISTINCT order_id) should equal AVG(order_total) up to
# floating-point rounding. The line amounts also carry two decimals. As a result,
# differences 1 and 2 may not change any values; difference 3 (segment threshold)
# is the one expected to show up. Confirm against the comparison output.
QUERY_NEW = """
WITH order_totals AS (
    SELECT
        o.order_id,
        o.customer_id,
        o.order_date,
        o.status,
        SUM(oi.quantity * oi.unit_price) AS order_total
    FROM demo_orders o
    JOIN demo_order_items oi ON o.order_id = oi.order_id
    WHERE o.status NOT IN ('CANCELLED', 'PENDING')
    GROUP BY o.order_id, o.customer_id, o.order_date, o.status
),

customer_metrics AS (
    -- DIFFERENCE 1: Different rounding for total_amount (4 decimals vs 2)
    -- DIFFERENCE 2: Different avg calculation (total/count instead of AVG)
    SELECT
        customer_id,
        COUNT(DISTINCT order_id) AS order_count,
        ROUND(SUM(order_total), 4) AS total_amount,
        ROUND(SUM(order_total) / COUNT(DISTINCT order_id), 2) AS avg_order_value,
        MIN(order_date) AS first_order_date,
        MAX(order_date) AS last_order_date
    FROM order_totals
    GROUP BY customer_id
),

final AS (
    -- DIFFERENCE 3: Different threshold for \"High Value\" (600 vs 500)
    SELECT
        c.customer_id,
        c.customer_name,
        c.tier,
        c.country,
        cm.order_count,
        cm.total_amount,
        cm.avg_order_value,
        cm.first_order_date,
        cm.last_order_date,
        CASE
            WHEN cm.total_amount >= 2000 THEN 'Premium'
            WHEN cm.total_amount >= 600 THEN 'High Value'
            WHEN cm.total_amount >= 100 THEN 'Standard'
            ELSE 'Low Value'
        END AS customer_segment
    FROM demo_customers c
    JOIN customer_metrics cm ON c.customer_id = cm.customer_id
)

SELECT * FROM final
ORDER BY customer_id
"""

print("QUERY_NEW defined")

## Verify Queries Work

In [None]:
# Sanity check: the legacy query runs and returns one row per customer that has
# at least one non-cancelled, non-pending order.
print("Legacy Query Results:")
legacy_results_df = con.execute(QUERY_LEGACY).df()
legacy_results_df

In [None]:
# Sanity check: the refactored query runs against the same demo tables.
print("New Query Results:")
new_results_df = con.execute(QUERY_NEW).df()
new_results_df

## DuckDB Comparison Utilities

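The cell below defines the comparison utilities, adapted for DuckDB: CTE parsing, view registration, `EXCEPT`-based view comparison, and column-level drill-down.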

In [None]:
# DuckDB-based comparison utilities

@dataclass
class ComparisonConfig:
    """Configuration for CTE regression comparison.

    ``skip_columns`` and ``key_columns`` default to ``None`` and are replaced in
    ``__post_init__`` by a fresh empty list/dict, so instances never share a
    mutable default (an explicit ``None`` is normalised the same way).
    """
    # View-name prefixes for the source (query A) and target (query B) views.
    # NOTE(review): full_regression_compare takes its own prefix_a/prefix_b
    # arguments and does not read these two fields.
    prefix_a: str = "a_"
    prefix_b: str = "b_"
    # When True, only CTEs present in both queries (plus "final") are compared.
    only_common_ctes: bool = True
    # NOTE(review): not read by any comparison function in this notebook.
    skip_columns: Optional[List[str]] = None
    # NOTE(review): full_regression_compare does not pass this to compare_views.
    sample_limit: int = 10
    # Maps CTE name -> key columns used for column-level difference analysis.
    key_columns: Optional[Dict[str, List[str]]] = None

    def __post_init__(self) -> None:
        if self.skip_columns is None:
            self.skip_columns = []
        if self.key_columns is None:
            self.key_columns = {}


def parse_ctes(sql: str) -> List[Tuple[str, str]]:
    """Parse CTEs from a WITH clause SQL query."""
    sql_clean = re.sub(r'--.*$', '', sql, flags=re.MULTILINE)
    sql_clean = re.sub(r'/\*.*?\*/', '', sql_clean, flags=re.DOTALL)
    
    if not re.match(r'\s*WITH\b', sql_clean, re.IGNORECASE):
        return []
    
    ctes = []
    pattern = r'(\w+)\s+AS\s*\('
    matches = list(re.finditer(pattern, sql_clean, re.IGNORECASE))
    
    for match in matches:
        cte_name = match.group(1)
        start_paren = match.end() - 1
        
        depth = 1
        pos = start_paren + 1
        while pos < len(sql_clean) and depth > 0:
            if sql_clean[pos] == '(':
                depth += 1
            elif sql_clean[pos] == ')':
                depth -= 1
            pos += 1
        
        cte_body = sql_clean[start_paren + 1:pos - 1].strip()
        ctes.append((cte_name, cte_body))
    
    return ctes


def get_final_select(sql: str) -> str:
    """Extract the final SELECT statement after all CTEs."""
    sql_clean = re.sub(r'--.*$', '', sql, flags=re.MULTILINE)
    sql_clean = re.sub(r'/\*.*?\*/', '', sql_clean, flags=re.DOTALL)
    
    depth = 0
    in_with = False
    last_cte_end = 0
    
    for i, char in enumerate(sql_clean):
        if sql_clean[i:i + 4].upper() == 'WITH':
            in_with = True
        elif char == '(':
            depth += 1
        elif char == ')':
            depth -= 1
            if depth == 0 and in_with:
                last_cte_end = i + 1
    
    remainder = sql_clean[last_cte_end:].strip()
    if remainder.startswith(','):
        remainder = remainder[1:].strip()
    
    return remainder


def register_ctes_as_views(sql: str, prefix: str) -> List[str]:
    """Register CTEs as views in DuckDB.

    Each CTE ``name`` of ``sql`` becomes the view ``{prefix}{name}``. References
    to sibling CTEs inside the bodies are rewritten to the prefixed view names,
    so the views chain together. If no CTE is called ``final``
    (case-insensitive), the statement after the CTE list is also registered as
    ``{prefix}final`` when it starts with SELECT. Failures are printed and
    skipped, not raised, so later views still get a chance to be created.

    Args:
        sql: Query whose WITH clause should be registered.
        prefix: Prefix prepended to every view name (e.g. ``"a_"``).

    Returns:
        Names of the views created successfully, in creation order.
    """
    ctes = parse_ctes(sql)
    if not ctes:
        return []

    cte_names = [name for name, _ in ctes]
    # Rewrite every CTE reference in a single pass. Substituting name-by-name can
    # re-prefix text produced by an earlier substitution (CTEs ``x`` and ``a_x``
    # with prefix ``a_`` would turn a reference to ``x`` into ``a_a_x``).
    reference = re.compile(r'\b(' + '|'.join(re.escape(n) for n in cte_names) + r')\b')

    def qualify(text: str) -> str:
        return reference.sub(lambda m: prefix + m.group(1), text)

    created_views: List[str] = []

    def create_view(view_name: str, body: str) -> None:
        try:
            con.execute(f"CREATE OR REPLACE VIEW {view_name} AS {body}")
            created_views.append(view_name)
            print(f"Created: {view_name}")
        except Exception as e:
            print(f"Failed: {view_name} - {e}")

    for cte_name, cte_body in ctes:
        create_view(f"{prefix}{cte_name}", qualify(cte_body))

    # Final SELECT
    if 'final' not in [name.lower() for name in cte_names]:
        final_select = get_final_select(sql)
        if final_select and final_select.upper().startswith('SELECT'):
            create_view(f"{prefix}final", qualify(final_select))

    return created_views


def get_view_columns(view_name: str) -> List[str]:
    """Return the column names of ``view_name`` in declaration order.

    Uses ``SELECT * ... LIMIT 0`` so only the result schema comes back, no rows.
    """
    empty_frame = con.execute(f"SELECT * FROM {view_name} LIMIT 0").df()
    return empty_frame.columns.tolist()


def compare_views(view_a: str, view_b: str, sample_limit: int = 10) -> Dict[str, Any]:
    """Compare two views and return differences.

    Rows are compared with SQL ``EXCEPT``, which uses set semantics: duplicate
    rows are collapsed. The row counts are therefore also checked, because two
    views that differ only in how often a row repeats would otherwise report
    PASS.

    Args:
        view_a: Source view name (interpolated into SQL unquoted).
        view_b: Target view name.
        sample_limit: Maximum number of differing rows returned in ``sample_df``.

    Returns:
        Dict with the following keys:

        - ``rows_source`` / ``rows_target``: row counts.
        - ``except_source_vs_target``: distinct rows only in A.
        - ``except_target_vs_source``: distinct rows only in B.
        - ``total_differences``: sum of the two except counts.
        - ``sample_df``: differing rows, tagged ``SOURCE_ONLY`` / ``TARGET_ONLY``
          in ``_diff_source``.
        - ``status``: ``"PASS"`` only when there are no differing rows and the
          row counts match, otherwise ``"FAIL"``.
    """
    def scalar(query: str) -> int:
        return con.execute(query).fetchone()[0]

    # Row counts
    count_a = scalar(f"SELECT COUNT(*) FROM {view_a}")
    count_b = scalar(f"SELECT COUNT(*) FROM {view_b}")

    # Except counts (rows in A not in B)
    except_a = scalar(f"""
        SELECT COUNT(*) FROM (
            SELECT * FROM {view_a}
            EXCEPT
            SELECT * FROM {view_b}
        )
    """)

    # Except counts (rows in B not in A)
    except_b = scalar(f"""
        SELECT COUNT(*) FROM (
            SELECT * FROM {view_b}
            EXCEPT
            SELECT * FROM {view_a}
        )
    """)

    # Sample differences. Each EXCEPT lives in its own CTE so the intended grouping
    # is explicit instead of relying on set-operation associativity, and LIMIT
    # applies to the combined result.
    sample_df = con.execute(f"""
        WITH source_only AS (
            SELECT * FROM {view_a}
            EXCEPT
            SELECT * FROM {view_b}
        ),
        target_only AS (
            SELECT * FROM {view_b}
            EXCEPT
            SELECT * FROM {view_a}
        )
        SELECT 'SOURCE_ONLY' AS _diff_source, * FROM source_only
        UNION ALL
        SELECT 'TARGET_ONLY' AS _diff_source, * FROM target_only
        LIMIT {int(sample_limit)}
    """).df()

    total_differences = except_a + except_b
    # EXCEPT ignores duplicate multiplicity, so a row-count mismatch also fails.
    passed = total_differences == 0 and count_a == count_b

    return {
        "rows_source": count_a,
        "rows_target": count_b,
        "except_source_vs_target": except_a,
        "except_target_vs_source": except_b,
        "total_differences": total_differences,
        "sample_df": sample_df,
        "status": "PASS" if passed else "FAIL"
    }


def column_difference_summary(view_a: str, view_b: str, key_columns: List[str]) -> pd.DataFrame:
    """Summary of which columns differ most frequently.

    Rows of ``view_a`` and ``view_b`` are inner-joined on ``key_columns``. Each
    remaining column of ``view_a`` is then compared NULL-aware
    (``IS DISTINCT FROM``).

    Args:
        view_a: Source view; its columns define which columns are compared.
        view_b: Target view; expected to expose the same column names.
        key_columns: Join key columns; must be non-empty.

    Returns:
        DataFrame with columns ``column_name`` and ``difference_count``: one row
        per column with at least one difference, sorted by count descending.
        The frame is empty (but keeps those columns) when nothing differs, when
        every column is a key, or when no keys match between the views.

    Raises:
        ValueError: If ``key_columns`` is empty.
    """
    result_columns = ["column_name", "difference_count"]
    if not key_columns:
        raise ValueError("column_difference_summary requires at least one key column")

    compare_columns = [c for c in get_view_columns(view_a) if c not in key_columns]
    if not compare_columns:
        # Nothing to compare; an empty SELECT list would be a SQL syntax error.
        return pd.DataFrame(columns=result_columns)

    key_join = " AND ".join(f"a.{k} = b.{k}" for k in key_columns)

    # Count differences per column. COALESCE: SUM over an empty join is NULL,
    # which would break the "> 0" filter below.
    diff_exprs = ",\n        ".join(
        f'COALESCE(SUM(CASE WHEN a."{col}" IS DISTINCT FROM b."{col}" THEN 1 ELSE 0 END), 0) AS "{col}_diff"'
        for col in compare_columns
    )
    count_query = f"""
    SELECT
        {diff_exprs}
    FROM {view_a} a
    INNER JOIN {view_b} b ON {key_join}
    """
    counts_row = con.execute(count_query).fetchone()

    records = [
        {"column_name": col, "difference_count": count}
        for col, count in zip(compare_columns, counts_row)
        if count > 0
    ]
    summary = pd.DataFrame(records, columns=result_columns)
    # Stable sort keeps the view's column order among columns with equal counts.
    return summary.sort_values(
        "difference_count", ascending=False, kind="mergesort"
    ).reset_index(drop=True)


def _lookup_key_columns(key_columns: Dict[str, List[str]], cte_name: str) -> Optional[List[str]]:
    """Return the key columns for ``cte_name``: exact name first, then a case-insensitive match."""
    keys = key_columns.get(cte_name)
    if keys is None:
        keys = next(
            (cols for name, cols in key_columns.items() if name.lower() == cte_name.lower()),
            None,
        )
    return keys


def full_regression_compare(
    query_a: str,
    query_b: str,
    prefix_a: str = "a_",
    prefix_b: str = "b_",
    key_columns: Optional[Dict[str, List[str]]] = None,
    config: ComparisonConfig = None
) -> List[Dict[str, Any]]:
    """Full CTE-by-CTE regression comparison.

    Prints a CTE inventory for both queries and registers every CTE as a DuckDB
    view named ``{prefix}{cte_name}``. Matching views are then compared with
    ``compare_views``. ``final`` is always compared exactly once, whether it is
    a CTE of that name or the view built from the query's trailing SELECT.

    Args:
        query_a: Source (reference) query.
        query_b: Target (candidate) query.
        prefix_a: View-name prefix for ``query_a``'s CTEs.
        prefix_b: View-name prefix for ``query_b``'s CTEs.
        key_columns: Optional ``{cte_name: [key, ...]}``. When a CTE has keys
            and differences, a per-column summary is added. Falls back to
            ``config.key_columns``.
        config: Comparison options; only ``only_common_ctes`` and
            ``key_columns`` are read here.

    Returns:
        One dict per compared CTE. Each is either the ``compare_views`` report
        plus ``cte_name``/``source_view``/``target_view`` (and column-summary
        fields when computed), or a ``SKIPPED``/``ERROR`` entry.
    """
    config = config or ComparisonConfig()
    key_columns = key_columns or config.key_columns or {}

    # Parse CTEs
    cte_names_a = [name for name, _ in parse_ctes(query_a)]
    cte_names_b = [name for name, _ in parse_ctes(query_b)]

    common = set(cte_names_a) & set(cte_names_b)
    only_a = set(cte_names_a) - common
    only_b = set(cte_names_b) - common

    print("\n" + "=" * 80)
    print("CTE ANALYSIS")
    print("=" * 80)
    print(f"CTEs in Query A: {len(cte_names_a)}")
    print(f"CTEs in Query B: {len(cte_names_b)}")
    print(f"Common CTEs:     {len(common)}")

    if only_a:
        print(f"\nCTEs ONLY in Query A (source): {sorted(only_a)}")
    if only_b:
        print(f"\nCTEs ONLY in Query B (target): {sorted(only_b)}")
    print("=" * 80)

    # Register views
    print("\n--- Registering Query A (Source) ---")
    register_ctes_as_views(query_a, prefix_a)

    print("\n--- Registering Query B (Target) ---")
    register_ctes_as_views(query_b, prefix_b)

    # Show key_columns being used
    if key_columns:
        print(f"\nKey columns for column-level analysis: {key_columns}")

    # Determine which CTEs to compare
    if config.only_common_ctes:
        ctes_to_compare = [n for n in cte_names_a if n in common]
    else:
        ctes_to_compare = list(cte_names_a)
    # "final" is always compared, but only once: a query with a CTE named "final"
    # (like the demo queries) already has it in the list.
    if "final" not in (n.lower() for n in ctes_to_compare):
        ctes_to_compare.append("final")

    # Compare each CTE
    results = []

    for cte_name in ctes_to_compare:
        is_final = cte_name.lower() == "final"
        if not is_final and cte_name not in common:
            results.append({
                "cte_name": cte_name,
                "status": "SKIPPED",
                "reason": "Not present in target query"
            })
            continue

        view_a = f"{prefix_a}{cte_name}"
        view_b = f"{prefix_b}{cte_name}"
        keys = _lookup_key_columns(key_columns, cte_name)

        try:
            report = compare_views(view_a, view_b)
            report["cte_name"] = cte_name
            report["source_view"] = view_a
            report["target_view"] = view_b

            # Add column analysis if keys provided
            if keys and report.get("total_differences", 0) > 0:
                print(f"  -> Generating column analysis for {cte_name} using keys: {keys}")
                try:
                    col_sum = column_difference_summary(view_a, view_b, keys)
                    report["column_summary"] = col_sum
                    report["key_columns_used"] = keys
                    print(f"  -> Found {len(col_sum)} columns with differences")
                except Exception as e:
                    report["column_summary_error"] = str(e)
                    print(f"  -> Column analysis failed: {e}")

            results.append(report)
        except Exception as e:
            results.append({
                "cte_name": cte_name,
                "status": "ERROR",
                "error": str(e)
            })

    return results


def print_summary_table(results: List[Dict[str, Any]], show_details: bool = True) -> None:
    """Print a fixed-width regression report for a list of CTE comparison results.

    Prints one row per result (ERROR and SKIPPED rows show their message instead
    of counts), then PASS/FAIL/ERROR tallies and an overall verdict. When
    anything failed and ``show_details`` is set, each FAIL result is expanded.
    The expansion shows its column summary if one is present and non-empty;
    otherwise it shows a hint plus a sample of differing rows via ``display``.
    """
    heavy_rule = "=" * 100
    light_rule = "-" * 100
    narrow_rule = "-" * 60

    print("\n" + heavy_rule)
    print("REGRESSION SUMMARY")
    print(heavy_rule)
    print(f"{'CTE Name':<25} {'Status':<8} {'Source Rows':>12} {'Target Rows':>12} {'Src-Tgt':>10} {'Tgt-Src':>10}")
    print(light_rule)

    for result in results:
        status = result.get("status")
        name_cell = f"{result['cte_name']:<25}"
        if status == "ERROR":
            print(f"{name_cell} {'ERROR':<8} {result.get('error', '')}")
        elif status == "SKIPPED":
            print(f"{name_cell} {'SKIPPED':<8} {result.get('reason', '')}")
        else:
            marker = "x" if result["status"] != "PASS" else "+"
            print(
                f"{name_cell} {marker} {result['status']:<6} "
                f"{result['rows_source']:>12,} {result['rows_target']:>12,} "
                f"{result['except_source_vs_target']:>10,} {result['except_target_vs_source']:>10,}"
            )

    print(light_rule)
    statuses = [result.get("status") for result in results]
    passed, failed, errors = (statuses.count(label) for label in ("PASS", "FAIL", "ERROR"))
    print(f"Total: {len(results)} | Passed: {passed} | Failed: {failed} | Errors: {errors}")

    if not failed and not errors:
        print("\n*** ALL REGRESSION TESTS PASSED ***")
    else:
        print("\n*** REGRESSION FAILURES DETECTED ***")

        if show_details:
            for failure in [result for result in results if result.get("status") == "FAIL"]:
                print("\n" + light_rule)
                print(f"FAILED: {failure['cte_name']}")
                print(f"  Source: {failure.get('source_view', 'N/A')} | Target: {failure.get('target_view', 'N/A')}")
                print(f"  Rows in source only: {failure.get('except_source_vs_target', 0):,}")
                print(f"  Rows in target only: {failure.get('except_target_vs_source', 0):,}")
                print(light_rule)

                column_summary = failure.get("column_summary")
                if column_summary is not None and not column_summary.empty:
                    print("\nCOLUMNS CAUSING DIFFERENCES (sorted by impact):")
                    print(narrow_rule)
                    print(f"{'Column Name':<40} {'Diff Count':>15}")
                    print(narrow_rule)
                    for column_label, diff_count in zip(
                        column_summary["column_name"], column_summary["difference_count"]
                    ):
                        print(f"{column_label:<40} {diff_count:>15,}")
                    print(narrow_rule)
                    print(f"Total columns with differences: {len(column_summary)}")
                else:
                    sample_df = failure.get("sample_df")
                    if sample_df is not None and not sample_df.empty:
                        print("\nTo see which COLUMNS cause differences, provide key_columns:")
                        print(f"  key_columns={{'{failure['cte_name']}': ['your_primary_key_column']}}")
                        print("\nSample differing rows:")
                        display(sample_df)

                if failure.get("column_summary_error"):
                    print(f"\nColumn analysis error: {failure['column_summary_error']}")

    print(heavy_rule)


def quick_compare(
    query_a: str,
    query_b: str,
    key_columns: Optional[Dict[str, List[str]]] = None,
    config: ComparisonConfig = None
) -> List[Dict[str, Any]]:
    """Compare two queries CTE-by-CTE, print the summary table, and return the results.

    Convenience wrapper: runs ``full_regression_compare`` with the default view
    prefixes, then ``print_summary_table`` on its output.
    """
    comparison = full_regression_compare(
        query_a,
        query_b,
        key_columns=key_columns,
        config=config,
    )
    print_summary_table(comparison)
    return comparison


def investigate_column(
    view_a: str,
    view_b: str,
    key_columns: List[str],
    column_name: str,
    limit: int = 20
) -> None:
    """Print a drill-down of how one column differs between two keyed views.

    Rows of ``view_a`` and ``view_b`` are inner-joined on ``key_columns``. Only
    key matches whose ``column_name`` values differ (NULL-aware, via
    ``IS DISTINCT FROM``) are considered. The function prints the mismatch
    count, displays up to 20 of the most frequent (source, target) value pairs,
    and displays up to ``limit`` sample rows with their key values.

    Args:
        view_a: Source view name.
        view_b: Target view name.
        key_columns: Columns identifying the same logical row in both views.
        column_name: Column to investigate.
        limit: Maximum number of sample rows to display.
    """
    on_clause = " AND ".join(f"a.{key} = b.{key}" for key in key_columns)
    source_keys = ", ".join(f"a.{key}" for key in key_columns)

    mismatch_total = con.execute(f"""
    SELECT COUNT(*) as diff_count
    FROM {view_a} a
    INNER JOIN {view_b} b ON {on_clause}
    WHERE a."{column_name}" IS DISTINCT FROM b."{column_name}"
    """).fetchone()[0]

    rule = "=" * 70
    print(rule)
    print(f"COLUMN INVESTIGATION: {column_name}")
    print(rule)
    print(f"Total rows with different values: {mismatch_total:,}")

    print("\nDistinct value pairs (source -> target):")
    display(con.execute(f"""
    SELECT
        a."{column_name}" AS source_value,
        b."{column_name}" AS target_value,
        COUNT(*) AS occurrence_count
    FROM {view_a} a
    INNER JOIN {view_b} b ON {on_clause}
    WHERE a."{column_name}" IS DISTINCT FROM b."{column_name}"
    GROUP BY a."{column_name}", b."{column_name}"
    ORDER BY occurrence_count DESC
    LIMIT 20
    """).df())

    print(f"\nSample rows (limit {limit}):")
    display(con.execute(f"""
    SELECT
        {source_keys},
        a."{column_name}" AS source_{column_name},
        b."{column_name}" AS target_{column_name}
    FROM {view_a} a
    INNER JOIN {view_b} b ON {on_clause}
    WHERE a."{column_name}" IS DISTINCT FROM b."{column_name}"
    LIMIT {limit}
    """).df())


# Reaching this line means every utility definition in this cell executed without error.
print("DuckDB comparison utilities loaded successfully")

## Run Regression Analysis

Compare the legacy and new queries to identify differences.

In [None]:
# Quick comparison with column-level analysis. Each key list names the column(s)
# that identify the same row in both versions of that CTE; CTEs with keys get a
# per-column difference summary when they fail.
RUN_BANNER = "=" * 80
print(RUN_BANNER)
print("RUNNING QUICK COMPARE")
print(RUN_BANNER)

CTE_KEY_COLUMNS = {
    "order_totals": ["order_id"],
    "customer_metrics": ["customer_id"],
    "final": ["customer_id"],
}
results = quick_compare(QUERY_LEGACY, QUERY_NEW, key_columns=CTE_KEY_COLUMNS)

## Investigate Specific Columns

In [None]:
# QUERY_NEW raises the "High Value" threshold from 500 to 600, so customers whose
# total falls in that range are expected to change segment.
segment_column = "customer_segment"
print("\n" + "=" * 80)
print(f"INVESTIGATE: {segment_column} column")
print("=" * 80)

investigate_column("a_final", "b_final", key_columns=["customer_id"], column_name=segment_column)

In [None]:
# QUERY_NEW rounds total_amount to 4 decimals instead of 2.
# NOTE(review): the input amounts carry 2 decimal places, so this may report no
# differing rows -- confirm against the output.
amount_column = "total_amount"
print("\n" + "=" * 80)
print(f"INVESTIGATE: {amount_column} column")
print("=" * 80)

investigate_column("a_final", "b_final", key_columns=["customer_id"], column_name=amount_column)

## Matching Query Test (Should Pass)

In [None]:
# Query that should match the legacy query exactly.
# It is written out separately (same SQL text as QUERY_LEGACY) so the comparison
# below exercises the full parse/register/compare path and is expected to PASS.
QUERY_MATCHING = """
WITH order_totals AS (
    SELECT
        o.order_id,
        o.customer_id,
        o.order_date,
        o.status,
        SUM(oi.quantity * oi.unit_price) AS order_total
    FROM demo_orders o
    JOIN demo_order_items oi ON o.order_id = oi.order_id
    WHERE o.status NOT IN ('CANCELLED', 'PENDING')
    GROUP BY o.order_id, o.customer_id, o.order_date, o.status
),

customer_metrics AS (
    SELECT
        customer_id,
        COUNT(DISTINCT order_id) AS order_count,
        ROUND(SUM(order_total), 2) AS total_amount,
        ROUND(AVG(order_total), 2) AS avg_order_value,
        MIN(order_date) AS first_order_date,
        MAX(order_date) AS last_order_date
    FROM order_totals
    GROUP BY customer_id
),

final AS (
    SELECT
        c.customer_id,
        c.customer_name,
        c.tier,
        c.country,
        cm.order_count,
        cm.total_amount,
        cm.avg_order_value,
        cm.first_order_date,
        cm.last_order_date,
        CASE
            WHEN cm.total_amount >= 2000 THEN 'Premium'
            WHEN cm.total_amount >= 500 THEN 'High Value'
            WHEN cm.total_amount >= 100 THEN 'Standard'
            ELSE 'Low Value'
        END AS customer_segment
    FROM demo_customers c
    JOIN customer_metrics cm ON c.customer_id = cm.customer_id
)

SELECT * FROM final
ORDER BY customer_id
"""

print("=" * 80)
print("MATCHING QUERY TEST (should PASS)")
print("=" * 80)

# Register the matching query under its own prefix. Reusing the default "b_"
# would replace the QUERY_NEW views (b_final, ...) that the investigation cells
# above read, so re-running those cells afterwards would silently investigate
# this query instead.
results_match = full_regression_compare(
    QUERY_LEGACY,
    QUERY_MATCHING,
    prefix_b="match_",
    key_columns={"final": ["customer_id"]},
)
print_summary_table(results_match)

## Summary

This demo showed:

1. **DuckDB as backend** - No Java/Spark required, just `pip install duckdb`
2. **Same functionality** - CTE parsing, view registration, EXCEPT-based comparison
3. **Column-level analysis** - Identifies which columns cause differences (requires `key_columns` for the CTE being compared)
4. **Deep investigation** - `investigate_column()` shows value pairs and samples
5. **Pandas integration** - Results returned as DataFrames for easy analysis

In [None]:
# Cleanup: the shared connection is left open so earlier cells can still be re-run.
for closing_message in ("\nDemo completed!", "\nTo close connection, run: con.close()"):
    print(closing_message)

# Uncomment to close:
# con.close()