In [1]:
import warnings
warnings.filterwarnings("ignore")

from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = 'all'

import os
# Set environment variables (local paths; adjust to your installation)
os.environ["JAVA_HOME"] = "D:/Programs/Java"
os.environ["HADOOP_HOME"] = "D:/Programs/hadoop"
os.environ["SPARK_HOME"] = "D:/Programs/spark/spark-3.5.6-bin-hadoop3"

# Initialize findspark before importing pyspark so that pyspark is on sys.path
import findspark
findspark.init(os.environ["SPARK_HOME"])

from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

In [2]:
spark = (
    SparkSession
    .builder
    .appName("Testing Partitioning")
    .config("spark.sql.adaptive.enabled", "false") 
    .master("local[*]")
    .getOrCreate()
)
sc = spark.sparkContext
sc.setLogLevel("ERROR")

In [3]:
transactions_file = "../../data/transactions.parquet"
customers_file = "../../data/customers.parquet"

df_transactions = spark.read.parquet(transactions_file)
df_customers = spark.read.parquet(customers_file)

| Join Type       | Broadcast Enabled | AQE Enabled | Expected Join               |
| --------------- | ------------------| ----------- | --------------------------- |
| Sort-Merge Join | ❌ (disabled)      | ❌           | Sort-Merge                  |
| Broadcast Join  | ✅ (enabled)       | ❌           | Broadcast                   |
| AQE Join        | ✅ (enabled)       | ✅           | Adaptive (likely broadcast) |

### Sort Merge Join vs Broadcast Join

| Metric | **Sort Merge Join** | **Broadcast Hash Join** |
| --- | --- | --- |
| **Join Type Used** | SortMergeJoin | BroadcastHashJoin |
| **Broadcast Table Size** | N/A | 32.2 MiB |
| **Broadcast Build Time** | N/A | 218 ms |
| **Broadcast Collect Time** | N/A | 3.8 s |
| **Time to Broadcast** | N/A | 41 ms |
| **Input Data Size** | 870.7 MiB (Parquet source) | 870.7 MiB (Parquet source) |
| **Shuffle Read (Total)** | 2.4 GiB | 0 B (Broadcast avoids shuffling) |
| **Shuffle Write (Total)** | 2.4 GiB | 0 B |
| **Spill (Memory / Disk)** | 512 MiB / 140.5 MiB | 0 / 0 |
| **Max Shuffle Read (per task)** | 1.1 GiB | 0 |
| **Execution Time (Stage)** | ~5.1 min (Stage 4 alone was 3.3 min) | 1.1 min |
| **# of Output Rows (Join Output)** | 39,790,092 | 39,790,092 |
| **Number of Partitions in Join** | 200 | 15 (only from left side) |
| **Skew Detected** | Yes (Shuffle Read: 1.1 GiB max for one task) | No skew (no shuffling) |

#### **Key Observations and Inferences**

1. **Broadcast Join avoids shuffling**:
    - No shuffle read/write → no skew → less memory/disk spill.
    - All data from `df_customers` was broadcast (only 32.2 MiB).
2. **Sort Merge Join is expensive**:
    - High shuffle read/write → ~2.4 GiB in both directions.
    - Significant skew (some tasks reading 1.1 GiB).
    - Required sorting on both sides + high memory/disk usage.
    - Worst task took 3.1 minutes.
3. **Broadcast Join is more efficient for small dimension tables**:
    - Avoids shuffle.
    - Reduces execution time.
    - Reduces memory pressure and GC time.

#### Why Only 15 Tasks in Broadcast Join?

Because only the **left side** (the fact table) is partitioned and processed in parallel. The right side (`df_customers`) is collected to the driver, built into a hash table, and sent to every executor, so it adds no join partitions of its own. The join therefore runs one task per partition of the left table (`df_transactions`, which appears to be read as 15 input partitions here).
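
As a rough cross-check on the task count, the sketch below approximates how a file scan is cut into splits. It assumes the split-size rule used by Spark's file sources, with the defaults of `spark.sql.files.maxPartitionBytes` (128 MiB) and `spark.sql.files.openCostInBytes` (4 MiB). The function name, the file size, and the core count are hypothetical and not taken from this run. Small splits can be packed into one partition, so the result is best read as an upper bound.

```python
import math

MiB = 1024 * 1024

def estimate_scan_splits(file_sizes, default_parallelism,
                         max_partition_bytes=128 * MiB,
                         open_cost_bytes=4 * MiB):
    """Approximate number of file splits (an upper bound on scan tasks)."""
    # Every file is padded with an "open cost" before sizing the splits
    total = sum(size + open_cost_bytes for size in file_sizes)
    bytes_per_core = total // default_parallelism
    max_split = min(max_partition_bytes, max(open_cost_bytes, bytes_per_core))
    return sum(math.ceil(size / max_split) for size in file_sizes)

# Hypothetical input: one 1000 MiB file on a 4-core local[*] session
splits = estimate_scan_splits([1000 * MiB], default_parallelism=4)
print(splits)
```

In the notebook itself, `df_transactions.rdd.getNumPartitions()` reports the actual number of input partitions.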

## 1. Sort-Merge Join Job

In [4]:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)  # Disable broadcast join
spark.conf.set("spark.sql.adaptive.enabled", "false")       # Disable AQE
df_joined = df_transactions.join(df_customers, on="cust_id", how="inner")
df_joined.write.format("noop").mode("overwrite").save("output/sort_merge_join")

| Metric | Value |
| --- | --- |
| **Join Type Used** | SortMergeJoin |
| **Number of Output Rows** | 39,790,092 |
| **Total Execution Time** | 5.1 min |
| **Input Size (Combined)** | 870.7 MiB |
| **# of Tasks (Completed)** | 215 |
| **# of Partitions in Join** | 200 |

**First Exchange (Transactions Table)**
| Metric | Value |
| --- | --- |
| **Shuffle Write** | 2.4 GiB (~2457 MiB) |
| **Shuffle Read** | 2.4 GiB (~2457 MiB) |
| **# of Records Written** | 39,790,092 |
| **Peak Memory Used (Sort)** | 11.5 GiB (max 3.9 GiB per task) |
| **Spill Size (Sort)** | 512 MiB |

**Second Exchange (Customers Table)**
| Metric | Value |
| --- | --- |
| **Shuffle Write** | 383.8 KiB (~0.375 MiB) |
| **Shuffle Read** | 712.3 KiB (~0.695 MiB) |
| **# of Records Written** | 5,000 |
| **Peak Memory Used (Sort)** | 6.3 GiB |
| **Spill Size (Sort)** | 0.0 B |

### Inference & Observations

1. **Why high shuffle size for Sort-Merge Join?**
    - SMJ requires both sides of the join to be **shuffled by the join key** (`cust_id`) and then **sorted** within each partition. This causes **full data shuffles** on both sides (especially the large `transactions` DataFrame).
2. **Why two exchanges?**
    - One `Exchange` hash-partitions `transactions` by `cust_id`.
    - One `Exchange` hash-partitions `customers` by `cust_id`.
    - Each `Exchange` is followed by a `Sort`, so matching keys land in the same partition in the same order.
3. **Memory Pressure:**
    - The sort on the `transactions` side spilled 512 MiB of in-memory data (140.5 MiB on disk), which indicates **insufficient memory for sorting** some partitions.
4. **Join Output:**
    - The join produced ~40 million rows. Although the `customers` table is small, Spark didn’t use a Broadcast Join because auto broadcast was **disabled**.
5. **Total execution time is high** due to:
    - Large shuffle read/write (especially 2.4 GiB).
    - Sorting overhead (spill to disk).
    - 200 tasks running under memory pressure.
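
The shuffle, sort, and merge steps described above can be sketched in plain Python. This is a toy model with hypothetical helper names and data, not Spark's implementation: `hash(key) % n` stands in for Spark's hash partitioning, and each in-memory list stands in for a shuffle partition.

```python
from collections import defaultdict

def shuffle_by_key(rows, key_index, num_partitions):
    """Exchange step: route each row to a partition by its join key."""
    partitions = defaultdict(list)
    for row in rows:
        partitions[hash(row[key_index]) % num_partitions].append(row)
    return partitions

def merge_join(left_sorted, right_sorted):
    """Merge step over two lists sorted by key (the key is element 0)."""
    out, i, j = [], 0, 0
    while i < len(left_sorted) and j < len(right_sorted):
        lk, rk = left_sorted[i][0], right_sorted[j][0]
        if lk < rk:
            i += 1
        elif lk > rk:
            j += 1
        else:
            # Emit every right row with this key, then advance the left side
            k = j
            while k < len(right_sorted) and right_sorted[k][0] == lk:
                out.append(left_sorted[i] + right_sorted[k][1:])
                k += 1
            i += 1
    return out

def sort_merge_join(left, right, num_partitions=4):
    left_parts = shuffle_by_key(left, 0, num_partitions)
    right_parts = shuffle_by_key(right, 0, num_partitions)
    result = []
    for p in range(num_partitions):
        # Sort both sides of the partition, then merge them
        result += merge_join(sorted(left_parts[p]), sorted(right_parts[p]))
    return result

# Hypothetical toy data: (cust_id, payload)
transactions = [(1, "t1"), (2, "t2"), (1, "t3"), (3, "t4")]
customers = [(1, "Ann"), (2, "Bob")]
joined = sort_merge_join(transactions, customers)
print(sorted(joined))
```

Both inputs pass through `shuffle_by_key` in full, which is why the large `transactions` side dominates the shuffle cost even though `customers` is tiny.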

#### **Q1: Why was Shuffle Read 2.4 GiB if the input was only ~870 MiB?**

> Because 870.7 MiB is the size of the compressed, columnar Parquet files on disk, while shuffle data is written as serialized rows, which take more space.
> 
- Parquet stores data column by column with encodings (dictionary, run-length) and compression, so the on-disk input is compact.
- After the scan, Spark decodes the data into rows. The shuffle writes those rows in a row-based binary format with only lightweight block compression, so the same 39,790,092 records occupy more bytes.
- The shuffle splits the data into **200 partitions** because `spark.sql.shuffle.partitions` defaults to 200 (we see `number of partitions: 200`); no explicit repartition is involved.
- Each downstream task fetches blocks from every upstream map task, but each block is read only once. That is why total shuffle read equals total shuffle write.
- So:
    
    👉 Parquet input on disk = 870.7 MiB
    
    👉 Shuffle write (total across all map tasks) = 2.4 GiB
    
    👉 Shuffle read (total across all reduce tasks) = 2.4 GiB
    
- The ~2.8x growth comes from:
    - Loss of Parquet's columnar encoding and compression
    - Per-row overhead of the serialized row format
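
A quick back-of-the-envelope check using the figures reported in the tables above: dividing each size by the row count gives the average bytes per record in each representation. The 870.7 MiB input figure also includes the `customers` scan, which is negligible at this scale.

```python
GiB = 1024 ** 3
MiB = 1024 ** 2

rows = 39_790_092               # records written by the transactions shuffle
parquet_bytes = 870.7 * MiB     # input size reported for the Parquet scan
shuffle_bytes = 2.4 * GiB       # shuffle write reported for the transactions side

parquet_bytes_per_row = parquet_bytes / rows
shuffle_bytes_per_row = shuffle_bytes / rows
expansion = shuffle_bytes / parquet_bytes

print(f"Parquet: {parquet_bytes_per_row:.1f} B/row")
print(f"Shuffle: {shuffle_bytes_per_row:.1f} B/row")
print(f"Expansion: {expansion:.2f}x")
```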

#### Evidence of Skew from the Metrics

| Metric | Median | Max | Skew Indicator? |
| --- | --- | --- | --- |
| **Duration** | 0.7 s | **3.1 min** | ✅ Yes (very high max) |
| **Shuffle Read Size** | 6.7 MiB | **1.1 GiB** | ✅ Yes (huge diff) |
| **Shuffle Read Records** | 110,390 | **~1.76 million** | ✅ Yes (16x higher) |
| **Spill (disk)** | 0 | **140.5 MiB** | ✅ Yes (spill only on 1 task) |
    
- **Root Cause in Your Case**
    
    Based on the physical plan and exchange metrics, this likely happened during the **SortMergeJoin**:
    
    - You had **200 partitions**.
    - But **some keys (like a `cust_id`) occurred way more frequently** than others.
    - That **caused one or more partitions to carry massive amounts of data**.
    
    In fact:
    
    - One task processed **1.1 GiB of shuffle read** (vs 6.7 MiB median).
    - It took **3.1 minutes**, while most tasks finished in <1 s.
    
    That’s a textbook example of **partition-level skew**.
    
- When does this happen most often?
    - During **joins**, when one side has **skewed key distribution**.
    - Especially when doing **SortMergeJoin**, because data has to be repartitioned and sorted by key.
    - If the join key (e.g., `cust_id`) has a few very **"hot" values**, they dominate the shuffle.

## 2. Broadcast Join Job

In [5]:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10 * 1024 * 1024)  # 10 MiB
spark.conf.set("spark.sql.adaptive.enabled", "false")                    # Disable AQE
# F.broadcast() is an explicit hint: the right side is broadcast regardless of the threshold
df_joined = df_transactions.join(F.broadcast(df_customers), on="cust_id", how="inner")
df_joined.write.format("noop").mode("overwrite").save("output/broadcast_join")

== Physical Plan ==
OverwriteByExpression (10)
+- * Project (9)
   +- * BroadcastHashJoin Inner BuildRight (8)
      :- * Filter (3)
      :  +- * ColumnarToRow (2)
      :     +- Scan parquet  (1)
      +- BroadcastExchange (7)
         +- * Filter (6)
            +- * ColumnarToRow (5)
               +- Scan parquet  (4)
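
The `BroadcastHashJoin ... BuildRight` node in this plan can be pictured with a minimal pure-Python sketch. This is a toy model with hypothetical names and data, not Spark's code: the small side becomes one hash table, and each partition of the large side probes it independently.

```python
def broadcast_hash_join(left_partitions, small_table):
    """Join each left partition against a hash table built from the small side."""
    # Build once (in Spark: collected on the driver, then shipped to every executor)
    lookup = {}
    for key, *values in small_table:
        lookup.setdefault(key, []).append(tuple(values))

    joined_partitions = []
    for partition in left_partitions:      # one task per left partition
        out = []
        for key, *values in partition:     # probe only: no shuffle, no sort
            for match in lookup.get(key, []):
                out.append((key, *values, *match))
        joined_partitions.append(out)
    return joined_partitions

# Hypothetical data: two partitions of transactions, one small customers table
transactions = [[(1, "t1"), (2, "t2")], [(1, "t3"), (3, "t4")]]
customers = [(1, "Ann"), (2, "Bob")]
result = broadcast_hash_join(transactions, customers)
print(result)
```

The output has the same number of partitions as the left input, which is why the join's task count follows the large table.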

## 3. AQE-Enabled Join Job

In [6]:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10 * 1024 * 1024)  # Enable broadcast threshold
spark.conf.set("spark.sql.adaptive.enabled", "true")                     # Enable AQE
df_joined = df_transactions.join(df_customers, on="cust_id", how="inner")
df_joined.write.format("noop").mode("overwrite").save("output/aqe_join")

== Physical Plan ==
OverwriteByExpression (18)
+- AdaptiveSparkPlan (17)
   +- == Final Plan ==
      ResultQueryStage (11), Statistics(sizeInBytes=8.0 EiB)
      +- * Project (10)
         +- * BroadcastHashJoin Inner BuildRight (9)
            :- * Filter (3)
            :  +- * ColumnarToRow (2)
            :     +- Scan parquet  (1)
            +- BroadcastQueryStage (8), Statistics(sizeInBytes=32.2 MiB, rowCount=5.00E+3)
               +- BroadcastExchange (7)
                  +- * Filter (6)
                     +- * ColumnarToRow (5)
                        +- Scan parquet  (4)

| **Metric** | **AQE Join** |
| --- | --- |
| **Join Type Used** | BroadcastHashJoin (AQE enabled) |
| **Execution Time** | 33 seconds |
| **Input Data Size (Left Table)** | 862.7 MiB |
| **Input Rows (Left Table)** | 39,790,092 |
| **Input Data Size (Right Table)** | 14.99 KiB (post-filter), broadcast size 32.2 MiB |
| **Broadcast Build Time** | 73 ms |
| **Broadcast Collect Time** | 1.6 s |
| **Time to Broadcast** | 41 ms |
| **Shuffle Read / Write** | None (Broadcast avoids shuffle) |
| **Join Output Rows** | 39,790,092 |
| **Spill** | No memory or disk spill |
| **Skew Handling** | Not needed (no shuffle in the plan) |

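- With **AQE (Adaptive Query Execution)** enabled, Spark **used a Broadcast Join** because:
    - `df_customers` (right table) is tiny: ~15 KiB of Parquet on disk, well under the 10 MiB `autoBroadcastJoinThreshold`. The 32.2 MiB figure is the size of the in-memory hash table built for the broadcast.
    - The final plan contains no shuffle stage, which suggests the broadcast was already chosen at planning time from the size estimate rather than switched to at runtime.
    - Avoids shuffle, thus avoiding skew and spill entirely.
    - Executes much faster than the Sort-Merge Join (5.1–5.6 min).
- **Execution Time**: **33s**, the fastest of the 3 runs. The plan matches the manual broadcast join (1.1 min), so part of the gap likely comes from warm file caches and a warmed-up JVM rather than from AQE itself.
- **Why AQE is powerful**:
    - Inspects runtime statistics (e.g. the actual size of a join side after filters and shuffles).
    - Can dynamically switch to a **better join strategy** based on actual data (not estimates).
    - Helps **reduce skew**, optimize **partitioning**, and eliminate **unnecessary shuffles**.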
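
The outcome of the three experiments can be summarized with a deliberately simplified decision function. It is a hypothetical model for illustration, not Spark's planner: it ignores join-type restrictions, other hints, shuffled hash joins, and AQE's runtime re-planning. The 15 KiB estimate is the approximate Parquet size of `df_customers` reported above.

```python
def pick_join_strategy(right_size_estimate, broadcast_threshold, broadcast_hint=False):
    """Simplified model of the planner's choice for an inner equi-join."""
    if broadcast_hint:
        return "BroadcastHashJoin"      # an explicit hint wins
    if broadcast_threshold >= 0 and right_size_estimate <= broadcast_threshold:
        return "BroadcastHashJoin"      # small enough to broadcast automatically
    return "SortMergeJoin"              # fallback for equi-joins in this model

KiB, MiB = 1024, 1024 * 1024
customers_estimate = 15 * KiB           # approximate on-disk size of df_customers

exp1 = pick_join_strategy(customers_estimate, -1)                              # threshold disabled
exp2 = pick_join_strategy(customers_estimate, 10 * MiB, broadcast_hint=True)   # F.broadcast() hint
exp3 = pick_join_strategy(customers_estimate, 10 * MiB)                        # threshold only
print(exp1, exp2, exp3)
```

Running `df_joined.explain("formatted")` in each cell shows which strategy Spark actually chose.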


| Metric | Sort Merge Join | Manual Broadcast Join | AQE Join (Auto) |
| --- | --- | --- | --- |
| Join Type | SortMergeJoin | BroadcastHashJoin | BroadcastHashJoin |
| Shuffle Read | 2.4 GiB | 0 | 0 |
| Shuffle Write | 2.4 GiB | 0 | 0 |
| Broadcast Size | N/A | 32.2 MiB | 32.2 MiB |
| Execution Time | ~5.1–5.6 min | 1.1 min | **33s** |
| Spill (Memory / Disk) | 512 MiB / 140 MiB | 0 / 0 | 0 / 0 |
| Max Task Skew | 1.1 GiB read | None | None |
| Output Rows | 39,790,092 | 39,790,092 | 39,790,092 |
| AQE Enabled | ❌ | ❌ | ✅ |

In [7]:
spark.stop()