# Imports & Configuration

In [0]:
import os
import warnings
warnings.filterwarnings("ignore")

In [0]:
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = 'all'

In [0]:
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

In [0]:
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("ERROR")

# Reading File

In [0]:
transactions_fp = os.path.abspath("../data/data_skew/transactions.parquet")
transactions_fp = "file:" + transactions_fp
df_transactions = spark.read.parquet(transactions_fp)

In [0]:
df_transactions.show(5, False)

In [0]:
customers_fp = os.path.abspath("../data/data_skew/customers.parquet")
customers_fp = "file:" + customers_fp
df_customers = spark.read.parquet(customers_fp)

In [0]:
df_customers.show(5, False)

# Spark's Query Plans

Image Credits: `Databricks`

![Spark Execution](../data/spark-execution.png)

# Narrow Transformations
- `filter` rows where `city='boston'`
- `add` new columns: `first_name` and `last_name`, derived by splitting `name`
- `alter` an existing column: add 5 to the `age` column
- `select` relevant columns

In [0]:
df_narrow_transform = (
    df_customers
    .filter(F.col("city") == "boston")
    .withColumn("first_name", F.split("name", " ").getItem(0))
    .withColumn("last_name", F.split("name", " ").getItem(1))
    .withColumn("age", F.col("age") + F.lit(5))
    .select("cust_id", "first_name", "last_name", "age", "gender", "birthday")
)

df_narrow_transform.show(5, False)
df_narrow_transform.explain(True)
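
Each of these steps needs only the rows of the partition it is working on, so their physical plan contains no `Exchange`. The following toy pure-Python model illustrates that property. It assumes a DataFrame is a list of partitions (lists of dicts). The sample rows and the helper names `map_partitions` and `narrow_steps` are hypothetical. `gender` and `birthday` are left out for brevity.

```python
from typing import Any, Callable, Dict, List

Row = Dict[str, Any]
Partition = List[Row]


def map_partitions(partitions: List[Partition],
                   fn: Callable[[Partition], Partition]) -> List[Partition]:
    """Apply fn to every partition independently: no row ever moves between partitions."""
    return [fn(part) for part in partitions]


def narrow_steps(part: Partition) -> Partition:
    out: Partition = []
    for row in part:
        if row["city"] != "boston":                      # filter
            continue
        tokens = row["name"].split(" ")
        out.append({
            "cust_id": row["cust_id"],
            "first_name": tokens[0],                     # split(...).getItem(0)
            "last_name": tokens[1] if len(tokens) > 1 else None,  # getItem(1), null if missing
            "age": row["age"] + 5,                       # alter an existing column
        })                                               # select: keep only these keys
    return out


# Hypothetical sample rows split across two partitions
partitions = [
    [{"cust_id": "C1", "name": "Ann Lee", "city": "boston", "age": 30},
     {"cust_id": "C2", "name": "Bo Chan", "city": "denver", "age": 41}],
    [{"cust_id": "C3", "name": "Cy Diaz", "city": "boston", "age": 25}],
]
result = map_partitions(partitions, narrow_steps)
print(result)
```

The output has the same number of partitions as the input. Each output partition is computed from exactly one input partition. That one-to-one dependency is what lets Spark pipeline these steps into a single stage.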

# Wide Transformations
1. Repartition
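2. Coalesce (strictly a narrow transformation since it does not shuffle; included for comparison with `repartition`)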
3. Joins
4. GroupBy
   - `count`
   - `countDistinct`
   - `sum`

## 1. Repartition

In [0]:
df_transactions.rdd.getNumPartitions()

In [0]:
df_transactions.repartition(24).explain(True)
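
`repartition(24)` shows up in the plan as an `Exchange` with `RoundRobinPartitioning(24)`. Any row can move to any output partition, which is what makes it a shuffle. The sketch below is a simplified pure-Python model of round-robin dealing, not Spark's implementation. Spark starts each input partition at its own pseudo-random position, whereas here a single counter runs across all input partitions. The function name is hypothetical.

```python
from typing import List, TypeVar

T = TypeVar("T")


def round_robin_repartition(partitions: List[List[T]], num_partitions: int) -> List[List[T]]:
    """Toy RoundRobinPartitioning: deal every row to the output partitions in turn."""
    out: List[List[T]] = [[] for _ in range(num_partitions)]
    i = 0
    for part in partitions:
        for row in part:
            out[i % num_partitions].append(row)  # every row may change partition
            i += 1
    return out


skewed = [list(range(10)), [10, 11]]  # two uneven input partitions
balanced = round_robin_repartition(skewed, 4)
print([len(p) for p in balanced])  # [3, 3, 3, 3]
```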

## 2. Coalesce

In [0]:
df_transactions.coalesce(5).explain(True)

In [0]:
df_transactions.coalesce(1).explain(True)

### Why doesn't `.coalesce()` explicitly show the partitioning scheme?

`.coalesce` doesn't show a partitioning scheme such as `RoundRobinPartitioning` because:
- It only merges existing partitions into fewer ones to minimize data movement. It doesn't shuffle any data.
- Spark prints a partitioning scheme on `Exchange` (shuffle) nodes. Since `.coalesce` performs no shuffle, its physical plan contains a plain `Coalesce` node and no `Exchange`, so there is no partitioning scheme to display.
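
The merging behavior can be modeled in pure Python as follows. This is a simplified sketch with a hypothetical helper name. It groups contiguous input partitions, whereas Spark's default coalescer also takes data locality into account. Whole partitions are concatenated and no row is split out of its original partition.

```python
from typing import List, TypeVar

T = TypeVar("T")


def toy_coalesce(partitions: List[List[T]], num_partitions: int) -> List[List[T]]:
    """Merge whole input partitions into at most num_partitions groups, without a shuffle."""
    n = len(partitions)
    if num_partitions >= n:
        return [list(p) for p in partitions]  # coalesce never increases the partition count
    out: List[List[T]] = []
    for k in range(num_partitions):
        start = k * n // num_partitions
        end = (k + 1) * n // num_partitions
        merged: List[T] = []
        for p in partitions[start:end]:
            merged.extend(p)  # concatenate existing partitions as-is
        out.append(merged)
    return out


parts = [[0, 1], [2], [3, 4, 5], [6], [7, 8]]
print(toy_coalesce(parts, 2))  # [[0, 1, 2], [3, 4, 5, 6, 7, 8]]
```

The result is uneven (3 rows vs 6). `coalesce` avoids a shuffle at the cost of balance, whereas `repartition` redistributes every row.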

## 3. Joins

In [0]:
# Disable automatic broadcast joins so the plan shows a shuffle-based (sort-merge) join
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

In [0]:
df_joined = (
    df_transactions.join(
        df_customers,
        how="inner",
        on="cust_id"
    )
)

In [0]:
df_joined.show(5, False)

In [0]:
df_joined.explain("FORMATTED")
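
With broadcast joins disabled, the formatted plan is expected to show a `SortMergeJoin`. Each side is first shuffled with `Exchange hashpartitioning(cust_id, ...)` and then sorted. The toy pure-Python model below mirrors those three steps under a few assumptions:
- `zlib.crc32` stands in for Spark's Murmur3 hash.
- `n=4` stands in for `spark.sql.shuffle.partitions` (default 200).
- The rows and helper names are hypothetical.

```python
import zlib
from typing import Any, Dict, List

Row = Dict[str, Any]


def hash_partition(rows: List[Row], key: str, n: int) -> List[List[Row]]:
    """Toy hashpartitioning(key, n); crc32 stands in for Spark's Murmur3 hash."""
    parts: List[List[Row]] = [[] for _ in range(n)]
    for row in rows:
        parts[zlib.crc32(str(row[key]).encode()) % n].append(row)
    return parts


def merge_join(left: List[Row], right: List[Row], key: str) -> List[Row]:
    """Inner-join two lists that are already sorted by key."""
    out: List[Row] = []
    i = j = 0
    while i < len(left) and j < len(right):
        lk, rk = left[i][key], right[j][key]
        if lk < rk:
            i += 1
        elif lk > rk:
            j += 1
        else:
            # Find the run of equal keys on the right, then pair it with every equal left row
            j_end = j
            while j_end < len(right) and right[j_end][key] == lk:
                j_end += 1
            while i < len(left) and left[i][key] == lk:
                out.extend({**left[i], **r} for r in right[j:j_end])
                i += 1
            j = j_end
    return out


def sort_merge_join(left: List[Row], right: List[Row], key: str, n: int = 4) -> List[Row]:
    # Exchange: both sides use the same hash and n, so equal keys meet in the same partition
    left_parts = hash_partition(left, key, n)
    right_parts = hash_partition(right, key, n)
    out: List[Row] = []
    for lp, rp in zip(left_parts, right_parts):
        # Sort within each partition, then merge
        out.extend(merge_join(sorted(lp, key=lambda r: r[key]),
                              sorted(rp, key=lambda r: r[key]), key))
    return out


# Hypothetical rows
transactions = [{"cust_id": "C1", "amt": 10}, {"cust_id": "C2", "amt": 5},
                {"cust_id": "C1", "amt": 7}]
customers = [{"cust_id": "C1", "city": "boston"}, {"cust_id": "C3", "city": "denver"}]
joined = sort_merge_join(transactions, customers, "cust_id")
print(joined)
```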

## 4. GroupBy

In [0]:
df_transactions.printSchema()

### GroupBy Count

In [0]:
df_city_counts = (
    df_transactions
    .groupBy("city")
    .count()
)

In [0]:
df_city_counts.explain(True)
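
The plan for `df_city_counts` typically shows two `HashAggregate` nodes around an `Exchange hashpartitioning(city, ...)`. The first is a partial count on each input partition and the second is a final count after the shuffle. The `sum` plan below follows the same pattern with a partial sum. A toy pure-Python model of this two-phase aggregation is shown next. It assumes hypothetical data, uses `zlib.crc32` in place of Spark's hash, and uses a hypothetical helper name.

```python
import zlib
from collections import Counter
from typing import Dict, List, Tuple


def two_phase_count(partitions: List[List[str]], n: int) -> Tuple[Dict[str, int], int]:
    # Partial HashAggregate: count cities locally inside each input partition
    partials = [Counter(part) for part in partitions]
    shuffled_rows = sum(len(p) for p in partials)  # one row per (partition, city) is shuffled

    # Exchange hashpartitioning(city, n) + final HashAggregate:
    # every partial count for a city lands in the same reducer and is summed there
    reducers: List[Counter] = [Counter() for _ in range(n)]
    for partial in partials:
        for city, cnt in partial.items():
            reducers[zlib.crc32(city.encode()) % n][city] += cnt

    result: Dict[str, int] = {}
    for reducer in reducers:
        result.update(reducer)
    return result, shuffled_rows


partitions = [["boston", "denver", "boston"], ["boston", "austin"]]
counts, shuffled = two_phase_count(partitions, 3)
print(counts, shuffled)
```

Only 4 partial rows cross the shuffle instead of the 5 input rows. On real data with many transactions per city, the partial step shrinks shuffle volume much more.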

### GroupBy Sum

In [0]:
df_txn_amt_city = (
    df_transactions
    .groupBy("city")
    .agg(F.sum("amt").alias("txn_amt"))
)

In [0]:
df_txn_amt_city.explain(True)

### GroupBy Count Distinct 

In [0]:
df_cities_per_cust = (
    df_transactions
    .groupBy("cust_id")
    .agg(F.countDistinct("city").alias("city_count"))
)

In [0]:
df_cities_per_cust.show(5, False)
df_cities_per_cust.explain(True)
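
A city must be counted once per customer no matter how many partitions it appears in, so partial counts cannot simply be added. Spark typically plans a single `countDistinct` as an extra aggregation. It first deduplicates `(cust_id, city)` pairs (with its own shuffle) and then counts per `cust_id`, so expect more `HashAggregate` and `Exchange` nodes than in the plain count. The following toy pure-Python sketch shows the two logical steps, using hypothetical rows and a hypothetical helper name.

```python
from typing import Dict, Iterable, Optional, Tuple


def count_distinct_cities(rows: Iterable[Tuple[str, Optional[str]]]) -> Dict[str, int]:
    materialized = list(rows)
    # Every customer appears in the output, even if all its cities are null (count 0)
    counts: Dict[str, int] = {cust_id: 0 for cust_id, _ in materialized}
    # Step 1: deduplicate (cust_id, city) pairs, ignoring nulls as countDistinct does
    unique_pairs = {(cust_id, city) for cust_id, city in materialized if city is not None}
    # Step 2: count the surviving pairs per cust_id
    for cust_id, _city in unique_pairs:
        counts[cust_id] += 1
    return counts


rows = [("C1", "boston"), ("C1", "boston"), ("C1", "denver"), ("C2", "austin"), ("C3", None)]
print(count_distinct_cities(rows))  # {'C1': 2, 'C2': 1, 'C3': 0}
```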

# Interesting Observations

### Why is a filter step present despite predicate pushdown? 

This is largely due to the way `Spark's Catalyst Optimizer` works. Specifically, it comes from two separate stages of the query optimization process: Logical Planning and Physical Planning.

- **Logical Planning**: After the analyzer resolves the user's query into an analyzed logical plan, the Catalyst optimizer simplifies it by applying rule-based optimizations. These include `predicate pushdown` and `projection pushdown`, which move filter conditions and column projections as close to the data source as possible.

- **Physical Planning**: The optimized logical plan is translated into one or more physical plans that can actually be executed on the cluster. These include operations like file `scans`, `filters`, `projections`, etc.

In this case, during the logical planning phase, the predicate (`F.col("city") == "boston"`) has been pushed down and will be applied during the scan of the Parquet file (`PushedFilters: [IsNotNull(city), EqualTo(city,boston)]`), thus improving performance.

Now, during the physical planning phase, the same filter condition (`+- *(1) Filter (isnotnull(city#73) AND (city#73 = boston))`) is applied again to the rows returned by the scan, for the following reasons:

1. **Guaranteed Correctness:** It might seem **redundant**, but remember that not all data sources can handle pushed-down predicates, and not all predicates can be pushed down. Therefore, **even if a predicate is pushed down to the data source, Spark still includes the predicate in the physical plan** to cover cases where the data source might not have been able to fully apply the predicate. This is Spark's way of making sure the correct data is always returned, no matter the capabilities of the data source.

2. **No Assumptions**: Spark's Catalyst optimizer doesn't make assumptions about the data source's ability to handle pushed-down predicates. The optimizer aims to generate plans that return correct results across a wide range of scenarios. For file sources such as Parquet, pushed filters are best-effort. The reader uses them mainly to skip whole row groups whose statistics rule out a match, but it does not guarantee that every returned row satisfies the predicate. So even when a filter is pushed down, Spark keeps the filter operation in the physical plan as well.

It is more of a **fail-safe mechanism** to ensure data **integrity** and **correctness**.
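
One concrete case where the `Filter` is still needed comes from how Parquet uses a pushed filter. It can skip a row group whose min/max statistics rule the value out, but a row group that *might* contain `boston` is read in full. The following toy pure-Python model uses a hypothetical file layout and only min/max statistics. Real Parquet readers can also use dictionary filtering.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class RowGroup:
    cities: List[str]

    @property
    def min_value(self) -> str:
        return min(self.cities)

    @property
    def max_value(self) -> str:
        return max(self.cities)


def scan_with_pushed_filter(row_groups: List[RowGroup], value: str) -> List[str]:
    """Skip row groups whose [min, max] range cannot contain value; read the rest in full."""
    rows: List[str] = []
    for rg in row_groups:
        if rg.min_value <= value <= rg.max_value:
            rows.extend(rg.cities)
    return rows


# Hypothetical file with two row groups
row_groups = [RowGroup(["austin", "boston", "chicago"]), RowGroup(["denver", "el paso"])]
scanned = scan_with_pushed_filter(row_groups, "boston")  # second group is skipped
result = [city for city in scanned if city == "boston"]  # the Filter Spark keeps
print(scanned, result)
```

The scan returns `austin` and `chicago` alongside `boston`, so dropping the post-scan `Filter` would give wrong results.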

---

### In what cases will predicate pushdown not work?

Two examples where **filter pushdown** will not work:

1. **Complex Data Types**: Spark's Parquet data source does not push down filters that involve **complex types** such as **maps** and **arrays** (and, before Spark 3.1, fields nested inside **structs**). These types can have complicated nested structures that the Parquet reader cannot easily filter on.

Here's an example:

```
root
 |-- Name: string (nullable = true)
 |-- properties: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

+----------+-----------------------------+
|Name      |properties                   |
+----------+-----------------------------+
|Afaque    |[eye -> black, hair -> black]|
|Naved     |[eye ->, hair -> brown]      |
|Ali       |[eye -> black, hair -> red]  |
|Amaan     |[eye -> grey, hair -> grey]  |
|Omaira    |[eye -> , hair -> brown]     |
+----------+-----------------------------+
```

```python
df.filter(df.properties.getItem("eye") == "brown").show()
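```

The physical plan below is for an analogous DataFrame with columns `id` and `metadata`, filtering on the `key` entry of the map. The map lookup is listed under `DataFilters`, but it cannot be converted into a Parquet filter, so it is evaluated by the `Filter` operator after the scan.

```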
== Physical Plan ==
*(1) Filter (metadata#123[key] = value)
+- *(1) ColumnarToRow
   +- FileScan parquet [id#122,metadata#123] Batched: true, DataFilters: [(metadata#123[key] = value)], Format: Parquet, ...
```

------------------------------------------------

2. **Unsupported Expressions**:

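Spark's `Parquet` data source does not push down filters in which the column is wrapped in a `.cast` operation. The reason for this behavior is as follows:
- `.cast` changes the datatype of the column. The predicate is then no longer a comparison on the raw values stored in the file, so the Parquet data source cannot evaluate it against those stored values or their statistics. (If the cast is a no-op, e.g. `age` is already stored as an integer, the optimizer can remove it and the filter may be pushed after all.)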

**Note**: This behavior may vary based on the data source. For example, if you're working with a JDBC data source connected to a database that supports SQL-like operations, the `.cast` filter could potentially be pushed down to the database.
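
Both cases come down to the same requirement. Spark can only push a predicate if it can translate it into a simple source filter, such as `EqualTo(city,boston)`, over a column the reader understands. The following toy translator in pure Python shows why a map lookup or a cast on the column falls through. It is not Spark's actual implementation, and the tuple encoding of expressions and the helper name are hypothetical.

```python
from typing import Any, Optional, Tuple

# Hypothetical expression encoding:
#   ("col", name) | ("lit", value) | ("cast", expr, type) | ("get_item", expr, key)
#   ("eq", left, right) | ("gt", left, right)
Expr = Tuple[Any, ...]

SOURCE_FILTERS = {"eq": "EqualTo", "gt": "GreaterThan"}


def translate_filter(expr: Expr) -> Optional[str]:
    """Return a toy source filter string, or None if the predicate cannot be pushed."""
    op = expr[0]
    if op in SOURCE_FILTERS:
        left, right = expr[1], expr[2]
        # Only "bare column <op> literal" is translatable in this toy model
        if left[0] == "col" and right[0] == "lit":
            return f"{SOURCE_FILTERS[op]}({left[1]},{right[1]})"
    return None


print(translate_filter(("eq", ("col", "city"), ("lit", "boston"))))  # EqualTo(city,boston)
print(translate_filter(("eq", ("get_item", ("col", "properties"), "eye"), ("lit", "brown"))))  # None
print(translate_filter(("gt", ("cast", ("col", "age"), "int"), ("lit", 50))))  # None
```

Predicates that come back as `None` are not dropped. They are evaluated by the `Filter` operator after the scan.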

### Example of an operation where filter pushdown doesn't work

In [0]:
df_customer_gt_50 = (
    df_customers
    .filter(F.col("age").cast("int") > 50)
)
df_customer_gt_50.show(5, False)
df_customer_gt_50.explain(True)