# Query Plan

In [0]:
from pyspark.sql.types import *
import pyspark.sql.functions as F

## Load data into Data frame. Reading Parquet Files

In [0]:
transactions_file = "dbfs:/mnt/data/data/data_skew/transactions.parquet"
df_transactions = spark.read.parquet(transactions_file)

In [0]:
df_transactions.show(5, False)

+----------+----------+----------+---------------+----------+----+-----+---+-------------+------+-----------+
|cust_id   |start_date|end_date  |txn_id         |date      |year|month|day|expense_type |amt   |city       |
+----------+----------+----------+---------------+----------+----+-----+---+-------------+------+-----------+
|C0YDPQWPBJ|2010-07-01|2018-12-01|TZ5SMKZY9S03OQJ|2018-10-07|2018|10   |7  |Entertainment|10.42 |boston     |
|C0YDPQWPBJ|2010-07-01|2018-12-01|TYIAPPNU066CJ5R|2016-03-27|2016|3    |27 |Motor/Travel |44.34 |portland   |
|C0YDPQWPBJ|2010-07-01|2018-12-01|TETSXIK4BLXHJ6W|2011-04-11|2011|4    |11 |Entertainment|3.18  |chicago    |
|C0YDPQWPBJ|2010-07-01|2018-12-01|TQKL1QFJY3EM8LO|2018-02-22|2018|2    |22 |Groceries    |268.97|los_angeles|
|C0YDPQWPBJ|2010-07-01|2018-12-01|TYL6DFP09PPXMVB|2010-10-16|2010|10   |16 |Entertainment|2.66  |chicago    |
+----------+----------+----------+---------------+----------+----+-----+---+-------------+------+-----------+
only showi

In [0]:
customers_file = "dbfs:/mnt/data/data/data_skew/customers.parquet"
df_customers = spark.read.parquet(customers_file)

In [0]:
df_customers.show(5, False)

+----------+-------------+---+------+----------+-----+-----------+
|cust_id   |name         |age|gender|birthday  |zip  |city       |
+----------+-------------+---+------+----------+-----+-----------+
|C007YEYTX9|Aaron Abbott |34 |Female|7/13/1991 |97823|boston     |
|C00B971T1J|Aaron Austin |37 |Female|12/16/2004|30332|chicago    |
|C00WRSJF1Q|Aaron Barnes |29 |Female|3/11/1977 |23451|denver     |
|C01AZWQMF3|Aaron Barrett|31 |Male  |7/9/1998  |46613|los_angeles|
|C01BKUFRHA|Aaron Becker |54 |Male  |11/24/1979|40284|san_diego  |
+----------+-------------+---+------+----------+-----+-----------+
only showing top 5 rows



# Narrow Transformations
- `filter` rows where `city='boston'`
- `add` a new column: adding `first_name` and `last_name`
- `alter` an exisitng column: adding 5 to `age` column
- `select` relevant columns

In [0]:
df_narrow_transform = (
    df_customers
    .filter(F.col("city") == "boston")
    .withColumn("first_name", F.split("name", " ").getItem(0))
    .withColumn("last_name", F.split("name", " ").getItem(1))
    .withColumn("age", F.col("age") + F.lit(5))
    .select("cust_id", "first_name", "last_name", "age", "gender", "birthday")
)

In [0]:
# Trigger and action
df_narrow_transform.show(5, False)

+----------+----------+---------+----+------+---------+
|cust_id   |first_name|last_name|age |gender|birthday |
+----------+----------+---------+----+------+---------+
|C007YEYTX9|Aaron     |Abbott   |39.0|Female|7/13/1991|
|C08XAQUY73|Aaron     |Lambert  |59.0|Female|11/5/1966|
|C094P1VXF9|Aaron     |Lindsey  |29.0|Male  |9/21/1990|
|C097SHE1EF|Aaron     |Lopez    |27.0|Female|4/18/2001|
|C0DTC6436T|Aaron     |Schwartz |57.0|Female|7/9/1962 |
+----------+----------+---------+----+------+---------+
only showing top 5 rows



In [0]:
# See the query plan
df_narrow_transform.explain(True)

== Parsed Logical Plan ==
'Project ['cust_id, 'first_name, 'last_name, 'age, 'gender, 'birthday]
+- Project [cust_id#70, name#71, (cast(age#72 as double) + cast(5 as double)) AS age#133, gender#73, birthday#74, zip#75, city#76, first_name#114, last_name#123]
   +- Project [cust_id#70, name#71, age#72, gender#73, birthday#74, zip#75, city#76, first_name#114, split(name#71,  , -1)[1] AS last_name#123]
      +- Project [cust_id#70, name#71, age#72, gender#73, birthday#74, zip#75, city#76, split(name#71,  , -1)[0] AS first_name#114]
         +- Filter (city#76 = boston)
            +- Relation [cust_id#70,name#71,age#72,gender#73,birthday#74,zip#75,city#76] parquet

== Analyzed Logical Plan ==
cust_id: string, first_name: string, last_name: string, age: double, gender: string, birthday: string
Project [cust_id#70, first_name#114, last_name#123, age#133, gender#73, birthday#74]
+- Project [cust_id#70, name#71, (cast(age#72 as double) + cast(5 as double)) AS age#133, gender#73, birthday#74, 

# Wide Transformations
1. Repartition
2. Coalesce
3. Joins
4. GroupBy
   - `count`
   - `countDistinct`
   - `sum`

## 1. Repartition

In [0]:
df_transactions.rdd.getNumPartitions()

Out[9]: 12

In [0]:
df_transactions.repartition(24).explain(True)

== Parsed Logical Plan ==
Repartition 24, true
+- Relation [cust_id#2,start_date#3,end_date#4,txn_id#5,date#6,year#7,month#8,day#9,expense_type#10,amt#11,city#12] parquet

== Analyzed Logical Plan ==
cust_id: string, start_date: string, end_date: string, txn_id: string, date: string, year: string, month: string, day: string, expense_type: string, amt: string, city: string
Repartition 24, true
+- Relation [cust_id#2,start_date#3,end_date#4,txn_id#5,date#6,year#7,month#8,day#9,expense_type#10,amt#11,city#12] parquet

== Optimized Logical Plan ==
Repartition 24, true
+- Relation [cust_id#2,start_date#3,end_date#4,txn_id#5,date#6,year#7,month#8,day#9,expense_type#10,amt#11,city#12] parquet

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Exchange RoundRobinPartitioning(24), REPARTITION_BY_NUM, [plan_id=75]
   +- FileScan parquet [cust_id#2,start_date#3,end_date#4,txn_id#5,date#6,year#7,month#8,day#9,expense_type#10,amt#11,city#12] Batched: true, DataFilters: [], Format: Parquet,

## 2. Coalesce

In [0]:
df_transactions.coalesce(5).explain(True)

== Parsed Logical Plan ==
Repartition 5, false
+- Relation [cust_id#2,start_date#3,end_date#4,txn_id#5,date#6,year#7,month#8,day#9,expense_type#10,amt#11,city#12] parquet

== Analyzed Logical Plan ==
cust_id: string, start_date: string, end_date: string, txn_id: string, date: string, year: string, month: string, day: string, expense_type: string, amt: string, city: string
Repartition 5, false
+- Relation [cust_id#2,start_date#3,end_date#4,txn_id#5,date#6,year#7,month#8,day#9,expense_type#10,amt#11,city#12] parquet

== Optimized Logical Plan ==
Repartition 5, false
+- Relation [cust_id#2,start_date#3,end_date#4,txn_id#5,date#6,year#7,month#8,day#9,expense_type#10,amt#11,city#12] parquet

== Physical Plan ==
Coalesce 5
+- *(1) ColumnarToRow
   +- FileScan parquet [cust_id#2,start_date#3,end_date#4,txn_id#5,date#6,year#7,month#8,day#9,expense_type#10,amt#11,city#12] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[dbfs:/mnt/data/data/data_skew/transact

### Why doesn't `.coalesce()` explicitly show the partitioning scheme?

`.coalesce` doesn't show the partitioning scheme e.g. `RoundRobinPartitioning` because: 
- The operation only minimizes data movement by merging into fewer partitions, it doesn't do any shuffling.
- Because no shuffling is done, the partitioning scheme remains the same as the original DataFrame and Spark doesn't include it explicitly in it's plan as the partitioning scheme is unaffected by `.coalesce`

## 3. Joins

In [0]:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

In [0]:
df_joined = (
    df_transactions.join(
        df_customers,
        how="inner",
        on="cust_id"
    )
)

In [0]:
df_joined.explain(True)

== Parsed Logical Plan ==
'Join UsingJoin(Inner,Buffer(cust_id))
:- Relation [cust_id#2,start_date#3,end_date#4,txn_id#5,date#6,year#7,month#8,day#9,expense_type#10,amt#11,city#12] parquet
+- Relation [cust_id#70,name#71,age#72,gender#73,birthday#74,zip#75,city#76] parquet

== Analyzed Logical Plan ==
cust_id: string, start_date: string, end_date: string, txn_id: string, date: string, year: string, month: string, day: string, expense_type: string, amt: string, city: string, name: string, age: string, gender: string, birthday: string, zip: string, city: string
Project [cust_id#2, start_date#3, end_date#4, txn_id#5, date#6, year#7, month#8, day#9, expense_type#10, amt#11, city#12, name#71, age#72, gender#73, birthday#74, zip#75, city#76]
+- Join Inner, (cust_id#2 = cust_id#70)
   :- Relation [cust_id#2,start_date#3,end_date#4,txn_id#5,date#6,year#7,month#8,day#9,expense_type#10,amt#11,city#12] parquet
   +- Relation [cust_id#70,name#71,age#72,gender#73,birthday#74,zip#75,city#76] parquet

## 4. GroupBy

In [0]:
df_transactions.printSchema()

root
 |-- cust_id: string (nullable = true)
 |-- start_date: string (nullable = true)
 |-- end_date: string (nullable = true)
 |-- txn_id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- year: string (nullable = true)
 |-- month: string (nullable = true)
 |-- day: string (nullable = true)
 |-- expense_type: string (nullable = true)
 |-- amt: string (nullable = true)
 |-- city: string (nullable = true)



### GroupBy Count

In [0]:
df_city_counts = (
    df_transactions
    .groupBy("city")
    .count()
)

In [0]:
df_city_counts.explain(True)

== Parsed Logical Plan ==
'Aggregate ['city], ['city, count(1) AS count#209L]
+- Relation [cust_id#2,start_date#3,end_date#4,txn_id#5,date#6,year#7,month#8,day#9,expense_type#10,amt#11,city#12] parquet

== Analyzed Logical Plan ==
city: string, count: bigint
Aggregate [city#12], [city#12, count(1) AS count#209L]
+- Relation [cust_id#2,start_date#3,end_date#4,txn_id#5,date#6,year#7,month#8,day#9,expense_type#10,amt#11,city#12] parquet

== Optimized Logical Plan ==
Aggregate [city#12], [city#12, count(1) AS count#209L]
+- Project [city#12]
   +- Relation [cust_id#2,start_date#3,end_date#4,txn_id#5,date#6,year#7,month#8,day#9,expense_type#10,amt#11,city#12] parquet

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[city#12], functions=[finalmerge_count(merge count#213L) AS count(1)#208L], output=[city#12, count#209L])
   +- Exchange hashpartitioning(city#12, 200), ENSURE_REQUIREMENTS, [plan_id=200]
      +- HashAggregate(keys=[city#12], functions=[partial_coun

In [0]:
df_txn_amt_city = (
    df_transactions
    .groupBy("city")
    .agg(F.sum("amt").alias("txn_amt"))
)

In [0]:
df_txn_amt_city.explain(True)

== Parsed Logical Plan ==
'Aggregate ['city], ['city, sum('amt) AS txn_amt#229]
+- Relation [cust_id#2,start_date#3,end_date#4,txn_id#5,date#6,year#7,month#8,day#9,expense_type#10,amt#11,city#12] parquet

== Analyzed Logical Plan ==
city: string, txn_amt: double
Aggregate [city#12], [city#12, sum(cast(amt#11 as double)) AS txn_amt#229]
+- Relation [cust_id#2,start_date#3,end_date#4,txn_id#5,date#6,year#7,month#8,day#9,expense_type#10,amt#11,city#12] parquet

== Optimized Logical Plan ==
Aggregate [city#12], [city#12, sum(cast(amt#11 as double)) AS txn_amt#229]
+- Project [amt#11, city#12]
   +- Relation [cust_id#2,start_date#3,end_date#4,txn_id#5,date#6,year#7,month#8,day#9,expense_type#10,amt#11,city#12] parquet

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[city#12], functions=[finalmerge_sum(merge sum#233) AS sum(cast(amt#11 as double))#228], output=[city#12, txn_amt#229])
   +- Exchange hashpartitioning(city#12, 200), ENSURE_REQUIREMENTS, [plan_id=2

### GroupBy Count Distinct 

In [0]:
df_txn_per_city = (
    df_transactions
    .groupBy("cust_id")
    .agg(F.countDistinct("city").alias("city_count"))
)

In [0]:
df_txn_per_city.show(5, False)
df_txn_per_city.explain(True)

+----------+----------+
|cust_id   |city_count|
+----------+----------+
|CPP8BY8U93|10        |
|CYB8BX9LU1|10        |
|CFRT841CCD|10        |
|CA0TSNMYDK|10        |
|COZ8NONEVZ|10        |
+----------+----------+
only showing top 5 rows

== Parsed Logical Plan ==
'Aggregate ['cust_id], ['cust_id, 'count(distinct 'city) AS city_count#252]
+- Relation [cust_id#2,start_date#3,end_date#4,txn_id#5,date#6,year#7,month#8,day#9,expense_type#10,amt#11,city#12] parquet

== Analyzed Logical Plan ==
cust_id: string, city_count: bigint
Aggregate [cust_id#2], [cust_id#2, count(distinct city#12) AS city_count#252L]
+- Relation [cust_id#2,start_date#3,end_date#4,txn_id#5,date#6,year#7,month#8,day#9,expense_type#10,amt#11,city#12] parquet

== Optimized Logical Plan ==
Aggregate [cust_id#2], [cust_id#2, count(distinct city#12) AS city_count#252L]
+- Project [cust_id#2, city#12]
   +- Relation [cust_id#2,start_date#3,end_date#4,txn_id#5,date#6,year#7,month#8,day#9,expense_type#10,amt#11,city#12] parqu

# 5. Interesting Observations

### Why is a filter step present despite predicate pushdown? 

This is largely due to the way `Spark's Catalyst Optimizer` works. Specifically, due to two separate stages of the query optimization process: Physical Planning and Logical Planning.

- **Logical Planning**: Catalyst optimizer simplifies the unresolved logical plan (which represents the user's query) by applying various rule-based optimizations. This includes `predicate pushdown`, `projection pushdown` where filter conditions and column projections are moved as close to the data source as possible.

- **Physical Planning** phase is where the logical plan is translated into one or more physical plans, which can actually be executed on the cluster. This includes operations like file `scans`, `filters`, `projections`, etc.

In this case, during the logical planning phase, the predicate (`F.col("city") == "boston"`) has been pushed down and will be applied during the scan of the Parquet file (`PushedFilters: [IsNotNull(city), EqualTo(city,boston)]`), thus improving performance.

Now, during the physical planning phase, the same filter condition (`+- *(1) Filter (isnotnull(city#73) AND (city#73 = boston))`) is applied again to the data that's been loaded into memory. This is because of the following reasons:

1. **Guaranteed Correctness:** It might seem **redundant**, but remember that not all data sources can handle pushed-down predicates, and not all predicates can be pushed down. Therefore, **even if a predicate is pushed down to the data source, Spark still includes the predicate in the physical plan** to cover cases where the data source might not have been able to fully apply the predicate. This is Spark's way of making sure the correct data is always returned, no matter the capabilities of the data source.

2. **No Assumptions**: Spark's Catalyst optimizer doesn't make assumptions about the data source's ability to handle pushed-down predicates. The optimizer aims to generate plans that return correct results across a wide range of scenarios. Even if the filter is pushed down, Spark does not have the feedback from data source whether the pushdown was successful or not, so it includes the filter operation in the physical plan as well.

It is more of a **fail-safe mechanism** to ensure data **integrity** and **correctness**.

---

### In what cases will predicate pushdown not work?

2 Examples where **filter pushdown** will not work:

1. **Complex Data Types**: Spark's Parquet data source does not push down filters that involve **complex types**, such as **arrays**, **maps**, and **structs**. This is because these complex data types can have complicated nested structures that the Parquet reader cannot easily filter on.

Here's an example:

```
root
 |-- Name: string (nullable = true)
 |-- properties: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

+----------+-----------------------------+
|Name      |properties                   |
+----------+-----------------------------+
|Afaque    |[eye -> black, hair -> black]|
|Naved     |[eye ->, hair -> brown]      |
|Ali       |[eye -> black, hair -> red]  |
|Amaan     |[eye -> grey, hair -> grey]  |
|Omaira    |[eye -> , hair -> brown]     |
+----------+-----------------------------+
```

```python
df.filter(df.properties.getItem("eye") == "brown").show()
```

```
== Physical Plan ==
*(1) Filter (metadata#123[key] = value)
+- *(1) ColumnarToRow
   +- FileScan parquet [id#122,metadata#123] Batched: true, DataFilters: [(metadata#123[key] = value)], Format: Parquet, ...
```

------------------------------------------------

3. Unsupported Expressions: 

In Spark, `Parquet` data source does not support pushdown for filters involving a `.cast` operation. The reason for this behaviour is as follows:
- `.cast` changes the datatype of the column, and the Parquet data source may not be able to perform the filter operation correctly on the cast data.

**Note**: This behavior may vary based on the data source. For example, if you're working with a JDBC data source connected to a database that supports SQL-like operations, the `.cast` filter could potentially be pushed down to the database.

### Example of operation where filter pushdown doesn't work

In [0]:
df_customer_gt_50 = (
    df_customers
    .filter(F.col("age").cast("int") > 50)
)
df_customer_gt_50.show(5, False)
df_customer_gt_50.explain(True)

+----------+--------------+---+------+----------+-----+------------+
|cust_id   |name          |age|gender|birthday  |zip  |city        |
+----------+--------------+---+------+----------+-----+------------+
|C01BKUFRHA|Aaron Becker  |54 |Male  |11/24/1979|40284|san_diego   |
|C01WMZQ7PN|Aaron Brady   |51 |Female|8/20/1994 |52204|philadelphia|
|C021567NJZ|Aaron Briggs  |57 |Male  |3/10/1990 |22008|philadelphia|
|C02JNTM46B|Aaron Chambers|51 |Male  |1/6/2001  |63337|new_york    |
|C030A69V1L|Aaron Clarke  |55 |Male  |4/28/1999 |77176|philadelphia|
+----------+--------------+---+------+----------+-----+------------+
only showing top 5 rows

== Parsed Logical Plan ==
'Filter (cast('age as int) > 50)
+- Relation [cust_id#70,name#71,age#72,gender#73,birthday#74,zip#75,city#76] parquet

== Analyzed Logical Plan ==
cust_id: string, name: string, age: string, gender: string, birthday: string, zip: string, city: string
Filter (cast(age#72 as int) > 50)
+- Relation [cust_id#70,name#71,age#72,gend