[YT Video link](https://youtu.be/KnUXztKueMU?si=ILuFJgT4CVukrOMz)

# Spark Query Plan

### understanding spark query plan (with examples)

when you write a spark query, it doesn't just run directly. spark goes through multiple stages to transform your code into an optimized execution plan. let's break it down step by step:

---

## **step 1: writing pyspark code**
consider this simple query:

```python
df = spark.read.csv("employees.csv", header=True, inferSchema=True)
df.filter(df["salary"] > 50000).select("name", "salary").show()
```

even though we wrote just two operations (`filter` and `select`), under the hood, spark follows **several steps** before execution.

---

## **step 2: syntax validation**
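before spark even thinks about execution, the code has to **parse correctly**. for dataframe code that is python's job; for sql strings it is spark's sql parser, which raises a `ParseException` on malformed input. whether a column exists or has a compatible type is **not** checked here; that happens during analysis (step 4).

- ✅ `df.filter(df["salary"] > 50000)` → valid syntax  
- ❌ `df.filter(df["salary"] > )` → invalid, rejected by python with a `SyntaxError` before spark sees it

if the syntax is fine, spark moves ahead.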

---

## **step 3: unresolved logical plan**
at first, spark **doesn’t know what `salary` means** because it hasn’t checked the dataset yet. it creates an **unresolved logical plan**, meaning:

- it understands the structure of the query (`filter` then `select`)
- but it doesn't know if the columns exist or what their types are

### **example of an unresolved logical plan**
```plaintext
'Project ['name, 'salary]   ← selecting columns
 +- 'Filter ('salary > 50000)   ← filtering condition
    +- 'UnresolvedRelation [employees.csv]   ← table not verified yet
```

at this stage, `salary` and `employees.csv` are just **strings**, not actual references.

---

## **step 4: catalog lookup (resolving logical plan)**
spark now **consults the catalog** (metadata store) to **validate the table and column names**.

- ✅ if `employees.csv` is found and contains `salary` → spark proceeds  
- ❌ if the table or column is missing → error: `AnalysisException: cannot resolve column 'salary'`

once resolved, the logical plan is updated with **actual references** instead of just names.

### **resolved logical plan**
```plaintext
Project [name#21, salary#23]
 +- Filter (salary#23 > 50000)
    +- Relation [name#21, salary#23] csv
```

now, `salary#23` means it's a real column from the dataset.
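
to make the resolution step concrete, here is a toy sketch in plain python. it is not how spark's analyzer is implemented: the `CATALOG` dict, the `resolve` function, the `AnalysisError` class, and the starting id of 21 are all made up for illustration. it only shows the idea of turning bare names into unique `name#id` references, or failing when a name is unknown.

```python
from itertools import count

# Hypothetical catalog: source name -> column names and types.
CATALOG = {"employees.csv": {"name": "string", "salary": "int"}}


class AnalysisError(Exception):
    """Stand-in for Spark's AnalysisException."""


def resolve(source, column_names, catalog=CATALOG):
    """Map unresolved column names to unique `name#id` references."""
    if source not in catalog:
        raise AnalysisError(f"table or path not found: {source}")
    schema = catalog[source]
    ids = count(21)  # arbitrary starting id, chosen only for illustration
    # Every column of the relation gets an id, in schema order.
    attribute_ids = {col: f"{col}#{next(ids)}" for col in schema}
    resolved = []
    for col in column_names:
        if col not in attribute_ids:
            raise AnalysisError(f"cannot resolve column '{col}'")
        resolved.append(attribute_ids[col])
    return resolved


print(resolve("employees.csv", ["name", "salary"]))  # ['name#21', 'salary#22']
```

asking for a column that is not in the catalog, for example `resolve("employees.csv", ["bonus"])`, raises `AnalysisError`, which mirrors the failure case described above.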

---

## **step 5: logical optimization (catalyst optimizer)**
spark now **optimizes the logical plan** using **rule-based optimizations**.

### **example optimization - predicate pushdown**
consider if our `employees.csv` is huge. instead of reading everything, spark **pushes the filter down** to the data source level (if supported).

#### **before optimization (bad)**
```plaintext
Read entire employees.csv → Apply filter (salary > 50000) → Select columns
```
(spark reads all data first, which is inefficient.)

#### **after optimization (good)**
```plaintext
Push filter (salary > 50000) to csv reader → Read only matching rows → Select columns
```
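(this reduces the data spark has to process, making execution faster. columnar formats such as parquet can skip whole chunks of a file this way; a csv file still has to be scanned, but non-matching rows can be dropped early while parsing.)

this is called **filter pushdown** (or predicate pushdown). spark applies a similar optimization called **projection pushdown**, where it reads only the required columns instead of all of them.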

### **optimized logical plan**
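```plaintext
Project [name#21, salary#23]
 +- Filter (isnotnull(salary#23) AND (salary#23 > 50000))
    +- Relation [name#21, salary#23] csv
```
(the filter now sits directly above the relation. the `PushedFilters: [...]` entry that confirms the pushdown shows up on the scan node of the physical plan, in step 6.)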
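
a toy model of both pushdowns, written in plain python rather than spark. the `scan` function and the sample data are assumptions made for this sketch; the point is only that a reader which knows the filter and the column list up front hands less data to the rest of the query.

```python
import csv
import io

# Made-up sample data standing in for employees.csv.
RAW = """name,department,salary
Alice,eng,60000
Bob,ops,70000
Carol,eng,40000
Dan,ops,30000
"""


def scan(raw, columns=None, predicate=None):
    """Toy reader: keeps only `columns` and drops rows failing `predicate`
    while reading, instead of handing everything to a later step."""
    rows = []
    for record in csv.DictReader(io.StringIO(raw)):
        if predicate is not None and not predicate(record):
            continue  # filter pushdown: the row never leaves the reader
        if columns is not None:
            record = {c: record[c] for c in columns}  # projection pushdown
        rows.append(record)
    return rows


everything = scan(RAW)
pushed = scan(
    RAW,
    columns=["name", "salary"],
    predicate=lambda r: int(r["salary"]) > 50000,
)

print(len(everything), "rows x", len(everything[0]), "columns without pushdown")
print(len(pushed), "rows x", len(pushed[0]), "columns with pushdown")
```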

---

## **step 6: physical plan generation**
now spark converts the optimized logical plan into an **actual execution plan**.

- **what execution strategy should be used?**
- **what is the best way to run this on a distributed cluster?**

this stage decides whether to:
- use **sort-merge join** or **broadcast join**
- use **hash aggregation** or **sort aggregation**
- apply **parallel execution strategies**

### **example of a physical plan**
```plaintext
*(1) Project [name, salary]
 +- *(1) Filter (salary#23 > 50000)
    +- FileScan csv [name#21, salary#23] PushedFilters: [salary > 50000]
```
- `*(1)` → the operator is part of **whole-stage code generation**; the number is the id of the codegen stage it was fused into
- `FileScan csv` → shows it's reading only **necessary columns and rows**
- `PushedFilters: [salary > 50000]` → confirms **filter pushdown happened**

---

## **step 7: execution**
now spark **executes the physical plan** in stages:
1. loads the data (with filters applied at the source if possible)
2. applies transformations in a distributed way
3. materializes results and returns output

```
+-------+--------+
|  name | salary |
+-------+--------+
| Alice | 60000  |
|  Bob  | 70000  |
+-------+--------+
```

---

## **summary of spark query plan stages**
| stage | what happens? |
|-------|--------------|
| **1. syntax validation** | checks if the query syntax is correct |
| **2. unresolved logical plan** | spark understands the structure but doesn’t verify column names or table existence |
| **3. catalog lookup (resolving)** | spark verifies column names & table existence |
| **4. logical optimization (catalyst optimizer)** | optimizes query, applies filter & projection pushdown |
| **5. physical plan generation** | decides best execution strategy (joins, aggregations, etc.) |
| **6. execution** | spark runs the plan in parallel on the cluster |

---

## **extra: seeing the query plan in action**
you can inspect the query plan using:
```python
df.explain(True)  # detailed plan
```
or
```python
df.explain("formatted")  # better readability
```
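
plan output is plain text, so simple checks can be scripted. the sketch below assumes the plan has already been captured as a string (how you capture it is up to you) and uses an abridged, partly made-up sample plan; `shuffle_nodes` and `pushed_filters` are hypothetical helper names, not spark apis.

```python
import re

# Sample physical plan text, shaped like `explain()` output.
# Abridged and partly made up for illustration.
PLAN = """\
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[city#10], functions=[count(1)])
   +- Exchange hashpartitioning(city#10, 200), ENSURE_REQUIREMENTS, [id=#131]
      +- HashAggregate(keys=[city#10], functions=[partial_count(1)])
         +- FileScan parquet [city#10] PushedFilters: [], ReadSchema: struct<city:string>
"""


def shuffle_nodes(plan_text):
    """Return the partitioning of every Exchange node, top to bottom."""
    return re.findall(r"Exchange (\w+\([^)]*\))", plan_text)


def pushed_filters(plan_text):
    """Return the raw contents of each `PushedFilters: [...]` entry."""
    return re.findall(r"PushedFilters: \[([^\]]*)\]", plan_text)


print(shuffle_nodes(PLAN))   # ['hashpartitioning(city#10, 200)']
print(pushed_filters(PLAN))  # ['']
```

one `Exchange` means one shuffle, and an empty `PushedFilters` list means nothing was pushed to the source for this query.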

---

## **key optimizations in catalyst optimizer**
1. **predicate pushdown**: apply `WHERE` filters at the data source (reducing I/O)
2. **projection pushdown**: only select required columns (avoiding reading extra data)
3. **constant folding**: precompute constant expressions (`2 + 3 → 5`)
4. **reordering joins**: reorders joins to process smaller datasets first (the cost-based variant depends on table statistics being available)
5. **eliminating unnecessary operations**: removes redundant transformations
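
constant folding is easy to sketch on a tiny expression tree. the classes and the `fold_constants` function below are hypothetical and far simpler than catalyst's real expression classes; they only illustrate the bottom-up rewrite.

```python
from dataclasses import dataclass
from typing import Union


@dataclass(frozen=True)
class Literal:
    value: int


@dataclass(frozen=True)
class Column:
    name: str


@dataclass(frozen=True)
class Add:
    left: "Expr"
    right: "Expr"


Expr = Union[Literal, Column, Add]


def fold_constants(expr: Expr) -> Expr:
    """Bottom-up rewrite: replace Add(Literal, Literal) with one Literal."""
    if isinstance(expr, Add):
        left = fold_constants(expr.left)
        right = fold_constants(expr.right)
        if isinstance(left, Literal) and isinstance(right, Literal):
            return Literal(left.value + right.value)
        return Add(left, right)
    return expr  # literals and columns are already as simple as they get


# salary + (2 + 3)  becomes  salary + 5
before = Add(Column("salary"), Add(Literal(2), Literal(3)))
print(fold_constants(before))
```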

---

## **final thoughts**
- spark **does not execute line by line**—it **builds a plan, optimizes it, then executes efficiently**.
- **filter pushdown** and **projection pushdown** reduce data read from disk, improving performance.
- `explain()` is your friend—**use it to understand query plans** and identify inefficiencies.

---


# Imports & Configuration

In [7]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName('MyApp').getOrCreate()
sc = spark.sparkContext

# Reading File

In [3]:
transactions_file = "../data/data_skew/transactions.parquet"
df_transactions = spark.read.parquet(transactions_file)

In [5]:
df_transactions.show(5, False)  # truncate=False: show full strings (truncate defaults to True)

+----------+----------+----------+---------------+----------+----+-----+---+-------------+------+-----------+
|cust_id   |start_date|end_date  |txn_id         |date      |year|month|day|expense_type |amt   |city       |
+----------+----------+----------+---------------+----------+----+-----+---+-------------+------+-----------+
|C0YDPQWPBJ|2010-07-01|2018-12-01|TZ5SMKZY9S03OQJ|2018-10-07|2018|10   |7  |Entertainment|10.42 |boston     |
|C0YDPQWPBJ|2010-07-01|2018-12-01|TYIAPPNU066CJ5R|2016-03-27|2016|3    |27 |Motor/Travel |44.34 |portland   |
|C0YDPQWPBJ|2010-07-01|2018-12-01|TETSXIK4BLXHJ6W|2011-04-11|2011|4    |11 |Entertainment|3.18  |chicago    |
|C0YDPQWPBJ|2010-07-01|2018-12-01|TQKL1QFJY3EM8LO|2018-02-22|2018|2    |22 |Groceries    |268.97|los_angeles|
|C0YDPQWPBJ|2010-07-01|2018-12-01|TYL6DFP09PPXMVB|2010-10-16|2010|10   |16 |Entertainment|2.66  |chicago    |
+----------+----------+----------+---------------+----------+----+-----+---+-------------+------+-----------+
only showing top 5 rows

In [4]:
customers_file = "../data/data_skew/customers.parquet"
df_customers = spark.read.parquet(customers_file)

In [6]:
df_customers.show(5, False)

+----------+-------------+---+------+----------+-----+-----------+
|cust_id   |name         |age|gender|birthday  |zip  |city       |
+----------+-------------+---+------+----------+-----+-----------+
|C007YEYTX9|Aaron Abbott |34 |Female|7/13/1991 |97823|boston     |
|C00B971T1J|Aaron Austin |37 |Female|12/16/2004|30332|chicago    |
|C00WRSJF1Q|Aaron Barnes |29 |Female|3/11/1977 |23451|denver     |
|C01AZWQMF3|Aaron Barrett|31 |Male  |7/9/1998  |46613|los_angeles|
|C01BKUFRHA|Aaron Becker |54 |Male  |11/24/1979|40284|san_diego  |
+----------+-------------+---+------+----------+-----+-----------+
only showing top 5 rows



# Spark's Query Plans

Image Credits: `Databricks`

![Spark Execution](../data/spark-execution.png)

# Narrow Transformations
- `filter` rows where `city='boston'`
- `add` new columns: `first_name` and `last_name`, split out of `name`
- `alter` an existing column: add 5 to the `age` column
- `select` relevant columns

> In PySpark, `lit(5)` creates a column that holds the literal value 5 for every row of a DataFrame. Use `lit` whenever a constant is needed as a column, for example to add flags or metadata, or to do arithmetic with a fixed value.


In [8]:
df_narrow_transform = (
    df_customers
    .filter(col("city") == "boston")
    .withColumn("first_name", split("name", " ").getItem(0))
    .withColumn("last_name", split("name", " ").getItem(1))
    .withColumn("age", col("age") + lit(5))
    .select("cust_id", "first_name", "last_name", "age", "gender", "birthday")
)

df_narrow_transform.show(5, False)
df_narrow_transform.explain(True)

+----------+----------+---------+---+------+---------+
|cust_id   |first_name|last_name|age|gender|birthday |
+----------+----------+---------+---+------+---------+
|C007YEYTX9|Aaron     |Abbott   |39 |Female|7/13/1991|
|C08XAQUY73|Aaron     |Lambert  |59 |Female|11/5/1966|
|C094P1VXF9|Aaron     |Lindsey  |29 |Male  |9/21/1990|
|C097SHE1EF|Aaron     |Lopez    |27 |Female|4/18/2001|
|C0DTC6436T|Aaron     |Schwartz |57 |Female|7/9/1962 |
+----------+----------+---------+---+------+---------+
only showing top 5 rows

== Parsed Logical Plan ==
'Project ['cust_id, 'first_name, 'last_name, 'age, 'gender, 'birthday]
+- Project [cust_id#22, name#23, (cast(age#24 as bigint) + cast(5 as bigint)) AS age#131L, gender#25, birthday#26, zip#27, city#28, first_name#112, last_name#121]
   +- Project [cust_id#22, name#23, age#24, gender#25, birthday#26, zip#27, city#28, first_name#112, split(name#23,  , -1)[1] AS last_name#121]
      +- Project [cust_id#22, name#23, age#24, gender#25, birthday#26, zip#2

# Wide Transformations
1. Repartition
2. Coalesce
3. Joins
4. GroupBy
   - `count`
   - `countDistinct`
   - `sum`

## 1. Repartition

In [9]:
df_transactions.rdd.getNumPartitions()

12

In [10]:
df_transactions.repartition(24).explain(True)

== Parsed Logical Plan ==
Repartition 24, true
+- Relation [cust_id#0,start_date#1,end_date#2,txn_id#3,date#4,year#5,month#6,day#7,expense_type#8,amt#9,city#10] parquet

== Analyzed Logical Plan ==
cust_id: string, start_date: string, end_date: string, txn_id: string, date: string, year: string, month: string, day: string, expense_type: string, amt: string, city: string
Repartition 24, true
+- Relation [cust_id#0,start_date#1,end_date#2,txn_id#3,date#4,year#5,month#6,day#7,expense_type#8,amt#9,city#10] parquet

== Optimized Logical Plan ==
Repartition 24, true
+- Relation [cust_id#0,start_date#1,end_date#2,txn_id#3,date#4,year#5,month#6,day#7,expense_type#8,amt#9,city#10] parquet

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Exchange RoundRobinPartitioning(24), REPARTITION_BY_NUM, [plan_id=80]
   +- FileScan parquet [cust_id#0,start_date#1,end_date#2,txn_id#3,date#4,year#5,month#6,day#7,expense_type#8,amt#9,city#10] Batched: true, DataFilters: [], Format: Parquet, Locatio

## 2. Coalesce

In [12]:
df_transactions.coalesce(5).explain(True)

== Parsed Logical Plan ==
Repartition 5, false
+- Relation [cust_id#0,start_date#1,end_date#2,txn_id#3,date#4,year#5,month#6,day#7,expense_type#8,amt#9,city#10] parquet

== Analyzed Logical Plan ==
cust_id: string, start_date: string, end_date: string, txn_id: string, date: string, year: string, month: string, day: string, expense_type: string, amt: string, city: string
Repartition 5, false
+- Relation [cust_id#0,start_date#1,end_date#2,txn_id#3,date#4,year#5,month#6,day#7,expense_type#8,amt#9,city#10] parquet

== Optimized Logical Plan ==
Repartition 5, false
+- Relation [cust_id#0,start_date#1,end_date#2,txn_id#3,date#4,year#5,month#6,day#7,expense_type#8,amt#9,city#10] parquet

== Physical Plan ==
Coalesce 5
+- *(1) ColumnarToRow
   +- FileScan parquet [cust_id#0,start_date#1,end_date#2,txn_id#3,date#4,year#5,month#6,day#7,expense_type#8,amt#9,city#10] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/afaqueahmad/Documents/youtube/spar

### Why doesn't `.coalesce()` explicitly show the partitioning scheme?

`.coalesce` doesn't show a partitioning scheme such as `RoundRobinPartitioning` because:
- The operation only merges existing partitions into fewer partitions to minimize data movement; it doesn't shuffle.
- Because no shuffle happens, the partitioning scheme stays the same as that of the original DataFrame, so Spark has nothing new to report and doesn't include it explicitly in its plan.
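
The difference can be sketched with plain Python lists standing in for partitions. This is a toy model under simplifying assumptions (Spark picks which partitions to merge based on locality, not the modulo rule used here), and `repartition` and `coalesce` below are ordinary functions, not the DataFrame methods.

```python
def repartition(partitions, n):
    """Toy full shuffle: deal rows round-robin, so any row may move anywhere."""
    rows = [row for part in partitions for row in part]
    return [rows[i::n] for i in range(n)]


def coalesce(partitions, n):
    """Toy coalesce: glue whole partitions together, never splitting one."""
    merged = [[] for _ in range(n)]
    for i, part in enumerate(partitions):
        merged[i % n].extend(part)
    return merged


parts = [["a1", "a2"], ["b1", "b2"], ["c1", "c2"], ["d1", "d2"]]
print(coalesce(parts, 2))     # [['a1', 'a2', 'c1', 'c2'], ['b1', 'b2', 'd1', 'd2']]
print(repartition(parts, 2))  # [['a1', 'b1', 'c1', 'd1'], ['a2', 'b2', 'c2', 'd2']]
```

In the coalesced result every original partition survives intact inside a larger one, while the repartitioned result spreads the rows of each original partition across all outputs.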

## 3. Joins

In [13]:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)  # -1 disables automatic broadcast joins

In [14]:
df_joined = (
    df_transactions.join(
        df_customers,
        how="inner",
        on="cust_id"
    )
)

In [15]:
df_joined.explain(True)

== Parsed Logical Plan ==
'Join UsingJoin(Inner,Buffer(cust_id))
:- Relation [cust_id#0,start_date#1,end_date#2,txn_id#3,date#4,year#5,month#6,day#7,expense_type#8,amt#9,city#10] parquet
+- Relation [cust_id#67,name#68,age#69,gender#70,birthday#71,zip#72,city#73] parquet

== Analyzed Logical Plan ==
cust_id: string, start_date: string, end_date: string, txn_id: string, date: string, year: string, month: string, day: string, expense_type: string, amt: string, city: string, name: string, age: string, gender: string, birthday: string, zip: string, city: string
Project [cust_id#0, start_date#1, end_date#2, txn_id#3, date#4, year#5, month#6, day#7, expense_type#8, amt#9, city#10, name#68, age#69, gender#70, birthday#71, zip#72, city#73]
+- Join Inner, (cust_id#0 = cust_id#67)
   :- Relation [cust_id#0,start_date#1,end_date#2,txn_id#3,date#4,year#5,month#6,day#7,expense_type#8,amt#9,city#10] parquet
   +- Relation [cust_id#67,name#68,age#69,gender#70,birthday#71,zip#72,city#73] parquet

== O

## 4. GroupBy

In [16]:
df_transactions.printSchema()

root
 |-- cust_id: string (nullable = true)
 |-- start_date: string (nullable = true)
 |-- end_date: string (nullable = true)
 |-- txn_id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- year: string (nullable = true)
 |-- month: string (nullable = true)
 |-- day: string (nullable = true)
 |-- expense_type: string (nullable = true)
 |-- amt: string (nullable = true)
 |-- city: string (nullable = true)



### GroupBy Count

In [17]:
df_city_counts = (
    df_transactions
    .groupBy("city")
    .count()
)

In [18]:
df_city_counts.explain(True)

== Parsed Logical Plan ==
'Aggregate ['city], ['city, count(1) AS count#199L]
+- Relation [cust_id#0,start_date#1,end_date#2,txn_id#3,date#4,year#5,month#6,day#7,expense_type#8,amt#9,city#10] parquet

== Analyzed Logical Plan ==
city: string, count: bigint
Aggregate [city#10], [city#10, count(1) AS count#199L]
+- Relation [cust_id#0,start_date#1,end_date#2,txn_id#3,date#4,year#5,month#6,day#7,expense_type#8,amt#9,city#10] parquet

== Optimized Logical Plan ==
Aggregate [city#10], [city#10, count(1) AS count#199L]
+- Project [city#10]
   +- Relation [cust_id#0,start_date#1,end_date#2,txn_id#3,date#4,year#5,month#6,day#7,expense_type#8,amt#9,city#10] parquet

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[city#10], functions=[count(1)], output=[city#10, count#199L])
   +- Exchange hashpartitioning(city#10, 200), ENSURE_REQUIREMENTS, [id=#131]
      +- HashAggregate(keys=[city#10], functions=[partial_count(1)], output=[city#10, count#203L])
         +- File

In [19]:
df_txn_amt_city = (
    df_transactions
    .groupBy("city")
    .agg(sum("amt").alias("txn_amt"))
)

In [20]:
df_txn_amt_city.explain(True)

== Parsed Logical Plan ==
'Aggregate ['city], ['city, sum('amt) AS txn_amt#216]
+- Relation [cust_id#0,start_date#1,end_date#2,txn_id#3,date#4,year#5,month#6,day#7,expense_type#8,amt#9,city#10] parquet

== Analyzed Logical Plan ==
city: string, txn_amt: double
Aggregate [city#10], [city#10, sum(cast(amt#9 as double)) AS txn_amt#216]
+- Relation [cust_id#0,start_date#1,end_date#2,txn_id#3,date#4,year#5,month#6,day#7,expense_type#8,amt#9,city#10] parquet

== Optimized Logical Plan ==
Aggregate [city#10], [city#10, sum(cast(amt#9 as double)) AS txn_amt#216]
+- Project [amt#9, city#10]
   +- Relation [cust_id#0,start_date#1,end_date#2,txn_id#3,date#4,year#5,month#6,day#7,expense_type#8,amt#9,city#10] parquet

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[city#10], functions=[sum(cast(amt#9 as double))], output=[city#10, txn_amt#216])
   +- Exchange hashpartitioning(city#10, 200), ENSURE_REQUIREMENTS, [id=#144]
      +- HashAggregate(keys=[city#10], function

### GroupBy Count Distinct 

In [21]:
df_txn_per_city = (
    df_transactions
    .groupBy("cust_id")
    .agg(countDistinct("city").alias("city_count"))
)

In [22]:
df_txn_per_city.show(5, False)
df_txn_per_city.explain(True)



+----------+----------+
|cust_id   |city_count|
+----------+----------+
|CPP8BY8U93|10        |
|CYB8BX9LU1|10        |
|CFRT841CCD|10        |
|CA0TSNMYDK|10        |
|COZ8NONEVZ|10        |
+----------+----------+
only showing top 5 rows

== Parsed Logical Plan ==
'Aggregate ['cust_id], ['cust_id, 'count(distinct 'city) AS city_count#232]
+- Relation [cust_id#0,start_date#1,end_date#2,txn_id#3,date#4,year#5,month#6,day#7,expense_type#8,amt#9,city#10] parquet

== Analyzed Logical Plan ==
cust_id: string, city_count: bigint
Aggregate [cust_id#0], [cust_id#0, count(distinct city#10) AS city_count#232L]
+- Relation [cust_id#0,start_date#1,end_date#2,txn_id#3,date#4,year#5,month#6,day#7,expense_type#8,amt#9,city#10] parquet

== Optimized Logical Plan ==
Aggregate [cust_id#0], [cust_id#0, count(distinct city#10) AS city_count#232L]
+- Project [cust_id#0, city#10]
   +- Relation [cust_id#0,start_date#1,end_date#2,txn_id#3,date#4,year#5,month#6,day#7,expense_type#8,amt#9,city#10] parquet

==

                                                                                

# 5. Interesting Observations

### Why is a filter step present despite predicate pushdown? 

This is largely due to the way Spark's `Catalyst Optimizer` works, specifically its two separate stages: Logical Planning and Physical Planning.

- **Logical Planning**: The Catalyst optimizer takes the analyzed logical plan (which represents the user's query) and simplifies it by applying rule-based optimizations. These include `predicate pushdown` and `projection pushdown`, where filter conditions and column projections are moved as close to the data source as possible.

- **Physical Planning**: The optimized logical plan is translated into one or more physical plans that can actually be executed on the cluster. These contain operations such as file `scans`, `filters`, and `projections`.

In this case, during the logical planning phase, the predicate (`col("city") == "boston"`) has been pushed down and will be applied during the scan of the Parquet file (`PushedFilters: [IsNotNull(city), EqualTo(city,boston)]`), thus improving performance.

Now, during the physical planning phase, the same filter condition (`+- *(1) Filter (isnotnull(city#73) AND (city#73 = boston))`) is applied again to the data that's been loaded into memory. This is because of the following reasons:

1. **Guaranteed Correctness:** It might seem **redundant**, but remember that not all data sources can handle pushed-down predicates, and not all predicates can be pushed down. Therefore, **even if a predicate is pushed down to the data source, Spark still includes the predicate in the physical plan** to cover cases where the data source might not have been able to fully apply the predicate. This is Spark's way of making sure the correct data is always returned, no matter the capabilities of the data source.

2. **No Assumptions**: Spark's Catalyst optimizer doesn't assume that a pushed-down predicate was applied exactly. The optimizer aims to generate plans that return correct results across a wide range of scenarios. A source may use the predicate only to skip chunks of data (Parquet, for example, can skip row groups based on column statistics) rather than to test every row, so Spark keeps the filter operation in the physical plan as well.

It is more of a **fail-safe mechanism** to ensure data **integrity** and **correctness**.
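
One way to picture why the second filter is needed: a source may use a pushed filter only to skip whole chunks of data based on statistics, not to test every row. The sketch below is a toy model in plain Python with made-up row groups and hypothetical function names; it is not Spark's or Parquet's actual reader.

```python
# Each "row group" carries min/max statistics for the city column,
# loosely modelled on the statistics a columnar file format keeps.
ROW_GROUPS = [
    {"min": "austin", "max": "chicago", "rows": ["austin", "boston", "chicago"]},
    {"min": "denver", "max": "portland", "rows": ["denver", "portland"]},
    {"min": "boston", "max": "boston", "rows": ["boston", "boston"]},
]


def scan_with_pushed_filter(row_groups, city):
    """Source side: skip groups whose [min, max] range cannot contain `city`.
    Surviving groups are returned whole, non-matching rows included."""
    rows = []
    for group in row_groups:
        if group["min"] <= city <= group["max"]:
            rows.extend(group["rows"])
    return rows


def engine_side_filter(rows, city):
    """Engine side: the exact row-by-row filter kept in the physical plan."""
    return [row for row in rows if row == city]


scanned = scan_with_pushed_filter(ROW_GROUPS, "boston")
print(scanned)                                # ['austin', 'boston', 'chicago', 'boston', 'boston']
print(engine_side_filter(scanned, "boston"))  # ['boston', 'boston', 'boston']
```

The pushed filter skipped the second row group entirely, but `austin` and `chicago` still came through with the first one; only the row-level filter removes them.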

---

### In what cases will predicate pushdown not work?

Two examples where **filter pushdown** will not work:

1. **Complex Data Types**: Spark's Parquet data source does not push down filters that involve **complex types**, such as **arrays**, **maps**, and **structs**. This is because these complex data types can have complicated nested structures that the Parquet reader cannot easily filter on.

Here's an example:

```
root
 |-- Name: string (nullable = true)
 |-- properties: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

+----------+-----------------------------+
|Name      |properties                   |
+----------+-----------------------------+
|Afaque    |[eye -> black, hair -> black]|
|Naved     |[eye ->, hair -> brown]      |
|Ali       |[eye -> black, hair -> red]  |
|Amaan     |[eye -> grey, hair -> grey]  |
|Omaira    |[eye -> , hair -> brown]     |
+----------+-----------------------------+
```

```python
df.filter(df.properties.getItem("eye") == "brown").show()
```

```
== Physical Plan ==
*(1) Filter (metadata#123[key] = value)
+- *(1) ColumnarToRow
   +- FileScan parquet [id#122,metadata#123] Batched: true, DataFilters: [(metadata#123[key] = value)], Format: Parquet, ...
```

---

2. **Unsupported Expressions**:

In Spark, `Parquet` data source does not support pushdown for filters involving a `.cast` operation. The reason for this behaviour is as follows:
- `.cast` changes the datatype of the column, and the Parquet data source may not be able to perform the filter operation correctly on the cast data.

**Note**: This behavior may vary based on the data source. For example, if you're working with a JDBC data source connected to a database that supports SQL-like operations, the `.cast` filter could potentially be pushed down to the database.
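
A small plain-Python illustration of why a cast gets in the way, assuming (as in the customers data) that `age` is stored as a string. The values are made up; the point is that comparisons and min/max statistics on the stored strings give different answers than the same operations on the cast integers.

```python
# Ages stored as strings, like the `age` column in the customers data.
ages_as_strings = ["9", "54", "100", "37"]

# What a source could answer from the stored strings alone:
string_max = max(ages_as_strings)                          # lexicographic maximum
string_matches = [a for a in ages_as_strings if a > "50"]  # lexicographic compare

# What the query actually asks for: cast(age as int) > 50.
int_max = max(int(a) for a in ages_as_strings)
int_matches = [a for a in ages_as_strings if int(a) > 50]

print(string_max, int_max)  # 9 100
print(string_matches)       # ['9', '54']
print(int_matches)          # ['54', '100']
```

A source that filtered on the raw strings would wrongly keep `9` and wrongly drop `100`, so the safe choice is to evaluate the cast filter after the data has been read.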

### Example of operation where filter pushdown doesn't work

In [23]:
df_customer_gt_50 = (
    df_customers
    .filter(col("age").cast("int") > 50)
)
df_customer_gt_50.show(5, False)
df_customer_gt_50.explain(True)

+----------+--------------+---+------+----------+-----+------------+
|cust_id   |name          |age|gender|birthday  |zip  |city        |
+----------+--------------+---+------+----------+-----+------------+
|C01BKUFRHA|Aaron Becker  |54 |Male  |11/24/1979|40284|san_diego   |
|C01WMZQ7PN|Aaron Brady   |51 |Female|8/20/1994 |52204|philadelphia|
|C021567NJZ|Aaron Briggs  |57 |Male  |3/10/1990 |22008|philadelphia|
|C02JNTM46B|Aaron Chambers|51 |Male  |1/6/2001  |63337|new_york    |
|C030A69V1L|Aaron Clarke  |55 |Male  |4/28/1999 |77176|philadelphia|
+----------+--------------+---+------+----------+-----+------------+
only showing top 5 rows

== Parsed Logical Plan ==
'Filter (cast('age as int) > 50)
+- Relation [cust_id#67,name#68,age#69,gender#70,birthday#71,zip#72,city#73] parquet

== Analyzed Logical Plan ==
cust_id: string, name: string, age: string, gender: string, birthday: string, zip: string, city: string
Filter (cast(age#69 as int) > 50)
+- Relation [cust_id#67,name#68,age#69,gend

# understanding exchange, roundrobinpartitioning, hashpartitioning, and hashaggregate in spark query plans


when you see **exchange** in a spark query plan, it means **data is being shuffled** across partitions. let’s break down what this means and why it happens.

---

## **1. what is exchange? (data shuffle)**
`exchange` means spark is **redistributing data across partitions**, usually because of a join, groupBy, or some other operation that requires rebalancing data.

- **before exchange**: data is in some initial partitions
- **exchange happens**: spark moves data around the cluster
- **after exchange**: data is now in new partitions

**example:**  
```python
df = spark.read.csv("employees.csv", header=True, inferSchema=True)

df.groupBy("department").count().explain(True)
```
**query plan output:**
```plaintext
== Physical Plan ==
*(2) HashAggregate(keys=[department#23], functions=[count(1)])
+- Exchange hashpartitioning(department#23, 200), ENSURE_REQUIREMENTS
   +- *(1) HashAggregate(keys=[department#23], functions=[partial_count(1)])
      +- FileScan csv [department#23]
```
### **what’s happening here?**
- **`FileScan csv`** → loads data
- **`HashAggregate (partial_count)`** → counts records **locally** on each partition
- **`Exchange hashpartitioning(department, 200)`** → **shuffles data** so that all rows with the same `department` end up on the same partition
- **`HashAggregate (final count)`** → performs final aggregation after shuffle

---

## **2. `roundrobinpartitioning` (fair redistribution)**
this partitioning method **evenly distributes rows across partitions in a round-robin fashion**. it **ignores keys** and just tries to balance load.

**use case:**  
- when reading large datasets and you want balanced partitions (e.g., before training an ML model)

**example:**  
```python
df = spark.read.csv("employees.csv", header=True, inferSchema=True)
df.repartition(10).explain(True)
```
**query plan output:**
```plaintext
== Physical Plan ==
Exchange roundrobinpartitioning(10), REPARTITION_BY_NUM
+- FileScan csv [name#21, department#23, salary#25]
```
### **what’s happening here?**
- **`FileScan csv`** → reads the file
- **`Exchange roundrobinpartitioning(10)`** → reshuffles data evenly across **10 partitions**  

**why use this?**
- good when **no natural partitioning key exists** but you want balanced partitions
- not efficient for **joins or aggregations** since it ignores data relationships
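
a toy version of round-robin assignment in plain python. the `round_robin` function and the skewed sample data are made up for this sketch, and details of spark's real implementation (such as which partition it starts with) are left out.

```python
def round_robin(rows, num_partitions):
    """Assign row i to partition i % num_partitions, ignoring row contents."""
    partitions = [[] for _ in range(num_partitions)]
    for i, row in enumerate(rows):
        partitions[i % num_partitions].append(row)
    return partitions


# Made-up, heavily skewed data: most employees are in "eng".
rows = ["eng"] * 7 + ["ops"] * 2 + ["hr"]
parts = round_robin(rows, 3)

print([len(p) for p in parts])          # [4, 3, 3]
print([sorted(set(p)) for p in parts])  # [['eng', 'hr'], ['eng', 'ops'], ['eng', 'ops']]
```

partition sizes come out balanced even though the keys are skewed, but rows for the same department end up in every partition, which is why a join or aggregation on `department` would still need its own shuffle.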

---

## **3. `hashpartitioning` (grouping similar data together)**
this partitioning method **distributes data based on a hash of a column value**.  

**use case:**  
- used in **joins** and **groupBys** where we need the same key in the same partition  

**example:**  
```python
df1 = spark.read.csv("employees.csv", header=True, inferSchema=True)
df2 = spark.read.csv("departments.csv", header=True, inferSchema=True)

df1.join(df2, "department").explain(True)
```
**query plan output:**
```plaintext
== Physical Plan ==
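*(5) SortMergeJoin [department#23], [department#44], Inner
:- *(2) Sort [department#23 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(department#23, 200), ENSURE_REQUIREMENTS
:     +- *(1) Filter isnotnull(department#23)
:        +- FileScan csv [name#21, department#23]
+- *(4) Sort [department#44 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(department#44, 200), ENSURE_REQUIREMENTS
      +- *(3) Filter isnotnull(department#44)
         +- FileScan csv [department#44, location#45]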
```
### **what’s happening here?**
- the two csv scans → read both datasets
- `Exchange hashpartitioning(department, 200)` → **shuffles data so that all rows with the same department go to the same partition**
- `SortMergeJoin` → each side is sorted by `department` and the two sorted sides are merged; this works partition by partition because related rows are co-located

**why use this?**
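- lets each task join or aggregate **its own partition independently**, because rows with the same key are guaranteed to be co-located
- the shuffle itself still moves data over the **network**, so it is a cost to watch rather than a saving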
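
the idea of hashing the key and taking it modulo the number of partitions can be sketched in plain python. `partition_for` and `hash_partition` are hypothetical names, the rows are made up, and `zlib.crc32` is only a stand-in for a stable hash function; it is not the hash spark uses.

```python
import zlib


def partition_for(key, num_partitions):
    """Stable hash of the key, then modulo the partition count.
    crc32 is only a stand-in here; it is not the hash Spark uses."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def hash_partition(rows, key_index, num_partitions):
    """Place each row in the partition chosen by its key column."""
    partitions = [[] for _ in range(num_partitions)]
    for row in rows:
        partitions[partition_for(row[key_index], num_partitions)].append(row)
    return partitions


# Made-up rows: (name, department) and (department, location).
employees = [("Alice", "eng"), ("Bob", "ops"), ("Carol", "eng"), ("Dan", "hr")]
departments = [("eng", "berlin"), ("ops", "pune"), ("hr", "austin")]

left = hash_partition(employees, key_index=1, num_partitions=4)
right = hash_partition(departments, key_index=0, num_partitions=4)

# Both sides used the same function, so matching keys share a partition number
# and each partition pair can be joined without looking at any other.
for i, (l_rows, r_rows) in enumerate(zip(left, right)):
    print(i, l_rows, r_rows)
```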

---

## **4. `hashaggregate` (optimized aggregation)**
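this is a **hash-table-based aggregation** operator. spark usually runs it twice: a partial aggregation before the shuffle, which shrinks the data that has to be moved, and a final aggregation after it.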

**example:**  
```python
df.groupBy("department").agg({"salary": "sum"}).explain(True)
```
**query plan output:**
```plaintext
== Physical Plan ==
*(2) HashAggregate(keys=[department#23], functions=[sum(salary#25)])
+- Exchange hashpartitioning(department#23, 200), ENSURE_REQUIREMENTS
   +- *(1) HashAggregate(keys=[department#23], functions=[partial_sum(salary#25)])
      +- FileScan csv [department#23, salary#25]
```
### **what’s happening here?**
1. **`HashAggregate (partial_sum)`** → spark first **partially** sums values locally
2. **`Exchange hashpartitioning(department, 200)`** → shuffles data so that all `department` rows are together
3. **`HashAggregate (final sum)`** → computes the **final sum** after shuffling

**why use this?**
- usually faster than sort-based aggregation because it **avoids sorting** the data
- the partial step shrinks the data that has to go through the shuffle
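
the three steps can be mimicked in plain python. this is a toy model with made-up data and hypothetical function names, not spark's operator; it only shows why aggregating locally first means fewer rows cross the shuffle.

```python
from collections import defaultdict

# Made-up input: two partitions of (department, salary) rows.
PARTITIONS = [
    [("eng", 100), ("eng", 200), ("ops", 50)],
    [("eng", 300), ("ops", 70), ("ops", 30)],
]


def partial_sum(partition):
    """Step 1: one hash table per partition, one entry per key."""
    table = defaultdict(int)
    for dept, salary in partition:
        table[dept] += salary
    return dict(table)


def final_sum(partials):
    """Step 3: merge the partial results that the shuffle grouped by key."""
    table = defaultdict(int)
    for partial in partials:
        for dept, subtotal in partial.items():
            table[dept] += subtotal
    return dict(table)


partials = [partial_sum(p) for p in PARTITIONS]
rows_in = sum(len(p) for p in PARTITIONS)
rows_shuffled = sum(len(p) for p in partials)  # step 2 moves only these

print(partials)             # [{'eng': 300, 'ops': 50}, {'eng': 300, 'ops': 100}]
print(final_sum(partials))  # {'eng': 600, 'ops': 150}
print(rows_in, "input rows,", rows_shuffled, "rows shuffled")  # 6 input rows, 4 rows shuffled
```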

---

## **summary of key concepts**
| term | what it does | when is it used? |
|------|-------------|------------------|
| **exchange** | shuffles data across partitions | joins, groupBy, repartition |
| **roundrobinpartitioning** | distributes rows evenly across partitions (ignoring keys) | load balancing, ML preprocessing |
| **hashpartitioning** | distributes rows based on a hash of a key column | joins, groupBy, aggregations |
| **hashaggregate** | aggregates with a hash table, partially before the shuffle and finally after it | sum, count, avg, etc. |

---

## **final thoughts**
- `exchange` = **shuffle**, which can be expensive, so minimize it
- `roundrobinpartitioning` = **balance load** but **not good for joins**
- `hashpartitioning` = **good for joins and aggregations** (co-locates rows with the same key)
- `hashaggregate` = **hash-based aggregation without sorting**

if you want to avoid excessive shuffling, **always check `df.explain(True)`** to understand what spark is doing. 🚀

# Video Summary

Here are the notes from the YouTube video "Master Reading Spark Query Plans":
*   It is crucial to understand Spark query plans before doing any kind of Spark optimization.
*   Spark query plans help in understanding how Spark functions internally.

**Spark Query Plan Generation**
*   Spark checks the syntax of the code.
*   Spark generates an unresolved logical plan.
*   Spark uses a catalog to maintain details about tables, data frames, columns, and data types.
*   Spark verifies the existence of tables and columns.
*   Spark generates a logical plan.
*   The Catalyst Optimizer optimizes the logical plan.
*   **Filter push down** optimization pushes filters to the data source.
*   **Push down projection** optimization selects only the columns used in the query.
*   The optimized logical plan is converted into several candidate physical plans.
*   An internal cost model chooses the most efficient physical plan.
*   The selected physical plan is executed on the cluster.

**Narrow Transformations**
*   Narrow transformations are operations that don't involve a shuffle. Examples: filter operation, adding a new column, or selecting columns.
*   The physical plan is the one executed on the cluster.
*   The steps in the physical plan include file scan, columnar to row conversion, filter operation, and project.
*   Spark adds "is not null" checks as part of its optimization.

**Wide Transformations**
*   Wide transformations involve shuffle.
*   **Repartitioning** redistributes data to a specified number of partitions.
*   Repartitioning involves shuffling data across partitions.
*   **Round Robin partitioning** is a partitioning scheme that distributes data row by row to each partition.
*   **Adaptive Query Execution (AQE)** uses runtime statistics to select the most efficient query plan.
*   AQE considers runtime statistics such as the number of bytes read and the number of partitions.

**Coalesce**
*   Coalesce helps reduce the number of partitions.
*   Coalesce tries to avoid a shuffle; according to the video, it shuffles only in extreme edge cases.
*   Where it can, coalesce merges partitions that already sit on the same executor, which avoids data transfer between executors.
*   With an aggressive reduction in the number of partitions (for example, down to one), coalesce may have to move data between executors.
*   When repartitioning, a partitioning scheme is involved (e.g., round robin partitioning).
*   When doing a coalesce, there is no partitioning scheme involved because there is no shuffle.
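
The contrast with repartitioning can be sketched in plain Python. This toy `coalesce` function is an assumption for illustration only: it simply concatenates neighbouring partitions, whereas Spark also takes into account where the partitions are located. No row is routed individually, so no partitioning scheme is needed.

```python
def coalesce(partitions, num_partitions):
    """Merge neighbouring partitions into at most num_partitions groups.

    Whole partitions are concatenated; no row is re-routed on its own.
    """
    if num_partitions >= len(partitions):
        # Coalesce only reduces the partition count, so nothing changes here.
        return [list(p) for p in partitions]
    output = [[] for _ in range(num_partitions)]
    for index, partition in enumerate(partitions):
        target = index * num_partitions // len(partitions)
        output[target].extend(partition)
    return output


parts = [[1, 2, 3, 4, 5], [6], [7, 8], [9]]
merged = coalesce(parts, 2)

print(merged)                    # [[1, 2, 3, 4, 5, 6], [7, 8, 9]]
print([len(p) for p in merged])  # [6, 3]
```

The output partitions are uneven because whole partitions are merged as they are. That is the trade-off against repartitioning, which balances sizes at the cost of a shuffle.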

**Join**
*   The query plan is read from bottom to top.
*   The steps in the physical plan for a join include file scan, filter (not null), shuffle, sort, and sort merge join.
*   **Hash partitioning** ensures that the same keys end up in the same partition.
*   Hash partitioning involves hashing the join key and using the modulo operator to determine the partition.
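
The hash-then-modulo idea can be sketched in plain Python. Everything here is illustrative: `zlib.crc32` is a deterministic stand-in for Spark's own hash function, the `orders` and `customers` tables are made up, and the per-partition join uses a dictionary lookup rather than the sort and merge steps Spark shows in the plan.

```python
import zlib


def partition_for(key, num_partitions):
    """Hash the join key, then use modulo to pick a partition index."""
    # crc32 is a deterministic stand-in; it is not the hash function Spark uses.
    return zlib.crc32(str(key).encode("utf-8")) % num_partitions


def hash_partition(rows, key_column, num_partitions):
    """Route every row to the partition chosen by its key."""
    output = [[] for _ in range(num_partitions)]
    for row in rows:
        output[partition_for(row[key_column], num_partitions)].append(row)
    return output


orders = [
    {"cust_id": 1, "amount": 30},
    {"cust_id": 2, "amount": 10},
    {"cust_id": 1, "amount": 5},
    {"cust_id": 3, "amount": 12},
]
customers = [
    {"cust_id": 1, "name": "Asha"},
    {"cust_id": 2, "name": "Ben"},
    {"cust_id": 3, "name": "Chen"},
]

NUM_PARTITIONS = 4
order_parts = hash_partition(orders, "cust_id", NUM_PARTITIONS)
customer_parts = hash_partition(customers, "cust_id", NUM_PARTITIONS)

# Matching keys share a partition index, so each pair of partitions joins locally.
joined = []
for order_part, customer_part in zip(order_parts, customer_parts):
    names = {c["cust_id"]: c["name"] for c in customer_part}
    for order in order_part:
        if order["cust_id"] in names:
            joined.append((order["cust_id"], names[order["cust_id"]], order["amount"]))

print(sorted(joined))
# [(1, 'Asha', 5), (1, 'Asha', 30), (2, 'Ben', 10), (3, 'Chen', 12)]
```

Because both sides use the same hash function and the same partition count, no order is ever separated from its customer, and each partition pair can be joined without looking at any other partition.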

**Group By**
*   The physical plan for a group by operation includes a file scan, a hash aggregate, a shuffle, and a second hash aggregate.
*   The first hash aggregate performs a local (partial) count within each partition before the data is shuffled.
*   After shuffling, the same keys end up in the same partition.
*   A final count is performed after the shuffle.
*   When counting distinct values, the query plan involves multiple hash aggregates and shuffles.
*   The distinct count is performed at the local level before shuffling and then at the global level.
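
The local-then-global pattern described above can be sketched in plain Python. This is a toy model under stated assumptions: the department values are made up, `zlib.crc32` stands in for Spark's hash function, and the distinct count is reduced to its essence (deduplicate locally, then globally) rather than reproducing every step of Spark's plan.

```python
import zlib
from collections import Counter

partitions = [
    ["eng", "eng", "ops", "eng"],
    ["ops", "hr", "eng"],
]
NUM_SHUFFLE_PARTITIONS = 2

# 1. Partial aggregate: count inside each input partition.
partial_counts = [Counter(p) for p in partitions]

# 2. Shuffle: send each (key, partial count) pair to the partition chosen by its key.
shuffled = [[] for _ in range(NUM_SHUFFLE_PARTITIONS)]
for counts in partial_counts:
    for key, count in counts.items():
        target = zlib.crc32(key.encode("utf-8")) % NUM_SHUFFLE_PARTITIONS
        shuffled[target].append((key, count))

# 3. Final aggregate: add up the partial counts for each key.
final_counts = {}
for part in shuffled:
    for key, count in part:
        final_counts[key] = final_counts.get(key, 0) + count

rows_in = sum(len(p) for p in partitions)
records_shuffled = sum(len(part) for part in shuffled)
print(final_counts)               # eng: 4, ops: 2, hr: 1 (key order may vary)
print(rows_in, records_shuffled)  # 7 5

# Distinct count: deduplicate locally, then again globally.
local_distinct = [set(p) for p in partitions]
distinct_count = len(set().union(*local_distinct))
print(distinct_count)             # 3
```

Only five partial records cross the shuffle instead of seven raw rows. On real data with many repeated keys, this saving is the reason the local count happens first.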

**Observations on Narrow Transformation**
*   The physical plan contains a filter step for the filter written in the code.
*   Spark pushes filters to the source during the file scan.
*   Spark keeps a second filter after the scan to guarantee correctness.
*   Because the pushed-down filter already shrinks the dataset at scan time, this second filter is cheap even though it looks redundant.
*   Predicate pushdown may not work when the column is a map type or when filtering on an unsupported expression (e.g., a cast operation).
*   Whether filters can be pushed down depends on the data source.
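
The observations above can be sketched as a toy data source in plain Python. The names (`can_push`, `run`, `SUPPORTED_OPS`) and the rule that only simple `(column, op, literal)` comparisons are pushable are assumptions for illustration; real data sources each have their own rules. The filter that needs a cast stays out of the scan, and the engine re-checks every filter afterwards.

```python
import operator

# Toy model: the source only understands simple (column, op, literal) comparisons.
SUPPORTED_OPS = {">": operator.gt, "<": operator.lt, "=": operator.eq}

ROWS = [
    {"name": "Asha", "dept": "eng", "salary": "72000"},
    {"name": "Ben", "dept": "eng", "salary": "41000"},
    {"name": "Chen", "dept": "ops", "salary": "58000"},
]


def can_push(flt):
    """Return True if the toy source can evaluate this filter itself."""
    return isinstance(flt, tuple) and len(flt) == 3 and flt[1] in SUPPORTED_OPS


def evaluate(flt, row):
    if can_push(flt):
        column, op, value = flt
        return SUPPORTED_OPS[op](row[column], value)
    return flt(row)  # arbitrary expression, evaluated by the engine


def run(rows, filters):
    pushed = [f for f in filters if can_push(f)]
    # Scan: the source applies only the filters it supports.
    scanned = [r for r in rows if all(evaluate(f, r) for f in pushed)]
    # Filter step: the engine re-checks every filter after the scan.
    result = [r for r in scanned if all(evaluate(f, r) for f in filters)]
    return pushed, scanned, result


def salary_cast_filter(row):
    return int(row["salary"]) > 50000  # needs a cast, so it is not pushed


pushed, scanned, result = run(ROWS, [("dept", "=", "eng"), salary_cast_filter])

print(pushed)                        # [('dept', '=', 'eng')]
print([r["name"] for r in scanned])  # ['Asha', 'Ben']
print([r["name"] for r in result])   # ['Asha']
```

The post-scan filter is what catches `Ben` here: the source could not evaluate the cast, so the row left the source and was removed by the engine.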


# MCQ Questions

Here are some multiple-choice questions (MCQs) based on the concepts from the YouTube video "Master Reading Spark Query Plans" to help you revise:

1.  Which of the following is the **first step** in Spark query plan generation, according to the video?
    *   A) Generating physical plans
    *   B) Checking the syntax of the code
    *   C) Optimizing the logical plan
    *   D) Executing the final query plan

2.  What is the purpose of the **Catalyst Optimizer** in Spark query plan generation?
    *   A) To check the syntax of the code
    *   B) To maintain details about tables and data frames
    *   C) To optimize the logical plan
    *   D) To execute the final query plan

3.  Which of the following is an example of a **narrow transformation** in Spark?
    *   A) Repartitioning
    *   B) Shuffling
    *   C) Adding a new column
    *   D) Coalesce with aggressive reduction

4.  What does **"filter push down"** optimization do?
    *   A) Selects only the columns used in the query
    *   B) Pushes filters to the data source
    *   C) Redistributes data to a specified number of partitions
    *   D) Merges partitions within the same executor

5.  Which of the following is **NOT** a characteristic of **coalesce**?
    *   A) It helps reduce the number of partitions
    *   B) It tries to avoid a shuffle
    *   C) It always involves a partitioning scheme
    *   D) It merges partitions within the same executor

6.  What does **Adaptive Query Execution (AQE)** use to select the most efficient query plan?
    *   A) Syntax of the code
    *   B) Logical plan
    *   C) Runtime statistics
    *   D) Physical plan

7.  In a Spark query plan for a join operation, what does **hash partitioning** ensure?
    *   A) That the data is sorted
    *   B) That null values are filtered out
    *   C) That the same keys end up in the same partition
    *   D) That the data is redistributed evenly

8.  What does **"hash aggregate"** involve in a group by operation?
    *   A) Shuffling all of the data before counting
    *   B) Doing a local count before shuffling the data
    *   C) Sorting the data
    *   D) Filtering null values

9.  According to the video, why does Spark add an additional filter step even if filters are pushed down to the source?
    *   A) To make the query plan more complex
    *   B) To operate on a larger dataset
    *   C) To guarantee correctness
    *   D) To avoid predicate pushdown

10. In which of the following scenarios might **predicate pushdown NOT work**?
    *   A) When filtering on an integer column
    *   B) When filtering on a string column
    *   C) When the column is a map type
    *   D) When filtering on a column that is not null


Answers:
Here are the answers to the multiple-choice questions, with explanations:

1.  B) Checking the syntax of the code. **Spark first checks if the syntax is correct** before generating any plan.
2.  C) To optimize the logical plan. The Catalyst Optimizer's main function is to **optimize the logical plan** by applying techniques like filter and projection pushdown.
3.  C) Adding a new column. Narrow transformations don't involve shuffling. Adding a column is a narrow transformation, whereas repartitioning involves a shuffle and is thus a wide transformation.
4.  B) Pushes filters to the data source. Filter push down is an optimization technique that **pushes filter operations to the data source** to reduce the amount of data read.
5.  C) It always involves a partitioning scheme. **Coalesce tries to avoid a shuffle**, and without a shuffle there is no partitioning scheme. Repartitioning, by contrast, always involves a shuffle.
6.  C) Runtime statistics. AQE uses runtime statistics such as data size in bytes and the number of partitions to choose the most efficient query plan.
7.  C) That the same keys end up in the same partition. Hash partitioning ensures that rows with **the same key (e.g., join key)** end up in the same partition after a shuffle.
8.  B) Doing a local count before shuffling the data. Hash aggregate first performs a **partial count locally** on each partition before the shuffle.
9.  C) To guarantee correctness. Spark adds an additional filter as a **fail-safe** to ensure data integrity and correctness, because some filters cannot be pushed down.
10. C) When the column is a map type. Predicate pushdown might not work when filtering on certain data types or unsupported expressions. An example provided in the video is filtering on a **map type column**.
