# Workshop: Introduction to Spark UI
## Learn to Monitor and Understand Your Spark Jobs

**Duration:** 1 hour  
**Level:** Complete beginner (no Spark knowledge required)

---

## What You'll Learn

By the end of this workshop, you will be able to:
1. Understand what Spark is doing when you run code
2. Find and read the Spark UI dashboard
3. Identify jobs, stages, and tasks
4. Spot performance issues in your Spark applications

---
# Part 1: Starting Spark

Before we can do anything, we need to start Spark. Run the cell below.

## Start spark session

In [1]:
# 1.1: Start Spark
from pyspark.sql import SparkSession

# Stop any existing Spark session
if "spark" in globals():
    globals()["spark"].stop()

# Create a new Spark session
spark = (
    SparkSession.builder
    .appName("Spark-UI-Workshop")
    .master("spark://127.0.0.1:7077")
    .config("spark.ui.port", "4040")
    .config("spark.ui.host", "0.0.0.0")
    .config("spark.driver.host", "127.0.0.1")
    .config("spark.driver.bindAddress", "0.0.0.0")
    .config("spark.executor.memory", "512m")
    .config("spark.executor.cores", "8")
    .config("spark.cores.max", "8")
    .config("spark.sql.adaptive.enabled", "false")
    .getOrCreate()
)

print("Spark is ready!")
print(f"Version: {spark.version}")
print("\n" + "="*50)
print("ACTION REQUIRED: Open the Spark UI")
print("="*50)
print("""
Open this URL in your browser:

  >>> http://localhost:4040 <<<

You should see the Spark UI with these tabs:
- Jobs      (we'll focus on this first)
- Stages  
- Storage
- Environment
- Executors
- SQL

Go to the JOBS tab now.
It should be EMPTY - no jobs yet!

Keep this browser tab open. We'll check it after each step.

NOTE: We disabled AQE (Adaptive Query Execution) for this workshop
to make the Spark UI easier to understand. One action = one job!
""")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/01/30 12:28:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Spark is ready!
Version: 3.5.0

ACTION REQUIRED: Open the Spark UI

Open this URL in your browser:

  >>> http://localhost:4040 <<<

You should see the Spark UI with these tabs:
- Jobs      (we'll focus on this first)
- Stages  
- Storage
- Environment
- Executors
- SQL

Go to the JOBS tab now.
It should be EMPTY - no jobs yet!

Keep this browser tab open. We'll check it after each step.

NOTE: We disabled AQE (Adaptive Query Execution) for this workshop
to make the Spark UI easier to understand. One action = one job!



## 1.2: Understanding Spark Architecture

Before we start coding, let's understand how Spark works.

---

### The Components

```
    YOUR CODE (this notebook)
           │
           ▼
    ┌─────────────────┐
    │     DRIVER      │  ← Your "control center"
    │                 │    - Runs your Python code
    │  SparkSession   │    - Plans how to execute queries
    │  SparkContext   │    - Coordinates the executors
    └────────┬────────┘    - Collects results back to you
             │
             │ sends tasks
             ▼
    ┌─────────────────┐
    │     MASTER      │  ← The "manager" (spark://127.0.0.1:7077)
    │                 │    - Knows which workers are available
    │  Cluster Mgr    │    - Assigns resources to applications
    └────────┬────────┘
             │
             │ manages
             ▼
    ┌─────────────────────────────────────────────────────┐
    │                    EXECUTORS                        │
    │                                                     │
    │  ┌───────────┐  ┌───────────┐  ┌───────────┐        │
    │  │ Executor 1│  │ Executor 2│  │ Executor 3│  ...   │
    │  │           │  │           │  │           │        │
    │  │  [Task]   │  │  [Task]   │  │  [Task]   │        │
    │  │  [Task]   │  │  [Task]   │  │  [Task]   │        │
    │  └───────────┘  └───────────┘  └───────────┘        │
    │                                                     │
    │  ← The "workers" that actually process your data    │
    │    - Each executor runs on a machine                │
    │    - Has its own memory and CPU cores               │
    │    - Executes tasks in parallel                     │
    └─────────────────────────────────────────────────────┘
```

**In OUR setup (Spark Standalone on Docker):**
- Master: `spark://127.0.0.1:7077`
- We have 1 executor with 8 cores and 512MB memory
- Driver: This notebook (running locally)

---

### Jobs, Stages, and Tasks

When you run code, Spark breaks it down into smaller pieces:

```
┌─────────────────────────────────────────────────────────────────┐
│                           JOB                                   │
│                                                                 │
│   Created when you call an ACTION (show, count, collect, etc)   │
│                                                                 │
│   ┌─────────────────────────────────────────────────────────┐   │
│   │                      STAGE 1                            │   │
│   │                                                         │   │
│   │   A group of operations that can run WITHOUT shuffling  │   │
│   │                                                         │   │
│   │   ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐          │   │
│   │   │Task 1│ │Task 2│ │Task 3│ │Task 4│ │ ...  │          │   │
│   │   └──────┘ └──────┘ └──────┘ └──────┘ └──────┘          │   │
│   │                                                         │   │
│   │   Tasks run IN PARALLEL on different data partitions    │   │
│   └─────────────────────────────────────────────────────────┘   │
│                           │                                     │
│                    SHUFFLE (data exchange)                      │
│                           │                                     │
│                           ▼                                     │
│   ┌─────────────────────────────────────────────────────────┐   │
│   │                      STAGE 2                            │   │
│   │                                                         │   │
│   │   ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐          │   │
│   │   │Task 1│ │Task 2│ │Task 3│ │Task 4│ │ ...  │          │   │
│   │   └──────┘ └──────┘ └──────┘ └──────┘ └──────┘          │   │
│   └─────────────────────────────────────────────────────────┘   │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
```

**Summary:**
- **JOB** = The whole computation (triggered by an action)
- **STAGE** = A piece of work that doesn't need data shuffling
- **TASK** = The smallest unit of work (1 task = 1 partition)
- **SHUFFLE** = Moving data between executors (expensive!)

---

### What Causes a New Stage?

| Operation | Why it needs a shuffle |
|-----------|------------------------|
| `groupBy()` | Needs to group all rows with same key together |
| `join()` | Needs to bring matching rows together |
| `orderBy()` | Needs to sort across all data |
| `repartition()` | Redistributes data across partitions |

**Example:**
```python
df.filter(...).select(...).groupBy(...).count()
```

```
STAGE 1                          STAGE 2
┌─────────────────────────┐      ┌─────────────────────────┐
│ filter                  │      │ groupBy (final combine) │
│   ↓                     │      │   ↓                     │
│ select                  │      │ count                   │
│   ↓                     │      │                         │
│ groupBy (partial agg)   │      │                         │
│   ↓                     │      │                         │
│ write shuffle files     │ ───► │ read shuffle files      │
└─────────────────────────┘      └─────────────────────────┘
                          SHUFFLE
                       (data moves!)
```

The `groupBy` is split across stages:
1. **Stage 1**: Does partial aggregation + writes shuffle files
2. **Shuffle**: Data physically moves between executors
3. **Stage 2**: Reads shuffled data + final aggregation

---

### Quick Reference

| Concept | What is it? | How many? |
|---------|-------------|-----------|
| **JOB** | Triggered by an action | 1 per action |
| **STAGE** | Separated by shuffles | 1+ per job |
| **TASK** | Processes one partition | 1 per partition |
| **PARTITION** | A chunk of your data | Configurable |
| **EXECUTOR** | A worker process | Depends on cluster |

**We'll see ALL of this in the Spark UI as we go through the workshop!**

---
# Part 2: Spark is LAZY

This is the **most important concept** to understand about Spark.

Let's see it in action.

## 2.1: Create a DataFrame

In [2]:
# 2.1: Create a DataFrame
#
# spark.range(10) creates numbers from 0 to 9
# Think of it like a list: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

df = spark.range(10)

print("DataFrame created!")
print("\n" + "="*50)
print("NOW CHECK THE SPARK UI - JOBS TAB")
print("="*50)
print("""
Go to http://localhost:4040 and look at the Jobs tab.

Do you see any jobs?

...

NO! The Jobs tab is still EMPTY!

WHY? Because Spark is LAZY.

We told Spark to create a DataFrame, but Spark said:
"OK, I'll remember that. But I won't actually do anything
 until you ask me for results."

This is called LAZY EVALUATION.
""")

DataFrame created!

NOW CHECK THE SPARK UI - JOBS TAB

Go to http://localhost:4040 and look at the Jobs tab.

Do you see any jobs?

...

NO! The Jobs tab is still EMPTY!

WHY? Because Spark is LAZY.

We told Spark to create a DataFrame, but Spark said:
"OK, I'll remember that. But I won't actually do anything
 until you ask me for results."

This is called LAZY EVALUATION.



### Key Concept: Lazy Evaluation

Spark divides operations into two types:

| Type | Examples | Creates a Job? |
|------|----------|----------------|
| **Transformations** | filter, select, groupBy, join | NO - Spark just remembers them |
| **Actions** | show, count, collect, write | YES - Spark does the work! |

Think of it like cooking:
- **Transformations** = Writing down the recipe steps
- **Actions** = Actually cooking and serving the dish

Let's trigger an **action** to make Spark work!

## 2.2: Trigger an action with .show()

In [3]:
# 2.2: Trigger an action with .show()
#
# .show() is an ACTION - it tells Spark "give me results NOW!"

df.show()

print("\n" + "="*50)
print("NOW CHECK THE SPARK UI - JOBS TAB")
print("="*50)
print("""
Go back to http://localhost:4040 (Jobs tab)

NOW you should see a job!

Look for:
- Description: "showString at NativeMethodAccessorImpl.java:0"
- A BLUE bar (meaning: completed successfully)
- Stages: 2/2
- Tasks: shows how many parallel work units ran

CONGRATULATIONS! You just ran your first Spark job!

KEY INSIGHT:
The job only appeared when we called .show() (an action).
Creating the DataFrame (2.1) did NOT create a job.
""")

[Stage 0:>                                                          (0 + 8) / 8]

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
|  5|
|  6|
|  7|
|  8|
|  9|
+---+


NOW CHECK THE SPARK UI - JOBS TAB

Go back to http://localhost:4040 (Jobs tab)

NOW you should see a job!

Look for:
- Description: "showString at NativeMethodAccessorImpl.java:0"
- A BLUE bar (meaning: completed successfully)
- Stages: 2/2
- Tasks: shows how many parallel work units ran

CONGRATULATIONS! You just ran your first Spark job!

KEY INSIGHT:
The job only appeared when we called .show() (an action).
Creating the DataFrame (2.1) did NOT create a job.



                                                                                

## 2.3: Add job description

In [4]:
# 2.3: Add job description
#
# Let's give this job a name so we can find it in easily Spark UI

spark.sparkContext.setJobDescription("2.3: My first df.show()")
df.show()

print("\n" + "="*50)
print("NOW CHECK THE SPARK UI - JOBS TAB")
print("="*50)
print("""
Look for:
- Description: "2.3: My first df.show()"

KEY INSIGHT:
Adding context make it easier to understand 
what is happening when jobs start to scale
""")

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
|  5|
|  6|
|  7|
|  8|
|  9|
+---+


NOW CHECK THE SPARK UI - JOBS TAB

Look for:
- Description: "2.3: My first df.show()"

KEY INSIGHT:
Adding context make it easier to understand 
what is happening when jobs start to scale



---
# Part 3: Actions Create Jobs

Every time you call an **action**, Spark creates a new **job**.

Let's run more actions and watch the Jobs tab grow.

## 3.1: Another action - count()

In [6]:
# 3.1: Another action - count()

spark.sparkContext.setJobDescription("3.1: Count the rows")
total = df.count()

print(f"Total rows: {total}")

print("\n" + "="*50)
print("CHECK THE SPARK UI - JOBS TAB")
print("="*50)
print("""
You should see MORE jobs now.

KEY POINT: Each ACTION creates job(s) in Spark UI.
""")

[Stage 4:>                                                          (0 + 1) / 1]

Total rows: 10

CHECK THE SPARK UI - JOBS TAB

You should see MORE jobs now.

KEY POINT: Each ACTION creates job(s) in Spark UI.



                                                                                

## 3.2: Another action - first()

In [7]:
# 3.2: Another action - first()

spark.sparkContext.setJobDescription("3.2: Get first row")

first_row = df.first()
print(f"First row: {first_row}")

print("\n" + "="*50)
print("CHECK THE SPARK UI - JOBS TAB")
print("="*50)
print("""
Even more jobs now!

SUMMARY SO FAR:
- 2.1: Created DataFrame → NO job (lazy!)
- 2.2: Called .show()    → Job appeared!
- 3.1: Called .count()   → More job(s)!
- 3.2: Called .first()   → More job(s)!

Pattern: ACTIONS create JOBS. Transformations don't.
""")

First row: Row(id=0)

CHECK THE SPARK UI - JOBS TAB

Even more jobs now!

SUMMARY SO FAR:
- 2.1: Created DataFrame → NO job (lazy!)
- 2.2: Called .show()    → Job appeared!
- 3.1: Called .count()   → More job(s)!
- 3.2: Called .first()   → More job(s)!

Pattern: ACTIONS create JOBS. Transformations don't.



## 3.3: Understanding Adaptive Query Execution (AQE)

In [None]:
# 3.3: (OPTIONAL) Understanding Adaptive Query Execution (AQE)
#
# You might notice that some actions create MULTIPLE jobs.
# This is often caused by AQE (Adaptive Query Execution).

from pyspark.sql.functions import col, count

print("="*60)
print("WHAT IS ADAPTIVE QUERY EXECUTION (AQE)?")
print("="*60)
print("""
AQE is enabled by default in Spark 3.0+. It optimizes queries
at RUNTIME by gathering statistics as the query runs.

In this workshop, we DISABLED AQE to keep things predictable:
  spark.sql.adaptive.enabled = false

Let's see what happens when AQE is ON vs OFF:
""")

# Create test data (10 million rows, 10,000 categories)
df = spark.range(10_000_000).withColumn("category", (col("id") % 10_000))

# --- AQE ON ---
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.sparkContext.setJobDescription("3.3b: GroupBy with AQE ON")
df.groupBy("category").agg(count("*")).collect()
print("Run 2 (AQE ON):  Done! Check Spark UI.")

# --- AQE OFF ---
spark.conf.set("spark.sql.adaptive.enabled", "false")
spark.sparkContext.setJobDescription("3.3a: GroupBy with AQE OFF")
df.groupBy("category").agg(count("*")).collect()
print("Run 1 (AQE OFF): Done! Check Spark UI.")


# --- Reset ---
spark.conf.set("spark.sql.adaptive.enabled", "false")

print("\n" + "="*60)
print("CHECK THE SPARK UI - COMPARE THE TWO RUNS")
print("="*60)
print("""
STEP 1: Compare task counts in the JOBS TAB
===========================================
Look at the "Tasks" column for each job:

  "3.3a: GroupBy with AQE OFF" → ~200 tasks
  "3.3b: GroupBy with AQE ON"  → fewer tasks!

This is the partition coalescing in action!


STEP 2: See the details in the SQL TAB
======================================
1. Go to the SQL tab
2. Find the two queries (most recent at top)
3. Click on each query to see the execution plan

For "AQE OFF" query:
  - Look for "Exchange" node (the shuffle)
  - It will show the original partition count

For "AQE ON" query:
  - Look for "AQEShuffleRead" node
  - This replaces the regular Exchange
  - Click on it to see: "number of partitions: X"
  - X will be much smaller than 200!


STEP 3: Compare stage details in STAGES TAB
===========================================
1. Go to the Stages tab
2. Find the shuffle stages (Stage with "Shuffle Read")
3. Compare "Tasks" column:
   - AQE OFF stage: ~200 tasks
   - AQE ON stage: fewer tasks


WHY IS AQE FASTER?
==================
Without AQE:
  - Spark uses 200 shuffle partitions (the default)
  - 200 small tasks = lots of scheduling overhead

With AQE:
  - After Stage 1, Spark measures actual data size
  - Sees data is small → coalesces 200 → few partitions
  - Fewer tasks = less overhead = faster!


WHAT ELSE DOES AQE DO?
======================
1. COALESCES PARTITIONS - Combines small partitions (seen above!)
2. OPTIMIZES JOINS - Switches to broadcast join if data is small
3. HANDLES SKEW - Splits large partitions to balance work


WHY WE DISABLED IT FOR THIS WORKSHOP:
=====================================
AQE makes Spark faster, but less predictable for learning:
  - One action might create multiple jobs
  - "Skipped" stages appear unexpectedly

FOR PRODUCTION: Leave AQE ON (default)
FOR LEARNING: Disable it to see exactly what Spark is doing
""")

WHAT IS ADAPTIVE QUERY EXECUTION (AQE)?

AQE is enabled by default in Spark 3.0+. It optimizes queries
at RUNTIME by gathering statistics as the query runs.

In this workshop, we DISABLED AQE to keep things predictable:
  spark.sql.adaptive.enabled = false

Let's see what happens when AQE is ON vs OFF:



                                                                                

Run 2 (AQE ON):  Done! Check Spark UI.


                                                                                

Run 1 (AQE OFF): Done! Check Spark UI.

CHECK THE SPARK UI - COMPARE THE TWO RUNS

STEP 1: Compare task counts in the JOBS TAB
Look at the "Tasks" column for each job:

  "3.3a: GroupBy with AQE OFF" → ~200 tasks
  "3.3b: GroupBy with AQE ON"  → fewer tasks!

This is the partition coalescing in action!


STEP 2: See the details in the SQL TAB
1. Go to the SQL tab
2. Find the two queries (most recent at top)
3. Click on each query to see the execution plan

For "AQE OFF" query:
  - Look for "Exchange" node (the shuffle)
  - It will show the original partition count

For "AQE ON" query:
  - Look for "AQEShuffleRead" node
  - This replaces the regular Exchange
  - Click on it to see: "number of partitions: X"
  - X will be much smaller than 200!


STEP 3: Compare stage details in STAGES TAB
1. Go to the Stages tab
2. Find the shuffle stages (Stage with "Shuffle Read")
3. Compare "Tasks" column:
   - AQE OFF stage: ~200 tasks
   - AQE ON stage: fewer tasks


WHY IS AQE FASTER?
Without AQE:

## 3.4: Understanding SQL Plans, Stages, and Shuffles

In [None]:
# 3.4: Understanding SQL Plans, Stages, and Shuffles
#
# Let's learn how to read Spark's execution plan!

print("="*60)
print("HOW TO READ A SPARK SQL PLAN")
print("="*60)
print("""
Go to the SQL Tab and click on the last query (3.3a or 3.3b).
You'll see a Physical Plan like this:

  == Physical Plan ==
  * HashAggregate (5)          ← Step 5: Final count
  +- Exchange (4)              ← Step 4: SHUFFLE!
     +- * HashAggregate (3)    ← Step 3: Partial count
        +- * Project (2)       ← Step 2: Calculate category
           +- * Range (1)      ← Step 1: Generate numbers

READ IT BOTTOM TO TOP! Data flows upward:
  Step 1 → 2 → 3 → 4 → 5 → Result


PLAN OPERATORS GLOSSARY
=======================
What do these operator names mean?

  | Operator           | Spark Code              | Meaning                    |
  |--------------------|-------------------------|----------------------------|
  | Range              | spark.range()           | Generate numbers           |
  | Scan / TableScan   | spark.read / table      | Read from file/table       |
  | Project            | select() / withColumn() | Select or compute columns  |
  | Filter             | filter() / where()      | Filter rows                |
  | HashAggregate      | groupBy().agg()         | Group and aggregate        |
  | Exchange           | (automatic)             | SHUFFLE! Stage boundary    |
  | Sort               | orderBy()               | Sort data                  |
  | BroadcastHashJoin  | join() (small table)    | Join with broadcast        |
  | SortMergeJoin      | join() (large tables)   | Join with shuffle          |
  | Union              | union()                 | Combine DataFrames         |

"Project" = "What columns do I want?"
  - select("a", "b")           → Project [a, b]
  - withColumn("new", expr)    → Project [*, new]
  - drop("col")                → Project [all except col]


HOW STAGES ARE CREATED
======================
Look for [codegen id : X] in the detailed plan:

  (1) Range [codegen id : 1]        ─┐
  (2) Project [codegen id : 1]       │ Same codegen = STAGE 4
  (3) HashAggregate [codegen id : 1]─┘
                                     
  (4) Exchange                       ← SHUFFLE = STAGE BOUNDARY!
                                     
  (5) HashAggregate [codegen id : 2] ← New codegen = STAGE 5

RULE: Everything between shuffles is ONE stage.
      Exchange (shuffle) creates a new stage.


WHAT IS A SHUFFLE?
==================
A shuffle is when Spark MOVES DATA between executors.

Example: groupBy("category")
  - Data for category=1 might be on Executor A, B, and C
  - To count category=1, all that data must go to ONE place
  - This "moving data around" is the SHUFFLE

         BEFORE SHUFFLE              AFTER SHUFFLE
    ┌─────────────────────┐     ┌─────────────────────┐
    │ Executor A          │     │ Executor A          │
    │ cat=1, cat=2, cat=2 │     │ cat=1, cat=1, cat=1 │
    ├─────────────────────┤ --> ├─────────────────────┤
    │ Executor B          │     │ Executor B          │
    │ cat=3, cat=1, cat=3 │     │ cat=2, cat=2, cat=2 │
    ├─────────────────────┤     ├─────────────────────┤
    │ Executor C          │     │ Executor C          │
    │ cat=1, cat=2, cat=3 │     │ cat=3, cat=3, cat=3 │
    └─────────────────────┘     └─────────────────────┘


ARE SHUFFLES GOOD OR BAD?
=========================
Shuffles are NECESSARY but EXPENSIVE.

NECESSARY because:
  - groupBy needs all rows with same key together
  - join needs matching rows from both tables together
  - orderBy needs to compare all data
  
  Without shuffles, distributed computing wouldn't work!

EXPENSIVE because:
  - Data moves across the NETWORK (slow!)
  - Data is written to DISK (I/O)
  - Creates a stage boundary (synchronization)

BEST PRACTICE:
  ✓ Accept shuffles when needed (groupBy, join, orderBy)
  ✗ Avoid UNNECESSARY shuffles
  ✗ Don't shuffle more data than needed (filter early!)


OPERATIONS THAT CAUSE SHUFFLES
==============================
  - groupBy()     → Groups need same-key data together
  - join()        → Matching rows must meet
  - orderBy()     → Global sorting needs all data
  - repartition() → Explicitly redistributes data
  - distinct()    → Needs to compare all values


TIPS TO MINIMIZE SHUFFLE COST
=============================
1. FILTER EARLY - Remove rows before the shuffle
2. SELECT ONLY NEEDED COLUMNS - Less data to move
3. USE BROADCAST JOINS - For small tables (no shuffle!)
4. CACHE IF REUSING - Avoid recomputing before shuffle

We'll explore shuffles more in Part 6!
""")

HOW TO READ A SPARK SQL PLAN

Go to the SQL Tab and click on the last query (3.3a or 3.3b).
You'll see a Physical Plan like this:

  == Physical Plan ==
  * HashAggregate (5)          ← Step 5: Final count
  +- Exchange (4)              ← Step 4: SHUFFLE!
     +- * HashAggregate (3)    ← Step 3: Partial count
        +- * Project (2)       ← Step 2: Calculate category
           +- * Range (1)      ← Step 1: Generate numbers

READ IT BOTTOM TO TOP! Data flows upward:
  Step 1 → 2 → 3 → 4 → 5 → Result


PLAN OPERATORS GLOSSARY
What do these operator names mean?

  | Operator           | Spark Code              | Meaning                    |
  |--------------------|-------------------------|----------------------------|
  | Range              | spark.range()           | Generate numbers           |
  | Scan / TableScan   | spark.read / table      | Read from file/table       |
  | Project            | select() / withColumn() | Select or compute columns  |
  | Filter             | filter(

---
# Part 4: Transformations are Lazy

Now let's add **transformations** (filter, add columns, etc.)

Remember: transformations are LAZY - they don't create jobs!

## 4.1: Add transformations (NO jobs will be created!)

In [9]:
# 4.1: Add transformations (NO jobs will be created!)
from pyspark.sql.functions import col

# Start with numbers 0-99
df = spark.range(100)

# Add transformations:
# 1. Filter: keep only numbers > 50
# 2. Add a new column: the number multiplied by 2
df_transformed = df \
    .filter(col("id") > 50) \
    .withColumn("doubled", col("id") * 2)

print("Transformations added: filter + withColumn")
print("\n" + "="*50)
print("CHECK THE SPARK UI - JOBS TAB")
print("="*50)
print("""
Did any NEW jobs appear?

NO! The job count is the same as before.

Spark is waiting. It knows we want to:
1. Create numbers 0-99
2. Filter to keep > 50
3. Add a 'doubled' column

But it won't DO any of this until we call an action.
""")

Transformations added: filter + withColumn

CHECK THE SPARK UI - JOBS TAB

Did any NEW jobs appear?

NO! The job count is the same as before.

Spark is waiting. It knows we want to:
1. Create numbers 0-99
2. Filter to keep > 50
3. Add a 'doubled' column

But it won't DO any of this until we call an action.



## 4.2: NOW trigger the action

In [13]:
# 4.2: NOW trigger the action

spark.sparkContext.setJobDescription("4.2: Count transformed data")

# Using count() instead of show() to guarantee exactly 1 job
# (show() can create multiple jobs due to internal optimizations)
total = df_transformed.count()
print(f"Counted {total} rows")

# Let's also see the data (this is a SEPARATE action!)
spark.sparkContext.setJobDescription("4.2: Show the data")
print("\nFirst 5 rows:")
df_transformed.show(5)

print("\n" + "="*50)
print("CHECK THE SPARK UI - JOBS TAB")
print("="*50)
print("""
Look for job: "4.2: Count transformed data"

This ONE job did ALL the work:
  1. Generated numbers 0-99
  2. Filtered to keep only > 50  
  3. Added the 'doubled' column
  4. Counted the rows

All the transformations were combined into ONE job!

You'll also see "4.2: Show the data" - that's a SEPARATE action.
Each action triggers its own job(s). We ran two actions:
  - count() → 1 job
  - show()  → 1-2 jobs (internal optimization)

KEY INSIGHT:
Spark waits until an action, then runs ALL transformations together.
This allows Spark to optimize the entire pipeline.
""")

Counted 49 rows

First 5 rows:
+---+-------+
| id|doubled|
+---+-------+
| 51|    102|
| 52|    104|
| 53|    106|
| 54|    108|
| 55|    110|
+---+-------+
only showing top 5 rows


CHECK THE SPARK UI - JOBS TAB

Look for job: "4.2: Count transformed data"

This ONE job did ALL the work:
  1. Generated numbers 0-99
  2. Filtered to keep only > 50  
  3. Added the 'doubled' column
  4. Counted the rows

All the transformations were combined into ONE job!

You'll also see "4.2: Show the data" - that's a SEPARATE action.
Each action triggers its own job(s). We ran two actions:
  - count() → 1 job
  - show()  → 1-2 jobs (internal optimization)

KEY INSIGHT:
Spark waits until an action, then runs ALL transformations together.
This allows Spark to optimize the entire pipeline.



---
# Part 5: Looking Inside Jobs (Stages & Tasks)

A **Job** is made of **Stages**, and each Stage has **Tasks**.

```
Job
 └── Stage 1
      └── Task 1, Task 2, Task 3...
 └── Stage 2  
      └── Task 1, Task 2, Task 3...
```

**Tasks** are the smallest unit of work. They run IN PARALLEL.

## 5.1: Compare show() vs count() vs take()

In [None]:
# 5.1: Compare show() vs count() vs take()

df = spark.range(10_000)

# First: show() - only needs a few rows
spark.sparkContext.setJobDescription("5.1a: show(5) - reads 6 rows (?)")
df.show(5)

print("\n" + "-"*50 + "\n")

# Second: count() - needs ALL rows
spark.sparkContext.setJobDescription("5.1b: count() - needs ALL rows")
total = df.count()
print(f"Count: {total}")

print("\n" + "-"*50 + "\n")

# Third: take() - exactly n rows
spark.sparkContext.setJobDescription("5.1c: take(5) - exactly 5 rows")
rows = df.take(5)
print(f"Take: {rows}")

print("\n" + "="*50)
print("COMPARE THESE THREE JOBS IN SPARK UI")
print("="*50)
print("""
Find all three jobs and compare them:

"5.1a: show(5) - needs only 5 rows, but reads 6":
  - Click on the job, then click on the Stage
  - Look at Input Records: shows 6 records!
  - WHY? Spark fetches n+1 to check if there's more data

"5.1b: count() - needs ALL rows":
  - Look at Tasks: MULTIPLE tasks!
  - Look at Input Records: 10,000 (all of them!)
  - WHY? .count() must scan ALL the data

"5.1c: take(5) - exactly 5 rows":
  - Look at Input Records: exactly 5!
  - WHY? .take(n) fetches exactly n rows

KEY INSIGHT:
Different actions have different costs!


WHY DOES show(5) PROCESS 6 RECORDS?
===================================
Spark fetches n+1 rows to check if there's MORE data.
This helps it decide whether to show:
  "only showing top 5 rows"

  | Action    | Fetches  | Displays | Why?                    |
  |-----------|----------|----------|-------------------------|
  | show(5)   | 6 rows   | 5 rows   | Checks if more exist    |
  | take(5)   | 5 rows   | 5 rows   | Exactly what you asked  |
  | count()   | ALL rows | 1 number | Must scan everything    |
""")

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
+---+
only showing top 5 rows


--------------------------------------------------

Count: 10000

--------------------------------------------------

Take: [Row(id=0), Row(id=1), Row(id=2), Row(id=3), Row(id=4)]

COMPARE THESE THREE JOBS IN SPARK UI

Find all three jobs and compare them:

"5.1a: show(5) - needs only 5 rows, but reads 6":
  - Click on the job, then click on the Stage
  - Look at Input Records: shows 6 records!
  - WHY? Spark fetches n+1 to check if there's more data

"5.1b: count() - needs ALL rows":
  - Look at Tasks: MULTIPLE tasks!
  - Look at Input Records: 10,000 (all of them!)
  - WHY? .count() must scan ALL the data

"5.1c: take(5) - exactly 5 rows":
  - Look at Input Records: exactly 5!
  - WHY? .take(n) fetches exactly n rows

KEY INSIGHT:
Different actions have different costs!


WHY DOES show(5) PROCESS 6 RECORDS?
Spark fetches n+1 rows to check if there's MORE data.
This helps it decide whether to show:
  "only

## 5.2: Explore a Stage's details

In [42]:
# 5.2: Explore a Stage's details

# df = spark.range(50000)
df = spark.range(50000, numPartitions=16)

spark.sparkContext.setJobDescription("5.2: Count 50,000 numbers")
total = df.count()
print(f"Total: {total}")

print("\n" + "="*50)
print("EXPLORE THE STAGE DETAILS")
print("="*50)
print("""
1. Go to Jobs tab, find "5.2: Count 50,000 numbers"

2. Click on the job to see its details

3. Click on the Stage number to see stage details

4. Look for:
   - Summary Metrics: min/median/max task duration
   - Event Timeline: visual of when tasks ran
   - Tasks table: one row per task

5. In the Event Timeline, look at the colors:
   - GREEN = Executor Computing Time (actual work)
   - Other colors = overhead (scheduling, serialization, etc.)
   - Ideally, most of the bar should be GREEN!


UNDERSTANDING TASK COUNT
========================
The number of tasks depends on your PARALLELISM:
  - More cores = more tasks running in parallel
  - Each task processes a portion of the data

Example with 50,000 rows:
  | Cores | Tasks | Records per Task |
  |-------|-------|------------------|
  | 2     | 2     | 25,000           |
  | 4     | 4     | 12,500           |
  | 8     | 8     | 6,250            |

Check "Input Records" in the Summary Metrics:
  Total records = Tasks × Records per Task
  
In the Executor table at the bottom, you can see:
  - Which executor ran the tasks
  - Total Input Records processed
""")

Total: 50000

EXPLORE THE STAGE DETAILS

1. Go to Jobs tab, find "5.2: Count 50,000 numbers"

2. Click on the job to see its details

3. Click on the Stage number to see stage details

4. Look for:
   - Summary Metrics: min/median/max task duration
   - Event Timeline: visual of when tasks ran
   - Tasks table: one row per task

5. In the Event Timeline, look at the colors:
   - GREEN = Executor Computing Time (actual work)
   - Other colors = overhead (scheduling, serialization, etc.)
   - Ideally, most of the bar should be GREEN!


UNDERSTANDING TASK COUNT
The number of tasks depends on your PARALLELISM:
  - More cores = more tasks running in parallel
  - Each task processes a portion of the data

Example with 50,000 rows:
  | Cores | Tasks | Records per Task |
  |-------|-------|------------------|
  | 2     | 2     | 25,000           |
  | 4     | 4     | 12,500           |
  | 8     | 8     | 6,250            |

Check "Input Records" in the Summary Metrics:
  Total records = Tasks

## 5.3: Number of cores vs input records

### 5.3.1: 4 cores

In [37]:
from pyspark.sql import SparkSession

# Stop any existing Spark session
if "spark" in globals():
    globals()["spark"].stop()

# Create a new Spark session
spark = (
    SparkSession.builder
    .appName("Spark-UI-Workshop")
    .master("spark://127.0.0.1:7077")
    .config("spark.ui.port", "4040")
    .config("spark.ui.host", "0.0.0.0")
    .config("spark.driver.host", "127.0.0.1")
    .config("spark.driver.bindAddress", "0.0.0.0")
    .config("spark.executor.memory", "512m")
    .config("spark.executor.cores", "4")
    .config("spark.cores.max", "4")
    .config("spark.sql.adaptive.enabled", "false")
    .getOrCreate()
)

df = spark.range(50000)

spark.sparkContext.setJobDescription("5.3.1: Count 50,000 numbers - 4 cores")
total = df.count()
print(f"Total: {total}")

[Stage 0:>                                                          (0 + 2) / 2]

Total: 50000


                                                                                

### 5.3.1: 2 cores

In [None]:
from pyspark.sql import SparkSession

# Stop any existing Spark session
if "spark" in globals():
    globals()["spark"].stop()

# Create a new Spark session
spark = (
    SparkSession.builder
    .appName("Spark-UI-Workshop")
    .master("spark://127.0.0.1:7077")
    .config("spark.ui.port", "4040")
    .config("spark.ui.host", "0.0.0.0")
    .config("spark.driver.host", "127.0.0.1")
    .config("spark.driver.bindAddress", "0.0.0.0")
    .config("spark.executor.memory", "512m")
    .config("spark.executor.cores", "2")
    .config("spark.cores.max", "2")
    .config("spark.sql.adaptive.enabled", "false")
    .getOrCreate()
)

df = spark.range(50000)

spark.sparkContext.setJobDescription("5.3.2: Count 50,000 numbers - 2 cores")
total = df.count()
print(f"Total: {total}")

### 5.3.1: 16 cores

In [None]:
from pyspark.sql import SparkSession

# Stop any existing Spark session
if "spark" in globals():
    globals()["spark"].stop()

# Create a new Spark session
spark = (
    SparkSession.builder
    .appName("Spark-UI-Workshop")
    .master("spark://127.0.0.1:7077")
    .config("spark.ui.port", "4040")
    .config("spark.ui.host", "0.0.0.0")
    .config("spark.driver.host", "127.0.0.1")
    .config("spark.driver.bindAddress", "0.0.0.0")
    .config("spark.executor.memory", "512m")
    .config("spark.executor.cores", "16")
    .config("spark.cores.max", "16")
    .config("spark.sql.adaptive.enabled", "false")
    .getOrCreate()
)

df = spark.range(50000)

spark.sparkContext.setJobDescription("5.3.1: Count 50,000 numbers - 16 cores")
total = df.count()
print(f"Total: {total}")

---
# Part 6: Shuffles Create New Stages

So far, our jobs had only **one stage**.

But some operations need to **move data around**. This is called a **shuffle**.

Shuffles create **new stages**.

Operations that cause shuffles:
- `groupBy()` - grouping by a key
- `orderBy()` - sorting all data  
- `join()` - combining two tables

## 6.1: Setup data for groupBy

In [None]:
# 6.1: Setup data for groupBy
from pyspark.sql.functions import col

# Create numbers 0-999, each with a category (0, 1, 2, 3, or 4)
df = spark.range(1000) \
    .withColumn("category", (col("id") % 5))

print("Sample data:")
spark.sparkContext.setJobDescription("6.1: Show sample data")
df.show(20)

print("\nWe have 1000 numbers, each assigned to category 0-4.")
print("Now let's COUNT how many numbers are in each category...")
print("This requires groupBy() - which causes a SHUFFLE!")

Sample data:
+---+--------+
| id|category|
+---+--------+
|  0|       0|
|  1|       1|
|  2|       2|
|  3|       3|
|  4|       4|
|  5|       0|
|  6|       1|
|  7|       2|
|  8|       3|
|  9|       4|
| 10|       0|
| 11|       1|
| 12|       2|
| 13|       3|
| 14|       4|
| 15|       0|
| 16|       1|
| 17|       2|
| 18|       3|
| 19|       4|
+---+--------+
only showing top 20 rows


We have 1000 numbers, each assigned to category 0-4.
Now let's COUNT how many numbers are in each category...
This requires groupBy() - which causes a SHUFFLE!


## 6.2: Run groupBy (causes a shuffle!)

In [19]:
# 6.2: Run groupBy (causes a shuffle!)
from pyspark.sql.functions import count

spark.sparkContext.setJobDescription("6.2: GroupBy - causes SHUFFLE (5 categories / 5 partitions)")
spark.conf.set("spark.sql.shuffle.partitions", "5")
df = spark.range(1000) \
    .withColumn("category", (col("id") % 5))
result = df.groupBy("category").agg(count("*").alias("total"))
result.collect()

# spark.sparkContext.setJobDescription("6.2 (show): GroupBy results")
# result.show()

print("\n" + "="*50)
print("CHECK THE SPARK UI - STAGES TAB")
print("="*50)
print("""
Go to the STAGES tab and find the two stages for "6.2: GroupBy":

STAGE 1 (e.g., Stage 37): BEFORE the shuffle
=============================================
  - Tasks: 8 (one per core/partition)
  - Input Records: 1000 (your original data)
  - Shuffle Write: ~2.4 KiB / 40 records
  
  WHAT HAPPENED:
  - Each task read ~125 rows (1000 ÷ 8 tasks)
  - Did PARTIAL count per category
  - Wrote results to shuffle files (5 categories × 8 tasks = 40 records)


STAGE 2 (e.g., Stage 38): AFTER the shuffle
=============================================
  - Tasks: 200 (default spark.sql.shuffle.partitions!)
  - Shuffle Read: ~2.4 KiB / 40 records
  - Most tasks read 0 records (empty partitions)
  
  WHAT HAPPENED:
  - 200 partitions were created (Spark default)
  - But we only have 5 categories!
  - So only ~5 partitions have data, 195 are empty
  - Each partition does FINAL count for its categories


WHY 200 TASKS IF MOST PARTITIONS ARE EMPTY?
===========================================
Good question! Without AQE, Spark uses a STATIC plan:

  1. BEFORE execution: Spark decides "I'll use 200 shuffle partitions"
  2. DURING shuffle write: Data is hashed to partitions (only 5 get data)
  3. DURING shuffle read: Spark launches ALL 200 tasks anyway
  4. Each empty task: Quickly checks "no data" and completes (~3ms)

  SHUFFLE WRITE (Stage 1):
    Hash function decides which partition: hash(category) % 200
    
    cat=0 → partition 47  (has data)
    cat=1 → partition 122 (has data)
    cat=2 → partition 88  (has data)
    ...
    partition 0, 1, 2... → EMPTY (no categories hashed here)

  SHUFFLE READ (Stage 2):
    Spark launches 200 tasks (one per partition)
    195 tasks: "No data for me!" → finish in ~3ms
    5 tasks: "I have data!" → do the actual work

Check the Stage 2 metrics - you'll see:
  Duration: Min ~3ms (empty), Max ~100ms (has data)

This is wasteful! That's why AQE helps - it detects empty
partitions AFTER shuffle write and skips them in Stage 2.


HOW SHUFFLE WORKS (DIAGRAM)
===========================

Examp`le: groupBy("category") with categories A, B, C`

  STAGE 1: Each partition has mixed data
  ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
  │ Partition 1 │ │ Partition 2 │ │ Partition 3 │
  │ A=10, B=5   │ │ A=8, C=12   │ │ B=7, C=3    │
  │ C=2         │ │ B=4         │ │ A=6         │
  └─────────────┘ └─────────────┘ └─────────────┘
         │               │               │
         └───────────────┼───────────────┘
                         │
                    S H U F F L E
                  (data reorganized by key)
                         │
         ┌───────────────┼───────────────┐
         │               │               │
         ▼               ▼               ▼
  ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
  │ Partition A │ │ Partition B │ │ Partition C │
  │ A=10        │ │ B=5         │ │ C=2         │
  │ A=8         │ │ B=4         │ │ C=12        │
  │ A=6         │ │ B=7         │ │ C=3         │
  │ ─────────── │ │ ─────────── │ │ ─────────── │
  │ Total: 24   │ │ Total: 16   │ │ Total: 17   │
  └─────────────┘ └─────────────┘ └─────────────┘
  STAGE 2: Each partition has ONE category (can compute final total)


PARTITIONS vs EXECUTORS vs TASKS
================================
These terms are related but different:

  PARTITIONS = Logical chunks of data (how data is divided)
  TASKS      = Work units (1 task processes 1 partition)  
  CORES      = How many tasks run in parallel per executor
  EXECUTORS  = Physical workers (machines/processes)

  ┌──────────────────────────────────────────────────────┐
  │                      EXECUTOR                        │
  │  ┌─────────┐  ┌─────────┐  ┌─────────┐  ┌─────────┐  │
  │  │ Core 1  │  │ Core 2  │  │ Core 3  │  │ Core 4  │  │
  │  │  Task   │  │  Task   │  │  Task   │  │  Task   │  │
  │  │   ↓     │  │   ↓     │  │   ↓     │  │   ↓     │  │
  │  │ Part 1  │  │ Part 2  │  │ Part 3  │  │ Part 4  │  │
  │  └─────────┘  └─────────┘  └─────────┘  └─────────┘  │
  └──────────────────────────────────────────────────────┘

  1 Task = 1 Partition (always)
  1 Core = 1 Task at a time
  1 Executor = Multiple cores = Multiple parallel tasks


SPARK ARCHITECTURE: DRIVER vs EXECUTORS
=======================================

  ┌─────────────────────────────────────────────────────────────┐
  │                         DRIVER                              │
  │                   (your notebook/app)                       │
  │                                                             │
  │  • Creates execution plan (DAG)                             │
  │  • Schedules tasks on executors                             │
  │  • Tracks progress & handles failures                       │
  │  • Does NOT move data during shuffle!                       │
  │  • Receives final results (e.g., collect())                 │
  └─────────────────────────┬───────────────────────────────────┘
                            │ coordinates
              ┌─────────────┼─────────────┐
              │             │             │
              ▼             ▼             ▼
  ┌───────────────┐ ┌───────────────┐ ┌───────────────┐
  │  EXECUTOR 1   │ │  EXECUTOR 2   │ │  EXECUTOR 3   │
  │               │ │               │ │               │
  │ • Runs tasks  │ │ • Runs tasks  │ │ • Runs tasks  │
  │ • Stores data │ │ • Stores data │ │ • Stores data │
  │ • Shuffle I/O │ │ • Shuffle I/O │ │ • Shuffle I/O │
  └───────┬───────┘ └───────┬───────┘ └───────┬───────┘
          │                 │                 │
          └────────────────►│◄────────────────┘
                    SHUFFLE DATA
               (executors talk directly!)

  KEY INSIGHT: 
  During shuffle, data moves BETWEEN EXECUTORS directly.
  The driver only coordinates - it doesn't touch the data!


SHUFFLE: SINGLE vs MULTIPLE EXECUTORS
=====================================

In this workshop, we have 1 EXECUTOR with multiple cores:

  ┌────────────────┐
  │     DRIVER     │ ◄── Your notebook
  └───────┬────────┘
          │ coordinates
          ▼
  ┌───────────────────────────────────────────┐
  │              SINGLE EXECUTOR              │
  │  ┌───────┐ ┌───────┐ ┌───────┐ ┌───────┐  │
  │  │Core 1 │ │Core 2 │ │Core 3 │ │Core 4 │  │
  │  │Task 1 │ │Task 2 │ │Task 3 │ │Task 4 │  │
  │  └───┬───┘ └───┬───┘ └───┬───┘ └───┬───┘  │
  │      │         │         │         │      │
  │      └────┬────┴────┬────┴────┬────┘      │
  │           ▼         ▼         ▼           │
  │      ┌─────────────────────────────┐      │
  │      │     LOCAL DISK (shuffle)    │      │
  │      └─────────────────────────────┘      │
  └───────────────────────────────────────────┘
  
  Shuffle = DISK I/O only (fast)


In PRODUCTION with multiple executors:

  ┌────────────────┐
  │     DRIVER     │
  └───────┬────────┘
          │ coordinates
     ┌────┴────┐
     ▼         ▼
  ┌─────────────┐         ┌─────────────┐
  │ EXECUTOR 1  │         │ EXECUTOR 2  │
  │ ┌───┐ ┌───┐ │         │ ┌───┐ ┌───┐ │
  │ │T1 │ │T2 │ │         │ │T3 │ │T4 │ │
  │ └─┬─┘ └─┬─┘ │         │ └─┬─┘ └─┬─┘ │
  │   └──┬──┘   │         │   └──┬──┘   │
  │      ▼      │         │      ▼      │
  │  [Disk 1]   │◄───────►│  [Disk 2]   │
  └─────────────┘ NETWORK └─────────────┘
  
  Shuffle = DISK I/O + NETWORK I/O (slow!)

This is why "shuffles are expensive" - in real clusters,
data must travel over the network between machines!


HOW TO REDUCE SHUFFLE PARTITIONS
================================
Spark uses 200 shuffle partitions by default:
  spark.sql.shuffle.partitions = 200

With only 5 categories, most partitions are empty!
Solutions:
  1. Enable AQE (coalesces automatically) - disabled for learning
  2. Manually set: spark.conf.set("spark.sql.shuffle.partitions", "5")

HOW TO CHOOSE PARTITION COUNT?
  - Match your data: 5 categories → 5 partitions
  - Or use your cores: 8 cores → 8 partitions (for parallelism)
  - Rule of thumb: 2-4 partitions per core for large datasets
  
  Too few  → Less parallelism, slower
  Too many → Empty partitions, overhead


SHUFFLE WRITE vs SHUFFLE READ
=============================
  Stage 1: Shuffle Write = Data SENT (2.4 KiB)
           ↓ (data moves across network/disk)
  Stage 2: Shuffle Read  = Data RECEIVED (2.4 KiB)
  
  These should match! Data written = Data read.
""")


CHECK THE SPARK UI - STAGES TAB

Go to the STAGES tab and find the two stages for "6.2: GroupBy":

STAGE 1 (e.g., Stage 37): BEFORE the shuffle
  - Tasks: 8 (one per core/partition)
  - Input Records: 1000 (your original data)
  - Shuffle Write: ~2.4 KiB / 40 records
  
  WHAT HAPPENED:
  - Each task read ~125 rows (1000 ÷ 8 tasks)
  - Did PARTIAL count per category
  - Wrote results to shuffle files (5 categories × 8 tasks = 40 records)


STAGE 2 (e.g., Stage 38): AFTER the shuffle
  - Tasks: 200 (default spark.sql.shuffle.partitions!)
  - Shuffle Read: ~2.4 KiB / 40 records
  - Most tasks read 0 records (empty partitions)
  
  WHAT HAPPENED:
  - 200 partitions were created (Spark default)
  - But we only have 5 categories!
  - So only ~5 partitions have data, 195 are empty
  - Each partition does FINAL count for its categories


WHY 200 TASKS IF MOST PARTITIONS ARE EMPTY?
Good question! Without AQE, Spark uses a STATIC plan:

  1. BEFORE execution: Spark decides "I'll use 200 shuffl

---
# Part 7: The SQL Tab

The **SQL tab** shows you a visual diagram of HOW Spark executes your query.

This is extremely useful for understanding what's happening!

## 7.1: Total sales by store

In [None]:
# 7.1: Reading the SQL tab
from pyspark.sql.functions import sum

# Create sales data: store (0-4), amount
df = spark.range(10000000) \
    .withColumn("store", (col("id") % 5)) \
    .withColumn("amount", col("id") * 10)

spark.sparkContext.setJobDescription("7.1: Total sales by store")

result = df.groupBy("store").agg(
    sum("amount").alias("total_sales")
)
result.collect()

spark.sparkContext.setJobDescription("7.1 (show): Total sales by store")
result.show()

print("\n" + "="*50)
print("EXPLORE THE SQL TAB")
print("="*50)
print("""
1. Go to the SQL tab in Spark UI
2. Click on the most recent query
3. You'll see a DAG (diagram) and detailed metrics


THE PHYSICAL PLAN
=================
Read bottom to top - data flows upward:

  == Physical Plan ==
  * HashAggregate (5)      ← FINAL sum (5 output rows = 5 stores)
  +- Exchange (4)          ← SHUFFLE (200 partitions)
     +- * HashAggregate (3) ← PARTIAL sum (40 rows = 5 stores × 8 tasks)
        +- * Project (2)    ← Calculate store and amount columns
           +- * Range (1)   ← Generate 10,000 numbers


KEY METRICS TO LOOK FOR
=======================

RANGE (Step 1):
  "number of output rows: 10,000"
  → Generated 10,000 numbers (0 to 9,999)

PROJECT (Step 2):
  → Calculated: store = id % 5, amount = id * 10
  → No row count shown (same as input: 10,000)

HASH AGGREGATE - PARTIAL (Step 3):
  "number of output rows: 40"
  → Why 40? = 5 stores × 8 tasks
  → Each task computed partial sum for each store it saw

EXCHANGE - THE SHUFFLE (Step 4):
  "shuffle records written: 40"    ← Data sent (matches partial agg output)
  "records read: 40"               ← Data received (should match!)
  "number of partitions: 200"      ← Default shuffle partitions
  "shuffle bytes written: 2.4 KiB" ← Total data shuffled
  "local bytes read: 2.4 KiB"      ← Read from LOCAL disk (same executor)
  "remote bytes read: 0.0 B"       ← No NETWORK transfer (single executor!)

HASH AGGREGATE - FINAL (Step 5):
  "number of output rows: 5"
  → Final result: 5 stores with total sales each


UNDERSTANDING MIN/MED/MAX METRICS
=================================
Many metrics show (min, med, max) across tasks:

  "shuffle write time: total 484ms (min 0ms, med 0ms, max 85ms)"
  
  This means:
  - Total across all tasks: 484ms
  - Minimum task: 0ms (empty partition, no data)
  - Median task: 0ms (most partitions empty!)
  - Maximum task: 85ms (partition with data)

  The big gap between median (0ms) and max (85ms) shows
  that most of the 200 partitions were EMPTY!


WHAT IS SPILL? (IMPORTANT!)
===========================
Spill happens when Spark RUNS OUT OF MEMORY during an operation.

  NORMAL (data fits in memory):
  ┌───────────────────────────────┐
  │           MEMORY              │
  │  ┌─────────────────────────┐  │
  │  │   Hash Table for        │  │
  │  │   Aggregation           │  │
  │  │   (all data fits!)      │  │
  │  └─────────────────────────┘  │
  └───────────────────────────────┘
           ↓ Fast!


  SPILL (data doesn't fit in memory):
  ┌───────────────────────────────┐
  │           MEMORY              │
  │  ┌─────────────────────────┐  │
  │  │   Hash Table (FULL!)    │  │
  │  └───────────┬─────────────┘  │
  └──────────────│────────────────┘
                 │ Overflow!
                 ▼
  ┌───────────────────────────────┐
  │            DISK               │
  │  ┌─────────────────────────┐  │
  │  │   Spilled data          │  │  ← SLOW!
  │  │   (temporary files)     │  │
  │  └─────────────────────────┘  │
  └───────────────────────────────┘

When spill happens:
  - groupBy() → hash table too large
  - join()    → join buffers overflow  
  - orderBy() → sort buffers overflow

Why it's BAD:
  - Disk is 100-1000x SLOWER than memory
  - Data written to disk, then read back
  - Significantly slows down your job

In this query:
  "spill size: 0.0 B" → GOOD! Everything fit in memory

If you see spill size > 0, try:
  1. Increase executor memory: spark.executor.memory
  2. Reduce data per partition (more partitions)
  3. Filter data earlier to reduce size


MEMORY METRICS
==============
  "peak memory: 55.0 MiB (min 256 KiB, med 256 KiB, max 1280 KiB)"
  → Memory used for hash tables during aggregation
  → Shows how much memory each task needed


LOCAL vs REMOTE READS
=====================
  "local bytes read: 2.4 KiB"   ← Data from SAME executor
  "remote bytes read: 0.0 B"    ← Data from OTHER executors

  In our single-executor setup, all reads are LOCAL.
  In production with multiple executors:
  - remote bytes > 0 means network transfer
  - High remote bytes = expensive shuffle


WHOLESTAGECODEGEN
=================
  "WholeStageCodegen (1) duration: 240ms"
  "WholeStageCodegen (2) duration: 899ms"

  Spark compiles multiple operators into optimized Java code.
  [codegen id : 1] = Range + Project + Partial HashAggregate
  [codegen id : 2] = Final HashAggregate
  
  The * before operators means they're part of codegen:
    * HashAggregate  ← In codegen (fast!)
    Exchange         ← NOT in codegen (shuffle can't be compiled)


WHAT TO LOOK FOR (QUICK REFERENCE)
==================================
| Metric                  | Good          | Warning Sign        |
|-------------------------|---------------|---------------------|
| spill size              | 0 B           | > 0 (memory issue)  |
| remote bytes read       | Low           | High (network cost) |
| number of partitions    | Matches data  | 200 with few keys   |
| output rows             | Expected      | Unexpected count    |
| peak memory             | Reasonable    | Very high           |
""")

                                                                                

+-----+---------------+
|store|    total_sales|
+-----+---------------+
|    0| 99999950000000|
|    1| 99999970000000|
|    3|100000010000000|
|    2| 99999990000000|
|    4|100000030000000|
+-----+---------------+


EXPLORE THE SQL TAB

1. Go to the SQL tab in Spark UI
2. Click on the most recent query
3. You'll see a DAG (diagram) and detailed metrics


THE PHYSICAL PLAN
Read bottom to top - data flows upward:

  == Physical Plan ==
  * HashAggregate (5)      ← FINAL sum (5 output rows = 5 stores)
  +- Exchange (4)          ← SHUFFLE (200 partitions)
     +- * HashAggregate (3) ← PARTIAL sum (40 rows = 5 stores × 8 tasks)
        +- * Project (2)    ← Calculate store and amount columns
           +- * Range (1)   ← Generate 10,000 numbers


KEY METRICS TO LOOK FOR

RANGE (Step 1):
  "number of output rows: 10,000"
  → Generated 10,000 numbers (0 to 9,999)

PROJECT (Step 2):
  → Calculated: store = id % 5, amount = id * 10
  → No row count shown (same as input: 10,000)

HASH AGGREGAT

## 7.2: Making Spark SPILL to disk (demonstration)

In [24]:
# 7.2: Making Spark SPILL to disk (demonstration)
#
# In 7.1, we had 10,000 rows but only 5 stores.
# The hash table only needed 5 entries - easy to fit in memory!
#
# To cause SPILL, we need HIGH CARDINALITY (many unique keys).

from pyspark.sql.functions import col, count, sum as spark_sum

print("="*60)
print("DEMONSTRATING MEMORY SPILL")
print("="*60)

# Create data with HIGH CARDINALITY - many unique keys!
# 5 million rows with 1 million unique keys
print("\nCreating 5 million rows with 1 million unique keys...")

spark.sparkContext.setJobDescription("7.2: High cardinality groupBy (causes SPILL)")

df_spill = spark.range(5000000) \
    .withColumn("group_key", (col("id") % 1000000)) \
    .withColumn("value", col("id") * 10)

# This groupBy needs a hash table with 1 MILLION entries!
result = df_spill.groupBy("group_key").agg(
    count("*").alias("cnt"),
    spark_sum("value").alias("total")
)

result.collect()
print("Done! Check the SQL tab for spill metrics.")

print("\n" + "="*60)
print("CHECK THE SQL TAB FOR SPILL")
print("="*60)
print("""
1. Go to SQL tab, find "7.2: High cardinality groupBy (causes SPILL)"
2. Look at the HashAggregate metrics - there are TWO:

   PARTIAL HashAggregate (Step 3 - BEFORE shuffle):
   ================================================
   "spill size: ~456 MiB"              ← IT SPILLED!
   "peak memory: ~96 MiB"              ← Memory limit per task
   "number of output rows: 5,000,000"  ← All rows processed
   "number of sort fallback tasks: 8"  ← Tasks that used disk!

   FINAL HashAggregate (Step 5 - AFTER shuffle):
   ==============================================
   "spill size: 0.0 B"                 ← No spill here
   "peak memory: ~250 MiB"             
   "number of output rows: 1,000,000"  ← 1M unique groups


WHY DID THE PARTIAL AGGREGATE SPILL?
====================================
The PARTIAL aggregation (before shuffle) processes ALL 5M rows:
  - Each of 8 tasks processes ~625,000 rows
  - Each task sees ~125,000 unique keys (1M keys / 8 tasks)
  - Hash table for 125,000 keys doesn't fit in task memory
  - Spark SPILLS overflow data to disk!

The FINAL aggregation (after shuffle) didn't spill because:
  - Data is already partially aggregated
  - Each of 200 partitions has fewer entries to handle


WHAT IS "SORT FALLBACK TASKS"?
==============================
"number of sort fallback tasks: 8"

When hash aggregation runs out of memory, Spark falls back
to SORT-BASED aggregation:
  1. Spill data to disk
  2. Sort the spilled data by key
  3. Aggregate sorted data (sequential scan)

This is SLOWER than hash aggregation but uses less memory.
All 8 tasks had to use this fallback = significant spill!


WHY DID IT SPILL NOW BUT NOT IN 7.1?
====================================
  ┌──────────────────────────────────────────────────────┐
  │  7.1: Low Cardinality      │  7.2: High Cardinality  │
  │  (5 unique keys)           │  (1M unique keys)       │
  │                            │                         │
  │  Hash Table per task:      │  Hash Table per task:   │
  │  ┌─────┐                   │  ┌─────────────────┐    │
  │  │ 0   │                   │  │ 0, 1, 2, 3...   │    │
  │  │ 1   │  ← 5 entries!     │  │ ...             │    │
  │  │ 2   │                   │  │ ... 124,997     │    │
  │  │ 3   │                   │  │ ... 124,998     │    │
  │  │ 4   │                   │  │ ... 124,999     │    │
  │  └─────┘                   │  └─────────────────┘    │
  │                            │  ← 125,000 entries!     │
  │  ~1 KB memory              │  ~12 MB memory per task │
  │                            │         │               │
  │  Fits in memory ✓          │         ▼ SPILL!        │
  │                            │      [DISK: 456 MiB]    │
  └──────────────────────────────────────────────────────┘


KEY INSIGHT: CARDINALITY MATTERS!
=================================
Low cardinality groupBy (few unique keys):
  → Small hash table → No spill → Fast!

High cardinality groupBy (many unique keys):  
  → Large hash table → May spill → Slower!

Look at the timing difference:
  - 7.1: ~1 second (no spill)
  - 7.2: ~10 seconds (456 MiB spilled!)

This is why knowing your data cardinality is important!
""")

DEMONSTRATING MEMORY SPILL

Creating 5 million rows with 1 million unique keys...


                                                                                

Done! Check the SQL tab for spill metrics.

CHECK THE SQL TAB FOR SPILL

1. Go to SQL tab, find "7.2: High cardinality groupBy (causes SPILL)"
2. Look at the HashAggregate metrics - there are TWO:

   PARTIAL HashAggregate (Step 3 - BEFORE shuffle):
   "spill size: ~456 MiB"              ← IT SPILLED!
   "peak memory: ~96 MiB"              ← Memory limit per task
   "number of output rows: 5,000,000"  ← All rows processed
   "number of sort fallback tasks: 8"  ← Tasks that used disk!

   FINAL HashAggregate (Step 5 - AFTER shuffle):
   "spill size: 0.0 B"                 ← No spill here
   "peak memory: ~250 MiB"             
   "number of output rows: 1,000,000"  ← 1M unique groups


WHY DID THE PARTIAL AGGREGATE SPILL?
The PARTIAL aggregation (before shuffle) processes ALL 5M rows:
  - Each of 8 tasks processes ~625,000 rows
  - Each task sees ~125,000 unique keys (1M keys / 8 tasks)
  - Hash table for 125,000 keys doesn't fit in task memory
  - Spark SPILLS overflow data to disk!

T

## 7.3: How to FIX spill - and when AQE can hurt!

In [None]:
# 7.3: How to FIX spill - and when AQE can hurt!
#
# In 7.2 we saw spill with AQE OFF.
# What if we try AQE ON? Let's see...

from pyspark.sql.functions import col, count, sum as spark_sum

print("="*60)
print("WHEN AQE CAN MAKE THINGS WORSE")
print("="*60)

# Enable AQE
spark.conf.set("spark.sql.adaptive.enabled", "true")

print("\nRunning same query with AQE ON...")

spark.sparkContext.setJobDescription("7.3a: High cardinality with AQE ON")

df_spill = spark.range(5000000) \
    .withColumn("group_key", (col("id") % 1000000)) \
    .withColumn("value", col("id") * 10)

result = df_spill.groupBy("group_key").agg(
    count("*").alias("cnt"),
    spark_sum("value").alias("total")
)
result.collect()

print("Done! Check SQL tab - compare with 7.2")

# Reset AQE
spark.conf.set("spark.sql.adaptive.enabled", "false")

print("\n" + "="*60)
print("WHY AQE MADE IT WORSE!")
print("="*60)
print("""
Compare the FINAL HashAggregate spill:

  7.2 (AQE OFF): 200 partitions → spill size: 0 B      ✓
  7.3 (AQE ON):  8 partitions   → spill size: 52 MiB  ✗

WHAT HAPPENED?
==============
AQE saw:   "200 partitions with ~250KB each - too small!"
AQE did:   Coalesced 200 → 8 partitions
Result:    Each partition now has 125K keys → SPILL!

  WITHOUT AQE (200 partitions):
  ┌─────┐ ┌─────┐ ┌─────┐     ┌─────┐
  │5000 │ │5000 │ │5000 │ ... │5000 │  ← 5K keys each
  │keys │ │keys │ │keys │     │keys │     (fits in memory!)
  └─────┘ └─────┘ └─────┘     └─────┘
     200 small partitions = OK

  WITH AQE (coalesced to 8 partitions):
  ┌───────────────────────────────────┐
  │         125,000 keys              │  ← 125K keys each
  │    (TOO BIG - SPILLS TO DISK!)    │     (doesn't fit!)
  └───────────────────────────────────┘
     8 large partitions = SPILL!

AQE optimizes for SHUFFLE OVERHEAD (fewer small tasks)
but doesn't consider AGGREGATION MEMORY needs!


HOW TO FIX SPILL
================

OPTION 1: MORE PARTITIONS
=========================
More partitions = Less data per task = Smaller hash tables

  ┌─────────────────────────────────────────────────────────┐
  │  100 partitions          vs      500 partitions         │
  │                                                         │
  │  1M keys ÷ 100 = 10K keys/task   1M ÷ 500 = 2K keys/task│
  │                                                         │
  │  Hash table: ~10K entries        Hash table: ~2K entries│
  │  Memory: ~1 MB per task          Memory: ~200 KB/task   │
  │                                                         │
  │  May spill!                      Fits easily! ✓         │
  └─────────────────────────────────────────────────────────┘

To increase partitions:
  spark.conf.set("spark.sql.shuffle.partitions", "500")

Trade-off:
  ✓ Less memory per task → No spill
  ✗ More tasks → More scheduling overhead
  
Best for: High cardinality data


OPTION 2: MORE MEMORY
=====================
More memory = Larger hash tables fit = No spill

  ┌─────────────────────────────────────────────────────────┐
  │  512 MB executor         vs      2 GB executor          │
  │                                                         │
  │  Task memory: ~50 MB             Task memory: ~200 MB   │
  │  Hash table limit: ~30 MB        Hash table: ~120 MB    │
  │                                                         │
  │  125K keys = ~12 MB              125K keys = ~12 MB     │
  │  + overhead = SPILL!             + overhead = FITS! ✓   │
  └─────────────────────────────────────────────────────────┘

To increase memory:
  spark.conf.set("spark.executor.memory", "2g")

Or increase the fraction used for execution:
  spark.conf.set("spark.memory.fraction", "0.8")  # default is 0.6

Trade-off:
  ✓ More memory per task → No spill
  ✗ Need more RAM on cluster
  
Best for: You have available RAM


OPTION 3: DISABLE AQE COALESCING
================================
Keep AQE benefits but prevent aggressive coalescing:

  spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "false")

This keeps:
  ✓ AQE's dynamic join optimization
  ✓ AQE's skew handling
  
But disables:
  ✗ Partition coalescing (which caused our problem!)


OPTION 4: FILTER AND SELECT EARLY
=================================
Reduce data BEFORE the groupBy:

  # Bad: groupBy on full data
  df.groupBy("key").agg(sum("value"))
  
  # Better: filter first
  df.filter(col("date") > "2024-01-01") \\
    .select("key", "value") \\
    .groupBy("key").agg(sum("value"))

Less data = Smaller hash tables = Less spill


SUMMARY: WHEN TO USE WHAT
=========================
+-------------------------------------------------------+    
| Problem                  | Solution                   |
|--------------------------|----------------------------|
| High cardinality groupBy | More partitions            |
| Cluster has spare RAM    | Increase executor memory   |
| AQE coalescing too much  | Disable coalescing         |
| Processing full table    | Filter/select early        |
| Very wide rows           | Select only needed columns |
+-------------------------------------------------------+

KEY INSIGHT
===========
Spill happens when: Data per task > Task memory

Fix by either:
  1. Reduce data per task (more partitions, filter early)
  2. Increase task memory (more executor memory)

AQE is usually helpful, but for high cardinality aggregations,
it can coalesce partitions too aggressively and cause spill!
""")

WHEN AQE CAN MAKE THINGS WORSE

Running same query with AQE ON...


                                                                                

Done! Check SQL tab - compare with 7.2

WHY AQE MADE IT WORSE!

Compare the FINAL HashAggregate spill:

  7.2 (AQE OFF): 200 partitions → spill size: 0 B      ✓
  7.3 (AQE ON):  8 partitions   → spill size: 52 MiB  ✗

WHAT HAPPENED?
AQE saw:   "200 partitions with ~250KB each - too small!"
AQE did:   Coalesced 200 → 8 partitions
Result:    Each partition now has 125K keys → SPILL!

  WITHOUT AQE (200 partitions):
  ┌─────┐ ┌─────┐ ┌─────┐     ┌─────┐
  │5000 │ │5000 │ │5000 │ ... │5000 │  ← 5K keys each
  │keys │ │keys │ │keys │     │keys │     (fits in memory!)
  └─────┘ └─────┘ └─────┘     └─────┘
     200 small partitions = OK

  WITH AQE (coalesced to 8 partitions):
  ┌───────────────────────────────────┐
  │         125,000 keys              │  ← 125K keys each
  │    (TOO BIG - SPILLS TO DISK!)    │     (doesn't fit!)
  └───────────────────────────────────┘
     8 large partitions = SPILL!

AQE optimizes for SHUFFLE OVERHEAD (fewer small tasks)
but doesn't consider AGGREGATION ME

## 7.4: Reading SQL Plans to Find Optimizations

The SQL plan tells you EXACTLY what Spark will do. Learning to read it helps you spot performance issues.

**Common pattern to look for:** Check if your parallelism matches your available cores!

In [None]:
# 7.4: Reading SQL Plans to Find Optimizations
#
# One common mistake: Adding more cores but not more partitions!
# The SQL plan reveals this issue clearly.

from pyspark.sql.functions import col, count, sum

print("="*70)
print("SCENARIO: We increased from 2 cores to 16 cores")
print("          but the query got SLOWER! Why?")
print("="*70)

# BAD: Default partitions (usually 2 on local)
print("\n--- BAD: Using default partitions ---")
df_bad = spark.range(5000000) \
    .withColumn("group_key", (col("id") % 1000000)) \
    .withColumn("value", col("id") * 10)

print("\nPhysical Plan (look for 'splits'):")
df_bad.groupBy("group_key").agg(count("*")).explain()

print("\n" + "-"*70)

# GOOD: Explicit partitions matching cores
print("\n--- GOOD: Explicit partitions = 16 (matching our cores) ---")
df_good = spark.range(0, 5000000, 1, numPartitions=16) \
    .withColumn("group_key", (col("id") % 1000000)) \
    .withColumn("value", col("id") * 10)

print("\nPhysical Plan (look for 'splits'):")
df_good.groupBy("group_key").agg(count("*")).explain()

print("\n" + "="*70)
print("WHAT TO LOOK FOR IN THE PLAN")
print("="*70)
print("""
In the Range operator, find 'splits=Some(N)':

  BAD:  Range (0, 5000000, step=1, splits=Some(2))
        ^
        Only 2 partitions! 14 cores sitting idle!

  GOOD: Range (0, 5000000, step=1, splits=Some(16))
        ^
        16 partitions! All cores working!
""")

print("""
THE RULE:
=========
    Partitions >= Available Cores

    Otherwise, cores sit idle waiting for work!

    ┌────────────────────────────────────────────────┐
    │  16 Cores with 2 Partitions                    │
    │  ┌────┐ ┌────┐ ┌────┐ ... ┌────┐               │
    │  │BUSY│ │BUSY│ │IDLE│     │IDLE│  ← 14 idle!   │
    │  └────┘ └────┘ └────┘     └────┘               │
    └────────────────────────────────────────────────┘

    ┌────────────────────────────────────────────────┐
    │  16 Cores with 16 Partitions                   │
    │  ┌────┐ ┌────┐ ┌────┐ ... ┌────┐               │
    │  │BUSY│ │BUSY│ │BUSY│     │BUSY│  ← all work!  │
    │  └────┘ └────┘ └────┘     └────┘               │
    └────────────────────────────────────────────────┘
""")

print("""
HOW TO FIX PARTITION COUNT:
===========================

1. For spark.range():
   spark.range(0, 5000000, 1, numPartitions=16)

2. For reading files:
   spark.read.option("maxPartitionBytes", "128MB").parquet(path)

3. For any DataFrame (adds a shuffle!):
   df.repartition(16)

4. Set default parallelism at session start:
   .config("spark.default.parallelism", "16")
""")

SCENARIO: We increased from 2 cores to 16 cores
          but the query got SLOWER! Why?

--- BAD: Using default partitions ---

Physical Plan (look for 'splits'):
== Physical Plan ==
*(2) HashAggregate(keys=[group_key#247L], functions=[count(1)])
+- Exchange hashpartitioning(group_key#247L, 200), ENSURE_REQUIREMENTS, [plan_id=514]
   +- *(1) HashAggregate(keys=[group_key#247L], functions=[partial_count(1)])
      +- *(1) Project [(id#245L % 1000000) AS group_key#247L]
         +- *(1) Range (0, 5000000, step=1, splits=8)



----------------------------------------------------------------------

--- GOOD: Explicit partitions = 16 (matching our cores) ---

Physical Plan (look for 'splits'):
== Physical Plan ==
*(2) HashAggregate(keys=[group_key#265L], functions=[count(1)])
+- Exchange hashpartitioning(group_key#265L, 200), ENSURE_REQUIREMENTS, [plan_id=543]
   +- *(1) HashAggregate(keys=[group_key#265L], functions=[partial_count(1)])
      +- *(1) Project [(id#263L % 1000000) AS group_k

In [18]:
# 7.4b: Prove it - Run both and compare times
#
# Let's actually run both versions and see the difference

import time
from pyspark.sql.functions import col, count, sum

print("="*70)
print("PERFORMANCE COMPARISON: Partition count vs Core count")
print("="*70)

# Run with 2 partitions (bad)
spark.sparkContext.setJobDescription("7.4a: BAD - Only 2 partitions")
df_bad = spark.range(5000000) \
    .withColumn("group_key", (col("id") % 1000000)) \
    .withColumn("value", col("id") * 10)

start = time.time()
df_bad.groupBy("group_key").agg(count("*"), sum("value")).collect()
time_bad = time.time() - start
print(f"\n2 partitions:  {time_bad:.1f} seconds")

# Run with 16 partitions (good)
spark.sparkContext.setJobDescription("7.4b: GOOD - 16 partitions")
spark.conf.set("spark.sql.shuffle.partitions", "16")
df_good = spark.range(0, 5000000, 1, numPartitions=16) \
    .withColumn("group_key", (col("id") % 1000000)) \
    .withColumn("value", col("id") * 10)

start = time.time()
df_good.groupBy("group_key").agg(count("*"), sum("value")).collect()
time_good = time.time() - start
print(f"16 partitions: {time_good:.1f} seconds")

# Calculate improvement
if time_bad > time_good:
    improvement = ((time_bad - time_good) / time_bad) * 100
    print(f"\n→ {improvement:.0f}% faster with proper parallelism!")
else:
    print(f"\n→ Results vary - check Spark UI for task distribution")

print("\n" + "="*70)
print("CHECK SPARK UI - SQL TAB")
print("="*70)
print("""
Compare the two queries:

  7.4a (BAD):  Range shows splits=Some(2)
               Stage 0 has only 2 tasks

  7.4b (GOOD): Range shows splits=Some(16)
               Stage 0 has 16 tasks running in parallel

KEY INSIGHT:
============
More cores WITHOUT more partitions = WORSE performance!

The overhead of managing unused resources costs time,
while only 2 cores actually do the work.
""")

PERFORMANCE COMPARISON: Partition count vs Core count


                                                                                


2 partitions:  7.5 seconds


                                                                                

16 partitions: 10.0 seconds

→ Results vary - check Spark UI for task distribution

CHECK SPARK UI - SQL TAB

Compare the two queries:

  7.4a (BAD):  Range shows splits=Some(2)
               Stage 0 has only 2 tasks

  7.4b (GOOD): Range shows splits=Some(16)
               Stage 0 has 16 tasks running in parallel

KEY INSIGHT:
More cores WITHOUT more partitions = WORSE performance!

The overhead of managing unused resources costs time,
while only 2 cores actually do the work.



---
# Part 8: The Executors Tab

The **Executors tab** shows you the health of your cluster.

- **Driver**: Your notebook (sends commands)
- **Executor(s)**: The workers (do the processing)

In [None]:
# 8.1: Check executor health
from pyspark.sql.functions import rand

# Run a larger job
df = spark.range(10_000_000) \
    .withColumn("value", rand() * 1000) \
    .withColumn("group", (col("id") % 50_000))

spark.sparkContext.setJobDescription("8.1: Large aggregation")

result = df.groupBy("group").agg(
    count("*").alias("count"),
    sum("value").alias("total")
)
result.show(10)

print("\n" + "="*50)
print("CHECK THE EXECUTORS TAB")
print("="*50)
print("""
1. Go to the EXECUTORS tab

2. You should see:
   - "driver" row - your notebook
   - Executor row(s) - the workers

3. Important columns:
   +---------------------------------------------------------+
   | Column         | Good Value | Warning Sign              |
   |----------------|------------|---------------------------|
   | Disk Used      | 0          | High = spilling to disk   |
   | Failed Tasks   | 0          | Any failures = problem    |
   | GC Time        | Low        | High = memory pressure    |
   +---------------------------------------------------------+

KEY INSIGHT:
The Executors tab is your "health dashboard".
Check it when jobs are slow or failing.
""")

                                                                                


CHECK THE EXECUTORS TAB

1. Go to the EXECUTORS tab

2. You should see:
   - "driver" row - your notebook
   - Executor row(s) - the workers

3. Important columns:
   +---------------------------------------------------------+
   |----------------|------------|---------------------------|
   | Disk Used      | 0          | High = spilling to disk   |
   | Failed Tasks   | 0          | Any failures = problem    |
   | GC Time        | Low        | High = memory pressure    |
   +---------------------------------------------------------+

KEY INSIGHT:
The Executors tab is your "health dashboard".
Check it when jobs are slow or failing.



---
# Part 9: Event Timeline Colors

When you look at a Stage's **Event Timeline**, you'll see colored bars.

Each color means something different!

In [None]:
# 9.1: Understanding Event Timeline colors
from pyspark.sql.functions import avg

df = spark.range(50000) \
    .withColumn("data", rand())

spark.sparkContext.setJobDescription("9.1: For Event Timeline")

result = df.groupBy((col("id") % 10).alias("bucket")).agg(
    avg("data").alias("average")
)
result.show()

print("\n" + "="*50)
print("EXPLORE THE EVENT TIMELINE")
print("="*50)
print("""
1. Jobs tab → Click "9.1: For Event Timeline"

2. Click on a Stage to see details

3. Find "Event Timeline" (expand if collapsed)

4. Each task is a horizontal bar. The COLORS mean:

   +-----------------------------------------+
   | Color  | Meaning                        |
   |--------|--------------------------------|
   | Blue   | Scheduler Delay (waiting)      |
   | Red    | Task Deserialization           |
   | Orange | Shuffle Read Time              |
   | GREEN  | Executor Computing (the work!) |
   | Yellow | Shuffle Write Time             |
   | Purple | Result Serialization           |
   | Cyan   | Getting Result Time            |
   +-----------------------------------------+

IDEAL: Most of each bar should be GREEN!
   - Too much blue? Cluster is overloaded
   - Too much orange/yellow? Shuffle is the bottleneck
""")

---
# Part 10: Caching

If you use the same data multiple times, you can **cache** it.

This stores the data in memory so Spark doesn't have to recompute it.

## 10.1: Caching demonstration

In [28]:
# 10.1: Caching demonstration
import time
from pyspark.sql.functions import avg, rand

# Create data
df = spark.range(100_000) \
    .withColumn("value", rand() * 100) \
    .withColumn("category", (col("id") % 10))

# Query 1: NO cache
print("Query 1: WITHOUT cache")
spark.sparkContext.setJobDescription("10.1a: No cache")
start = time.time()
df.groupBy("category").agg(sum("value")).show()
print(f"Time: {time.time() - start:.2f} seconds")

# Now CACHE the DataFrame
print("\n" + "-"*50)
print("Caching the data...")
df.cache()

# Query 2: Populates the cache
print("\nQuery 2: Populating cache")
spark.sparkContext.setJobDescription("10.1b: Populate cache")
start = time.time()
df.groupBy("category").agg(avg("value")).show()
print(f"Time: {time.time() - start:.2f} seconds")

# Query 3: USES the cache
print("\nQuery 3: Using cache (should be faster!)")
spark.sparkContext.setJobDescription("10.1c: Using cache")
start = time.time()
df.groupBy("category").agg(count("*")).show()
print(f"Time: {time.time() - start:.2f} seconds")

print("\n" + "="*50)
print("CHECK THE STORAGE TAB NOW!")
print("="*50)
print("""
1. Go to the STORAGE tab

2. You should see your cached DataFrame:
   - Size in Memory
   - Fraction Cached (should be 100%)

3. In Jobs tab, compare 10.1a, 10.1b, 10.1c:
   - 10.1c might show "Skipped" stages (read from cache!)

4. In SQL tab, look for "InMemoryTableScan"
   - This means it read from cache!

>>> Run the NEXT cell to unpersist when done viewing <<<
""")

Query 1: WITHOUT cache


                                                                                

+--------+------------------+
|category|        sum(value)|
+--------+------------------+
|       0| 503731.4222419934|
|       7| 501159.7311670624|
|       6| 500157.0110107722|
|       9|501392.38308998715|
|       5| 493392.3598788386|
|       1|503723.96725818363|
|       3| 499204.3114544312|
|       8|499195.94903787115|
|       2|500297.06337110884|
|       4| 502219.8874024233|
+--------+------------------+

Time: 1.78 seconds

--------------------------------------------------
Caching the data...

Query 2: Populating cache


                                                                                

+--------+------------------+
|category|        avg(value)|
+--------+------------------+
|       0| 50.37314222419934|
|       7| 50.11597311670624|
|       6| 50.01570110107722|
|       9| 50.13923830899871|
|       5| 49.33923598788386|
|       1| 50.37239672581836|
|       3| 49.92043114544312|
|       8|49.919594903787115|
|       2| 50.02970633711089|
|       4| 50.22198874024233|
+--------+------------------+

Time: 1.69 seconds

Query 3: Using cache (should be faster!)
+--------+--------+
|category|count(1)|
+--------+--------+
|       0|   10000|
|       7|   10000|
|       6|   10000|
|       9|   10000|
|       5|   10000|
|       1|   10000|
|       3|   10000|
|       8|   10000|
|       2|   10000|
|       4|   10000|
+--------+--------+

Time: 1.30 seconds

CHECK THE STORAGE TAB NOW!

1. Go to the STORAGE tab

2. You should see your cached DataFrame:
   - Size in Memory
   - Fraction Cached (should be 100%)

3. In Jobs tab, compare 10.1a, 10.1b, 10.1c:
   - 10.1c might s

## 10.2: Clean up cache

In [26]:
# 10.2: Clean up cache
#
# Run this AFTER you've checked the Storage tab!

df.unpersist()
print("Cache cleared!")
print("\nCheck the Storage tab again - it should be empty now.")

Cache cleared!

Check the Storage tab again - it should be empty now.


---
# Part 11: Joins

**Joins** combine data from two tables.

In [27]:
# 11.1: Create two tables

# Sales table
sales = spark.createDataFrame([
    (1, "Store A", 100),
    (2, "Store B", 200),
    (3, "Store A", 150),
    (4, "Store C", 300),
    (5, "Store B", 250),
], ["sale_id", "store", "amount"])

# Stores table  
stores = spark.createDataFrame([
    ("Store A", "New York"),
    ("Store B", "Los Angeles"),
    ("Store C", "Chicago"),
], ["store", "city"])

print("Sales table:")
sales.show()

print("Stores table:")
stores.show()

Sales table:


                                                                                

+-------+-------+------+
|sale_id|  store|amount|
+-------+-------+------+
|      1|Store A|   100|
|      2|Store B|   200|
|      3|Store A|   150|
|      4|Store C|   300|
|      5|Store B|   250|
+-------+-------+------+

Stores table:
+-------+-----------+
|  store|       city|
+-------+-----------+
|Store A|   New York|
|Store B|Los Angeles|
|Store C|    Chicago|
+-------+-----------+



In [28]:
# 11.2: Join the tables

spark.sparkContext.setJobDescription("11.2: Join sales with stores")

joined = sales.join(stores, "store")
joined.show()

print("\n" + "="*50)
print("CHECK THE SQL TAB")
print("="*50)
print("""
1. Go to SQL tab, find the join query

2. Look at the DAG:
   - Two "LocalTableScan" boxes (reading both tables)
   - "BroadcastHashJoin" - the join

BROADCAST JOIN:
The "stores" table is tiny, so Spark "broadcasts" it -
sends the small table to all workers.
This avoids shuffling the larger table!
""")

                                                                                

+-------+-------+------+-----------+
|  store|sale_id|amount|       city|
+-------+-------+------+-----------+
|Store C|      4|   300|    Chicago|
|Store B|      2|   200|Los Angeles|
|Store B|      5|   250|Los Angeles|
|Store A|      1|   100|   New York|
|Store A|      3|   150|   New York|
+-------+-------+------+-----------+


CHECK THE SQL TAB

1. Go to SQL tab, find the join query

2. Look at the DAG:
   - Two "LocalTableScan" boxes (reading both tables)
   - "BroadcastHashJoin" - the join

BROADCAST JOIN:
The "stores" table is tiny, so Spark "broadcasts" it -
sends the small table to all workers.
This avoids shuffling the larger table!



In [29]:
# 11.3: Join + GroupBy (combining everything!)

spark.sparkContext.setJobDescription("11.3: Total sales by city")

result = sales \
    .join(stores, "store") \
    .groupBy("city") \
    .agg(sum("amount").alias("total_sales"))

result.show()

print("\n" + "="*50)
print("FINAL EXERCISE: ANALYZE THE QUERY")
print("="*50)
print("""
In the SQL tab, find this query. You should see:

1. Two scans (reading both tables)
2. BroadcastHashJoin (joining)
3. HashAggregate (partial sum)
4. Exchange (shuffle!)
5. HashAggregate (final sum)

This combines EVERYTHING from the workshop:
- Reading data
- Joining tables
- Shuffling
- Aggregating
""")

                                                                                

+-----------+-----------+
|       city|total_sales|
+-----------+-----------+
|Los Angeles|        450|
|    Chicago|        300|
|   New York|        250|
+-----------+-----------+


FINAL EXERCISE: ANALYZE THE QUERY

In the SQL tab, find this query. You should see:

1. Two scans (reading both tables)
2. BroadcastHashJoin (joining)
3. HashAggregate (partial sum)
4. Exchange (shuffle!)
5. HashAggregate (final sum)

This combines EVERYTHING from the workshop:
- Reading data
- Joining tables
- Shuffling
- Aggregating



---
# Summary

## Key Concepts

1. **Spark is LAZY**: Transformations don't run until you call an action
2. **Actions create Jobs**: show(), count(), collect(), write()
3. **Jobs contain Stages**: Stages are separated by shuffles
4. **Stages contain Tasks**: Tasks run in parallel
5. **Shuffles are expensive**: groupBy, join, orderBy move data

## Spark UI Tabs

| Tab | What to Look For |
|-----|------------------|
| Jobs | Blue=success, Red=failed, job count |
| Stages | Shuffle read/write, skipped stages |
| Storage | Cached DataFrames |
| Executors | Failed tasks, GC time, disk spill |
| SQL | Visual DAG, Exchange = shuffle |

## Event Timeline Colors

| Color | Meaning |
|-------|-------------------------------|
| Green | Executor Computing (the work!) |
| Blue | Scheduler Delay |
| Orange | Shuffle Read |
| Yellow | Shuffle Write |

## Quick Reference

```python
# Name your jobs
spark.sparkContext.setJobDescription("My job name")

# Cache data you reuse
df.cache()
df.unpersist()
```

In [2]:
# Clean up - run when done
print("Stopping Spark session...")
spark.stop()
print("\nWorkshop complete!")
print("\nKey takeaways:")
print("1. Spark is LAZY - transformations wait for actions")
print("2. Jobs → Stages → Tasks")
print("3. Shuffles create new stages (expensive!)")
print("4. Use setJobDescription() to label your jobs")
print("5. Check Executors tab for cluster health")

Stopping Spark session...

Workshop complete!

Key takeaways:
1. Spark is LAZY - transformations wait for actions
2. Jobs → Stages → Tasks
3. Shuffles create new stages (expensive!)
4. Use setJobDescription() to label your jobs
5. Check Executors tab for cluster health
