# Spark SQL Workshop: Advanced join & group by techniques

## Setup data

In [None]:
%%bash
python ./generate_data.py
python ./run_ddl.py

In [None]:
!jupyter labextension install jupyterlab-mermaid

In [None]:
spark

In [None]:
%%sql
use prod.db

## [Quick refresher] Facts & Dimensions & Join Types

1. `Fact` tables contain information about how dimensions interact with each other in real life. Example: An order fact is an interaction between a customer and a seller involving one or more products. E.g., `Lineitem` & `Orders`.
2. `Dimension` tables store data for a business entity (e.g., customer, product, partner, etc). These tables answer the ‘who’ and ‘what’ types of questions. For example, which stores had the highest revenue yesterday? In this question, stores are the dimension. E.g., `Customer`, `Supplier`.

**Data used for exercises**

![TPCH](./images/tpch.png)

**Common Join Types (in order of real life usage):**

1. **Inner Join (default)**: Get only the rows that have a match in both tables.
2. **Left outer Join**: Get all rows from the left table and only matching rows from the right table. Used to enrich a table with information.
3. **Full outer Join**: Get all rows from both the left and right tables, matched where possible. Used for data validation.

![Joins](./images/join.png)
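
The difference between the first two join types can be sketched with a tiny runnable example. This is a minimal illustration, not part of the workshop data: it uses Python's built-in `sqlite3` as a stand-in so it runs without a Spark session, and the three orders and three customers are made-up rows. The full outer join is left out only because older SQLite builds do not support it.

```python
import sqlite3

# In-memory database with made-up rows (assumption: not the workshop's TPCH data).
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE orders (o_orderkey INTEGER, o_custkey INTEGER);
    CREATE TABLE customer (c_custkey INTEGER, c_name TEXT);
    INSERT INTO orders VALUES (1, 1001), (2, 1002), (3, 9999);
    INSERT INTO customer VALUES
        (1001, 'John Smith'), (1002, 'Jane Doe'), (1003, 'Bob Wilson');
""")

# Inner join: order 3 disappears because customer 9999 does not exist.
inner_rows = conn.execute("""
    SELECT o.o_orderkey, c.c_name
    FROM orders o
    INNER JOIN customer c ON o.o_custkey = c.c_custkey
    ORDER BY o.o_orderkey
""").fetchall()

# Left join: order 3 is kept, with NULL (None in Python) for the customer name.
left_rows = conn.execute("""
    SELECT o.o_orderkey, c.c_name
    FROM orders o
    LEFT JOIN customer c ON o.o_custkey = c.c_custkey
    ORDER BY o.o_orderkey
""").fetchall()

print("inner:", inner_rows)
print("left: ", left_rows)
```

The inner join returns two rows, while the left join returns all three orders.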


In [None]:
%%sql
SELECT
  YEAR (o.o_orderdate) AS order_year,
  c.c_name,
  AVG(o.o_totalprice) AS avg_order_price
FROM
  orders o
  LEFT JOIN customer c ON o.o_custkey = c.c_custkey
GROUP BY
  YEAR (o.o_orderdate),
  c.c_custkey,
  c.c_name
ORDER BY
  1 desc,
  c_custkey
LIMIT
  10

```mermaid
flowchart LR
    subgraph A["Orders (Fact Table) - Wider"]
        A1["o_custkey: 1001 | o_orderdate: 2023-01-15 | o_totalprice: 75000.00"]
        A2["o_custkey: 1002 | o_orderdate: 2023-02-20 | o_totalprice: 120000.00"]
        A3["o_custkey: 1001 | o_orderdate: 2023-03-10 | o_totalprice: 95000.00"]
        A4["o_custkey: 1003 | o_orderdate: 2023-04-05 | o_totalprice: 85000.00"]
        A5["o_custkey: 9999 | o_orderdate: 2023-05-12 | o_totalprice: 50000.00"]
    end
    
    subgraph B["Customer (Dimension Table)"]
        B1["c_custkey: 1001 | c_name: John Smith"]
        B2["c_custkey: 1002 | c_name: Jane Doe"]
        B3["c_custkey: 1003 | c_name: Bob Wilson"]
    end
    
    subgraph C["Result"]
        C1["order_year: 2023 | c_name: John Smith | avg_order_price: 85000.00"]
        C2["order_year: 2023 | c_name: Jane Doe | avg_order_price: 120000.00"]
        C3["order_year: 2023 | c_name: Bob Wilson | avg_order_price: 85000.00"]
        C4["order_year: 2023 | c_name: NULL | avg_order_price: 50000.00"]
    end
    
    A1 -.->|matches| B1
    A2 -.->|matches| B2
    A3 -.->|matches| B1
    A4 -.->|matches| B3
    A5 -.->|no match| C4
    
    B1 -->|contributes to| C1
    B2 -->|contributes to| C2
    B3 -->|contributes to| C3
    
    style A fill:#e74c3c,stroke:#c0392b,color:#fff,stroke-width:3px
    style B fill:#3498db,stroke:#2980b9,color:#fff,stroke-width:2px
    style C fill:#27ae60,stroke:#229954,color:#fff,stroke-width:2px
```

## [Joins] can also be used to find differences in datasets

- While `joins` are typically used to combine tables, they can also be used to find differences in datasets

- When joining tables, there is usually one table called the `driver/base` table to which other tables are joined. Different people/teams have a different term for this.

- Join-based differences are typically used to assess the `completeness` of a dataset.


### Find data in a table that is not part of another table with `anti join`

- When you need to get rows that are in one table but not in another, use `anti join`

- You can get the rows from the left table that do not have any matches from the right table
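
As a rough sketch of the idea above, the snippet below finds customers that have no orders. It uses Python's built-in `sqlite3` with made-up rows so it runs without a Spark session, and it expresses the anti join with `NOT EXISTS`, since SQLite has no anti join keyword. Spark SQL also accepts the `LEFT ANTI JOIN ... ON ...` syntax for the same result.

```python
import sqlite3

# Made-up rows for illustration only (assumption: not the workshop tables).
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE customer (c_custkey INTEGER, c_name TEXT);
    CREATE TABLE orders (o_orderkey INTEGER, o_custkey INTEGER);
    INSERT INTO customer VALUES
        (1001, 'John Smith'), (1002, 'Jane Doe'), (1003, 'Bob Wilson');
    INSERT INTO orders VALUES (1, 1001), (2, 1001), (3, 1003);
""")

# Anti join written as NOT EXISTS: keep a customer only when
# no order row references that customer key.
customers_without_orders = conn.execute("""
    SELECT c.c_custkey, c.c_name
    FROM customer c
    WHERE NOT EXISTS (
        SELECT 1
        FROM orders o
        WHERE o.o_custkey = c.c_custkey
    )
    ORDER BY c.c_custkey
""").fetchall()

print(customers_without_orders)
```

Only the left table's columns are returned, and each left row appears at most once regardless of how many rows exist on the right.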


#### Exercise ( 5 min )
1. In the below query, get all the data from `orders` CTE that is not in `lineitem` CTE.

```mermaid
flowchart LR
    subgraph A["Orders Table"]
        A1["`**o_orderkey | o_custkey**
        001 | 1001
        002 | 1002
        003 | 1003
        004 | 1004
        005 | 1005`"]
    end
    
    subgraph B["LineItem Table"]
        B1["`**l_orderkey | l_partkey**
        001 | 2001
        003 | 2003
        005 | 2004`"]
    end
    
    subgraph C["Result"]
        C1["`**o_orderkey | o_custkey**
        002 | 1002
        004 | 1004`"]
    end
    
    A1 -->|ANTI JOIN| C1
    B1 -.->|"ON o_orderkey = l_orderkey"| C1
    
    style A fill:#3498db,stroke:#2980b9,color:#fff
    style B fill:#27ae60,stroke:#229954,color:#fff
    style C fill:#e74c3c,stroke:#c0392b,color:#fff
```

In [None]:
%%sql
WITH
  orders AS (
    SELECT
      001 AS o_orderkey,
      1001 AS o_custkey
    UNION ALL
    SELECT
      002 AS o_orderkey,
      1002 AS o_custkey
    UNION ALL
    SELECT
      003 AS o_orderkey,
      1003 AS o_custkey
    UNION ALL
    SELECT
      004 AS o_orderkey,
      1004 AS o_custkey
    UNION ALL
    SELECT
      005 AS o_orderkey,
      1005 AS o_custkey
  ),
  lineitem AS (
    SELECT
      001 AS l_orderkey,
      2001 AS l_partkey
    UNION ALL
    SELECT
      003 AS l_orderkey,
      2003 AS l_partkey
    UNION ALL
    SELECT
      005 AS l_orderkey,
      2004 AS l_partkey
  )
-- Your query here

    
### Find out if a table had data in another table `asof` a specific time with `asof join` (not available in Spark)

- When you need to join rows based on how close they are in time.

- Usually used when you need to get the "latest" price or state from a fact table. This type of join is generally used between fact tables.
                                                                               
- Also referred to as point-in-time join
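
One common way to emulate a point-in-time join when the engine has no `asof join` is a range join followed by a "latest row" filter. The sketch below is an assumption-laden illustration: the `invoice` and `fx_rate` tables and their rows are hypothetical, and Python's built-in `sqlite3` is used only so the SQL can run without a Spark session. The query itself relies on a plain `LEFT JOIN` and `ROW_NUMBER()`, which Spark SQL also supports.

```python
import sqlite3

# Hypothetical tables: invoices need the exchange rate known on the invoice date.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE invoice (invoice_id INTEGER, currency TEXT, invoice_date TEXT);
    CREATE TABLE fx_rate (currency TEXT, rate REAL, rate_date TEXT);
    INSERT INTO invoice VALUES
        (1, 'EUR', '2024-01-15'), (2, 'EUR', '2024-02-01'), (3, 'GBP', '2024-01-05');
    INSERT INTO fx_rate VALUES
        ('EUR', 1.09, '2024-01-10'), ('EUR', 1.08, '2024-01-20'),
        ('GBP', 1.27, '2024-01-10');
""")

point_in_time_rows = conn.execute("""
    WITH candidates AS (
        SELECT
            i.invoice_id,
            i.invoice_date,
            r.rate,
            r.rate_date,
            -- rank the rates known on or before the invoice date, newest first
            ROW_NUMBER() OVER (
                PARTITION BY i.invoice_id
                ORDER BY r.rate_date DESC
            ) AS rn
        FROM invoice i
        LEFT JOIN fx_rate r
            ON r.currency = i.currency
            AND r.rate_date <= i.invoice_date
    )
    SELECT invoice_id, invoice_date, rate, rate_date
    FROM candidates
    WHERE rn = 1
    ORDER BY invoice_id
""").fetchall()

for row in point_in_time_rows:
    print(row)
```

Invoice 3 has no rate on or before its date, so the left join keeps it with NULL rate columns instead of dropping it.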


#### Exercise ( 5 min )
1. In the query below, get the `symbol, company, listing_date` from the `stock` CTE, and for each stock, get its price as of the `listing_date`.

*Note:* Assume the `price_tracker` CTE is a fact table to which every change in a stock's price is added (typically, this is tracked in milliseconds, but for simplicity, we keep it at a day level).

```mermaid
flowchart LR
    subgraph A["Stock Table"]
        A1["AAPL | Apple Inc. | 2024-01-15"]
        A2["GOOGL | Alphabet Inc. | 2024-01-26"]
        A3["MSFT | Microsoft Corp. | 2024-02-01"]
    end
    
    subgraph B["Price Tracker Table"]
        B1["AAPL | 150.00 | 2024-01-10"]
        B2["AAPL | 155.00 | 2024-01-20"]
        B3["GOOGL | 2800.00 | 2024-01-25"]
        B4["GOOGL | 2850.00 | 2024-02-05"]
        B5["MSFT | 400.00 | 2024-02-10"]
    end
    
    subgraph C["Result"]
        C1["AAPL | Apple Inc. | 150.00 | 2024-01-10"]
        C2["GOOGL | Alphabet Inc. | 2800.00 | 2024-01-25"]
        C3["MSFT | Microsoft Corp. | None | None"]
    end
    
    A1 -->|valid: price_date ≤ listing_date| B1
    A1 -.->|invalid: 2024-01-20 > 2024-01-15| B2
    A2 -->|valid: price_date ≤ listing_date| B3
    A2 -.->|invalid: 2024-02-05 > 2024-01-26| B4
    A3 -.->|invalid: 2024-02-10 > 2024-02-01| B5
    
    B1 -->|latest valid price| C1
    B3 -->|latest valid price| C2
    
    
    style A fill:#3498db,stroke:#2980b9,color:#fff
    style B fill:#27ae60,stroke:#229954,color:#fff
    style C fill:#e74c3c,stroke:#c0392b,color:#fff
    style B2 fill:#95a5a6,stroke:#7f8c8d,color:#fff
    style B4 fill:#95a5a6,stroke:#7f8c8d,color:#fff
```

In [None]:
%%sql
WITH
  stock AS (
    SELECT
      'AAPL' AS symbol,
      'Apple Inc.' AS company,
      '2024-01-15' AS listing_date
    UNION ALL
    SELECT
      'GOOGL' AS symbol,
      'Alphabet Inc.' AS company,
      '2024-01-26' AS listing_date
    UNION ALL
    SELECT
      'MSFT' AS symbol,
      'Microsoft Corp.' AS company,
      '2024-02-01' AS listing_date
  ),
  price_tracker AS (
    SELECT
      'AAPL' AS symbol,
      150.00 AS price,
      '2024-01-10' AS price_date
    UNION ALL
    SELECT
      'AAPL' AS symbol,
      155.00 AS price,
      '2024-01-20' AS price_date -- after Apple's listing_date (2024-01-15), so this price must not be picked
    UNION ALL
    SELECT
      'GOOGL' AS symbol,
      2800.00 AS price,
      '2024-01-25' AS price_date
    UNION ALL
    SELECT
      'GOOGL' AS symbol,
      2850.00 AS price,
      '2024-02-05' AS price_date -- after Google's listing_date (2024-01-26), so this price must not be picked
    UNION ALL
    SELECT
      'MSFT' AS symbol,
      400.00 AS price,
      '2024-02-10' AS price_date -- after Microsoft's listing_date (2024-02-01), so Microsoft gets no price
  ),
 -- Your query here

**Hint:** Get all the prices for a symbol and filter to the latest one


### Join results depend on referential integrity (aka `foreign key` relationships)

- In a data warehouse, some upstream tables land sooner than others

- When you `inner join` a quick table with a slow table, you lose the rows from the quick table whose matching rows have not landed yet

- For example, if your order data arrives much quicker than customer data, the affected orders will either have NULL customer columns (left join) or be missing from the output (inner join)

- Usually, an `UNKNOWN` catch-all is used for the missing data; you can also re-run the pipeline to reconcile when the slow data lands
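
The `UNKNOWN` catch-all can be sketched as a `LEFT JOIN` plus `COALESCE`. This is a minimal illustration under assumptions: the rows are made up, the `needs_reconcile` flag is a hypothetical column added here to mark rows for a later re-run, and Python's built-in `sqlite3` stands in for the Spark session so the snippet is runnable on its own.

```python
import sqlite3

# Made-up rows: order 2 references a customer that has not landed yet.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE orders (o_orderkey INTEGER, o_custkey INTEGER);
    CREATE TABLE customer (c_custkey INTEGER, c_name TEXT);
    INSERT INTO orders VALUES (1, 1001), (2, 8888888);
    INSERT INTO customer VALUES (1001, 'John Smith');
""")

enriched_orders = conn.execute("""
    SELECT
        o.o_orderkey,
        -- keep the order and label the missing dimension value explicitly
        COALESCE(c.c_name, 'UNKNOWN') AS customer_name,
        -- hypothetical flag: rows to revisit once the slow table lands
        CASE WHEN c.c_custkey IS NULL THEN 1 ELSE 0 END AS needs_reconcile
    FROM orders o
    LEFT JOIN customer c ON o.o_custkey = c.c_custkey
    ORDER BY o.o_orderkey
""").fetchall()

print(enriched_orders)
```

Keeping the row with an explicit label means downstream totals still include the order, and the flag gives the reconcile run something to filter on.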


#### Exercise ( 10 min )

1. Assume you have to join orders (loaded into our warehouse every 5 minutes) and customers (loaded into our warehouse every 6 hours) tables; how do you ensure that the results of your join are **as expected**? 


2. What will you do if you find the table(s) are incomplete?

In [None]:
%%sql
WITH
  latest_orders AS (
    SELECT
      *
    FROM
      orders
    UNION ALL
    SELECT
      9999999 AS o_orderkey,
      8888888 AS o_custkey,
      'O' AS o_orderstatus,
      1500000.00 AS o_totalprice,
      '2024-06-14' AS o_orderdate,
      '1-URGENT' AS o_orderpriority,
      'Clerk#000000999' AS o_clerk,
      0 AS o_shippriority,
      'New order for non-existent customer' AS o_comment
  )
SELECT
  o.*,
  c.* -- What would you do with these NULLs?
FROM
  latest_orders o
  LEFT JOIN customer c ON o.o_custkey = c.c_custkey
WHERE
  c.c_custkey IS NULL

**Hint**: Start by defining what `as expected` means.

```mermaid
graph LR
    A[Order Data Stream] --> B[Order data ingested in 5 min chunks]
    C[Customer Data Dump] --> D[Customer data loaded every 6h]
    
    B --> F[Join Processing]
    D --> F
    
    F --> G{Join Type}
    
    G -->|LEFT JOIN| H[Orders LEFT JOIN Customers]
    G -->|INNER JOIN| I[Orders INNER JOIN Customers]
    
    H --> J[Result: Orders with NULL customer fields<br/>- Order ID: 123, Customer Name: NULL<br/>- Order ID: 124, Customer Name: NULL]
    
    I --> K[Result: Empty or partial results<br/>- Missing orders without customer data<br/>- Delayed processing until customers arrive]
    
    style A fill:#e1f5fe
    style C fill:#f3e5f5
    style J fill:#ffebee
    style K fill:#fff3e0
```

### Common data issues that create bad outputs when joining

- Ensure that your table(s) have a single grain before joining them together.

- Handle slow and fast data joins based on the use case

- Be careful if your join keys have NULLs: `NULL = NULL` evaluates to NULL rather than true, so rows with NULL keys never match in a regular equality join. You can use the null-safe equality operator `<=>` instead, but be mindful that NULL can represent any number of things.

- Be mindful of applying functions in join criteria; they can slow down joins significantly and interfere with query planning in your DB engine. The results are still correct, only slower.

In [7]:
%%sql 
WITH foo_tbl as (
    select 1 as id
    , 'foo' as data_value
    UNION ALL
    select 2 as id
    , NULL as data_value
),
 bar_tbl as (
    select 1 as id
    , 'bar' as data_value
    UNION ALL
    select 2 as id
    , NULL as data_value
)
select f.*
from foo_tbl f
join bar_tbl b
on f.id = b.id and f.data_value <=> b.data_value

id,data_value
2,
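
The single-grain point from the list above can be sketched as follows. The rows are made up, and Python's built-in `sqlite3` is used as a stand-in so the snippet runs without a Spark session. Joining order-grain data to line-item-grain data repeats each order once per line item, which inflates any sum over an order-level column.

```python
import sqlite3

# Made-up rows: order 1 has three line items, order 2 has one.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE orders (o_orderkey INTEGER, o_totalprice REAL);
    CREATE TABLE lineitem (l_orderkey INTEGER, l_linenumber INTEGER);
    INSERT INTO orders VALUES (1, 100.0), (2, 50.0);
    INSERT INTO lineitem VALUES (1, 1), (1, 2), (1, 3), (2, 1);
""")

# Correct total at order grain.
true_total = conn.execute(
    "SELECT SUM(o_totalprice) FROM orders"
).fetchone()[0]

# Mixed grains: order 1 is repeated three times, so its price is counted three times.
inflated_total = conn.execute("""
    SELECT SUM(o.o_totalprice)
    FROM orders o
    JOIN lineitem l ON o.o_orderkey = l.l_orderkey
""").fetchone()[0]

# One possible fix: aggregate lineitem to order grain before joining.
safe_total = conn.execute("""
    WITH items_per_order AS (
        SELECT l_orderkey, COUNT(*) AS num_items
        FROM lineitem
        GROUP BY l_orderkey
    )
    SELECT SUM(o.o_totalprice)
    FROM orders o
    JOIN items_per_order i ON o.o_orderkey = i.l_orderkey
""").fetchone()[0]

print(true_total, inflated_total, safe_total)
```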


## [Group bys] can be used for validating assumptions about data, creating nested data structures, & Excel-like reporting


### Check the distribution of dimensions (date, state, etc) to validate assumptions about your data

#### Exercise ( 5 min )

Is the data representative of real-world customer numbers?

In [None]:
%%sql
SELECT
  n.n_name AS nation_name,
  COUNT(*) AS num_customers
FROM
  customer c
  LEFT JOIN nation n ON c.c_nationkey = n.n_nationkey
GROUP BY
  n.n_name
ORDER BY
  num_customers desc
LIMIT
  10

-- your observations here

### Check unique key constraints
* Most warehouses let you define primary keys (PKs) but do not enforce them, so uniqueness has to be checked with a query

#### Exercise ( 5 min ) 

How would you use the `group by` clause to verify that the `c_custkey` column in the customer table is unique?

In [None]:
%%sql
-- your query here; it should return rows only if there are non-unique customer keys

### Aggregation functions beyond the standard count/min/max/avg/sum

- `Statistical agg`: Functions like correlation, sampling, standard deviation, skew, etc

- `Collection agg`: Functions to combine values into nested data types, e.g., array_agg, collect_set, etc

- `Approximation agg`: Functions that are fast by sacrificing accuracy, e.g., approx_count_distinct, approx_percentile

- `Convenience agg`: Functions that make common usages easier, e.g., count_if, bool_or, etc

While you can write your own logic to replicate some of the above functions, built-in functions are generally stable and well-tested.

In [None]:
%%sql
SELECT
  YEAR (o_orderdate) AS yr,
  SUM(
    CASE
      WHEN o_orderpriority = '5-LOW' THEN 1
      ELSE 0
    END
  ) AS num_low_orders,
  count_if (o_orderpriority = '5-LOW') AS num_low_orders_easy -- Convenience agg
FROM
  orders
GROUP BY
  1
ORDER BY
  1 desc

In [None]:
%%sql
SELECT
  l_orderkey,
  collect_list (l_linenumber) AS line_number,
  collect_list (
    struct (
      l_linenumber AS line_number,
      l_quantity AS quantity,
      l_extendedprice AS price
    )
  ) AS line_details -- Structured output with types
FROM
  lineitem
GROUP BY
  1
ORDER BY
  l_orderkey
LIMIT
  10

```mermaid
flowchart LR
    subgraph A["LineItem Table (Input)"]
        A1["orderkey: 1 | linenumber: 1 | quantity: 17 | price: 26734.03"]
        A2["orderkey: 1 | linenumber: 2 | quantity: 36 | price: 57191.40"]
        A3["orderkey: 1 | linenumber: 3 | quantity: 8 | price: 14254.80"]
        A4["orderkey: 2 | linenumber: 1 | quantity: 38 | price: 39447.04"]
        A5["orderkey: 3 | linenumber: 1 | quantity: 45 | price: 47301.30"]
        A6["orderkey: 3 | linenumber: 2 | quantity: 49 | price: 69947.99"]
    end
    
    subgraph C["Result (Aggregated)"]
        C1["orderkey: 1<br/>line_numbers: [1,2,3]<br/>line_details: [struct(1,17,26734.03), struct(2,36,57191.40), struct(3,8,14254.80)]"]
        C2["orderkey: 2<br/>line_numbers: [1]<br/>line_details: [struct(1,38,39447.04)]"]
        C3["orderkey: 3<br/>line_numbers: [1,2]<br/>line_details: [struct(1,45,47301.30), struct(2,49,69947.99)]"]
    end
    
    A1 -.-> C1
    A2 -.-> C1
    A3 -.-> C1
    A4 --> C2
    A5 -.-> C3
    A6 -.-> C3
    
    style A fill:#3498db,stroke:#2980b9,color:#fff
    style C fill:#27ae60,stroke:#229954,color:#fff
    
```

In [None]:
%%sql
WITH
  order_details AS (
    SELECT
      l_orderkey,
      collect_list (l_linenumber) AS line_number,
      collect_list (
        struct (
          l_linenumber AS line_number,
          l_quantity AS quantity,
          l_extendedprice AS price
        )
      ) AS line_details
    FROM
      lineitem
    GROUP BY
      1
  )
SELECT
  l_orderkey,
  exploded_detail.line_number,
  exploded_detail.quantity,
  exploded_detail.price
  -- , explode(line_number) as individual_line_number
FROM
  order_details LATERAL VIEW explode (line_details) t AS exploded_detail
ORDER BY
  l_orderkey
LIMIT
  20

#### Exercise ( 5 min )

Try the above query by uncommenting the `explode(line_number)` line. What do you think is happening?

### Group by variations for Excel-like reporting

- ROLLUP, CUBE, and GROUPING SETS are shorthand for running several GROUP BYs and combining their results; they are typically used for reporting

| Operation Type | SQL Syntax | Equivalent GROUP BY Combinations |
|---------------|------------|----------------------------------|
| **CUBE** | `GROUP BY CUBE(region, category)` | `GROUP BY region, category`<br/>`UNION ALL`<br/>`GROUP BY region`<br/>`UNION ALL`<br/>`GROUP BY category`<br/>`UNION ALL`<br/>`GROUP BY ()` |
| **ROLLUP** | `GROUP BY ROLLUP(region, category)` | `GROUP BY region, category`<br/>`UNION ALL`<br/>`GROUP BY region`<br/>`UNION ALL`<br/>`GROUP BY ()` |
| **GROUPING SETS** | `GROUP BY GROUPING SETS((region, category), (region), ())` | `GROUP BY region, category`<br/>`UNION ALL`<br/>`GROUP BY region`<br/>`UNION ALL`<br/>`GROUP BY ()` |

**Key Differences**

- **CUBE**: All possible combinations (2^n sets)
- **ROLLUP**: Hierarchical combinations (n+1 sets) based on the order of group by columns
- **GROUPING SETS**: Custom-defined combinations (exactly what you specify)
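
The counts above (2^n for CUBE, n+1 for ROLLUP) can be checked by enumerating the grouping sets directly. The sketch below is plain Python and does not touch Spark; `cube_sets` and `rollup_sets` are hypothetical helper names introduced here only to list which column combinations each variation would group by.

```python
from itertools import combinations


def cube_sets(columns):
    """Every subset of the columns, largest first: 2^n grouping sets."""
    n = len(columns)
    return [
        combo
        for size in range(n, -1, -1)
        for combo in combinations(columns, size)
    ]


def rollup_sets(columns):
    """Every prefix of the column list, longest first: n+1 grouping sets."""
    return [tuple(columns[:size]) for size in range(len(columns), -1, -1)]


columns = ["region", "category"]
print("CUBE:  ", cube_sets(columns))
print("ROLLUP:", rollup_sets(columns))

# With three columns the gap widens: 8 sets for CUBE versus 4 for ROLLUP.
three = ["region", "category", "year"]
print(len(cube_sets(three)), len(rollup_sets(three)))
```

The empty tuple `()` stands for the grand total. Because ROLLUP only takes prefixes, swapping the column order changes which subtotals you get, whereas CUBE is unaffected by order.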



In [None]:
%%sql
CREATE
OR REPLACE TEMPORARY VIEW sales AS
SELECT
  'North' AS region,
  'Electronics' AS category,
  100 AS amount
UNION ALL
SELECT
  'North' AS region,
  'Clothing' AS category,
  50 AS amount
UNION ALL
SELECT
  'South' AS region,
  'Electronics' AS category,
  80 AS amount
UNION ALL
SELECT
  'South' AS region,
  'Clothing' AS category,
  70 AS amount;

In [None]:
%%sql
-- ROLLUP: Hierarchical aggregation ((region, category) -> region -> grand total)
SELECT
  region,
  category,
  SUM(amount) AS total_sales
FROM
  sales
GROUP BY
  ROLLUP (region, category)
ORDER BY
  region,
  category;

In [None]:
%%sql
-- CUBE: All possible combinations
SELECT
  region,
  category,
  SUM(amount) AS total_sales
FROM
  sales
GROUP BY
  CUBE (region, category)
ORDER BY
  region,
  category;

In [None]:
%%sql
-- GROUPING SETS: Custom combinations
SELECT
  region,
  category,
  SUM(amount) AS total_sales
FROM
  sales
GROUP BY
  GROUPING SETS (
    (region, category), -- detailed
    (region), -- by region only
    () -- grand total only
  )
ORDER BY
  region,
  category;

### Caveats when doing group bys: duplication, incorrect data types, additive/non-additive numbers, etc

- Are you using 'Group by' to remove duplicates? This usually indicates a problem with your underlying data model

- Ensure that the numbers you are aggregating are of the correct data types (e.g., a number stored as a string).

- Be mindful of additive and non-additive numbers (e.g. percentages, distinct counts)
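
The data type caveat can be sketched with a made-up `payments` table where amounts were loaded as strings. Python's built-in `sqlite3` is used as a stand-in so the snippet runs without a Spark session; string comparison is character by character, so `MAX` over the text column returns the wrong "largest" value.

```python
import sqlite3

# Hypothetical table: amount was ingested as TEXT instead of a numeric type.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE payments (payment_id INTEGER, amount TEXT);
    INSERT INTO payments VALUES (1, '9'), (2, '10'), (3, '100');
""")

# Text comparison: '9' sorts after '10' and '100' because '9' > '1'.
max_as_text = conn.execute(
    "SELECT MAX(amount) FROM payments"
).fetchone()[0]

# Casting to a numeric type before aggregating gives the intended answer.
max_as_number = conn.execute(
    "SELECT MAX(CAST(amount AS INTEGER)) FROM payments"
).fetchone()[0]

print(repr(max_as_text), max_as_number)
```

The same kind of surprise shows up with `MIN` and `ORDER BY`, so it is worth checking column types before trusting an aggregate.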

#### Exercise ( 5 min )

Inspect the query below; what is wrong with the logic? 

How would you fix it?

In [None]:
%%sql
-- CTE: Unique suppliers per day
WITH
  daily_suppliers AS (
    SELECT
      DATE (l_shipdate) AS ship_date,
      COUNT(DISTINCT l_suppkey) AS daily_unique_suppliers
    FROM
      lineitem
    GROUP BY
      DATE (l_shipdate)
  )
SELECT
  YEAR (d.ship_date) AS ship_year,
  SUM(d.daily_unique_suppliers) AS yearly_total
FROM
  daily_suppliers d
GROUP BY
  YEAR (d.ship_date)
ORDER BY
  ship_year;

In [None]:
%%sql
-- Your query here

## Recap

We saw

1. How to use joins to find differences in datasets
    * Find data in a table that is not part of another table with `anti join`
    * Find out if a table had data in another table `asof` a specific time with `asof join` (not available in Spark)
    * Join results depend on referential integrity (aka `foreign key` relationships)
    * Common data issues that create bad outputs when joining
2. Using Group bys for validating assumptions about data, creating nested data structures, & Excel-like reporting
    * Check the distribution of dimensions (date, state, etc) to validate assumptions about your data
    * Check unique key constraints
    * Aggregation functions beyond standard aggregations
    * Group by variations for Excel-like reporting
    * Caveats to watch out for with group bys                                          

## Check out **[Advanced Spark SQL for Data Engineers](https://josephmachado.podia.com/advanced-spark-sql-workshop-for-data-engineers)**

## Next steps
1. **[Feedback](https://form.typeform.com/to/f51flAI1)**

## Recommended reading

1. [SQL for data engineers](https://www.startdataengineering.com/post/improve-sql-skills-de/)
2. [SQL or Python for data processing](https://www.startdataengineering.com/post/sql-v-python/)
3. [dbt tutorial](https://www.startdataengineering.com/post/dbt-data-build-tool-tutorial/)
4. [Build a data project with step-by-step instructions](https://www.startdataengineering.com/post/de-proj-step-by-step/)
