# Catalyst Optimizer

**Technical Accomplishments:**
* Understanding what the Catalyst Optimizer is
* Understanding the different stages of the Catalyst Optimizer
* Example of Physical Plan Optimization (x2)
* Example of Predicate Pushdown

In [2]:
# Because we will need it later...
from pyspark.sql.functions import *
from pyspark.sql.types import *

## Catalyst Optimizer

* The Catalyst Optimizer is fundamental to the `SQL` and `DataFrames` APIs.
* It is an **extensible query optimizer**.
* It contains a **general library for representing trees and applying rules** to manipulate them.
* It offers several public extension points, including external data sources and user-defined types.
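
To make the "trees and rules" idea concrete, here is a minimal pure-Python sketch in the spirit of the constant-folding example from the blog post linked below. The class names (`Literal`, `Attribute`, `Add`) and the helpers `fold_constants` and `transform_up` are illustrative assumptions, not Catalyst's actual Scala API.

```python
from dataclasses import dataclass
from typing import Callable, Union


@dataclass(frozen=True)
class Literal:
    value: int


@dataclass(frozen=True)
class Attribute:
    name: str


@dataclass(frozen=True)
class Add:
    left: "Expr"
    right: "Expr"


Expr = Union[Literal, Attribute, Add]


def fold_constants(node: Expr) -> Expr:
    """Rule: Add(Literal, Literal) becomes a single Literal."""
    if (isinstance(node, Add)
            and isinstance(node.left, Literal)
            and isinstance(node.right, Literal)):
        return Literal(node.left.value + node.right.value)
    return node


def transform_up(node: Expr, rule: Callable[[Expr], Expr]) -> Expr:
    """Apply `rule` to every node of the tree, children first."""
    if isinstance(node, Add):
        node = Add(transform_up(node.left, rule), transform_up(node.right, rule))
    return rule(node)


# The expression x + (1 + 2)
tree = Add(Attribute("x"), Add(Literal(1), Literal(2)))
folded = transform_up(tree, fold_constants)
print(folded)  # Add(left=Attribute(name='x'), right=Literal(value=3))
```

The rule itself only knows about one small pattern; the generic traversal is what applies it everywhere in the tree. An optimizer built this way can grow by adding more small rules.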

<a href="https://databricks.com/blog/2015/04/13/deep-dive-into-spark-sqls-catalyst-optimizer.html" target="_blank">Deep Dive into Spark SQL’s Catalyst Optimizer</a> (April 13, 2015)

Processing is broken down into several stages as we can see here:

![Catalyst](https://files.training.databricks.com/images/105/catalyst-diagram.png)

## Optimized Logical Plan

**Rewriting our code** is one of the many optimizations performed by the Catalyst Optimizer.

We will look at **two basic examples** that involve rewriting filters.

The first is an **innocent mistake** that almost every new Spark developer makes.

The second "mistake" is **really bad**, but Spark can fix it.

### Example #1: Innocent Mistake (The First One)

We want to exclude every project that starts with **en.zero**.

There are **better ways of doing this**, since it can be done with a single condition.

Here, however, we make **8 passes** over the data **with 8 different filters**.

After each individual pass, we **go back over the remaining dataset** to filter out the next set of records.

In [3]:
allDF = spark.read.parquet("data/staging_parquet_en_only_clean/")

pass1 = allDF.filter( col("project") != "en.zero")
pass2 = pass1.filter( col("project") != "en.zero.n")
pass3 = pass2.filter( col("project") != "en.zero.s")
pass4 = pass3.filter( col("project") != "en.zero.d")
pass5 = pass4.filter( col("project") != "en.zero.voy")
pass6 = pass5.filter( col("project") != "en.zero.b")
pass7 = pass6.filter( col("project") != "en.zero.v")
pass8 = pass7.filter( col("project") != "en.zero.q")

print("Pass 1: {0:,}".format( pass1.count() ))
print("Pass 2: {0:,}".format( pass2.count() ))
print("Pass 3: {0:,}".format( pass3.count() ))
print("Pass 4: {0:,}".format( pass4.count() ))
print("Pass 5: {0:,}".format( pass5.count() ))
print("Pass 6: {0:,}".format( pass6.count() ))
print("Pass 7: {0:,}".format( pass7.count() ))
print("Pass 8: {0:,}".format( pass8.count() ))

Pass 1: 2,288,138
Pass 2: 2,288,123
Pass 3: 2,288,071
Pass 4: 2,287,665
Pass 5: 2,287,641
Pass 6: 2,287,314
Pass 7: 2,287,271
Pass 8: 2,287,196


**Logically**, the code above is the same as the code below.

The only real difference is that we are **not asking for a count** after every filter.

In [4]:
innocentDF = (spark.read.parquet("data/staging_parquet_en_only_clean/")
  .filter( col("project") != "en.zero")
  .filter( col("project") != "en.zero.n")
  .filter( col("project") != "en.zero.s")
  .filter( col("project") != "en.zero.d")
  .filter( col("project") != "en.zero.voy")
  .filter( col("project") != "en.zero.b")
  .filter( col("project") != "en.zero.v")
  .filter( col("project") != "en.zero.q")
)
print("Final Count: {0:,}".format( innocentDF.count() ))

Final Count: 2,287,196


There is no need to execute the code to see what is **logically** or **physically** taking place under the hood.

We can directly use the `explain(..)` command.

In [5]:
innocentDF.explain(True)

== Parsed Logical Plan ==
'Filter NOT ('project = en.zero.q)
+- Filter NOT (project#96 = en.zero.v)
   +- Filter NOT (project#96 = en.zero.b)
      +- Filter NOT (project#96 = en.zero.voy)
         +- Filter NOT (project#96 = en.zero.d)
            +- Filter NOT (project#96 = en.zero.s)
               +- Filter NOT (project#96 = en.zero.n)
                  +- Filter NOT (project#96 = en.zero)
                     +- RelationV2[project#96, article#97, requests#98, bytes_served#99L] parquet hdfs://training.io:8020/user/training/data/staging_parquet_en_only_clean

== Analyzed Logical Plan ==
project: string, article: string, requests: int, bytes_served: bigint
Filter NOT (project#96 = en.zero.q)
+- Filter NOT (project#96 = en.zero.v)
   +- Filter NOT (project#96 = en.zero.b)
      +- Filter NOT (project#96 = en.zero.voy)
         +- Filter NOT (project#96 = en.zero.d)
            +- Filter NOT (project#96 = en.zero.s)
               +- Filter NOT (project#96 = en.zero.n)
                

Of course, if we had written this the correct way the first time (still ignoring the fact that there are better methods), it would look something like:

In [6]:
betterDF = (spark.read.parquet("data/staging_parquet_en_only_clean/")
  .filter( (col("project").isNotNull()) &
           (col("project") != "en.zero") & 
           (col("project") != "en.zero.n") & 
           (col("project") != "en.zero.s") & 
           (col("project") != "en.zero.d") & 
           (col("project") != "en.zero.voy") & 
           (col("project") != "en.zero.b") & 
           (col("project") != "en.zero.v") & 
           (col("project") != "en.zero.q")
        )
)

print("Final: {0:,}".format( betterDF.count() ))

betterDF.explain(True)

Final: 2,287,196
== Parsed Logical Plan ==
'Filter ((((((((isnotnull('project) AND NOT ('project = en.zero)) AND NOT ('project = en.zero.n)) AND NOT ('project = en.zero.s)) AND NOT ('project = en.zero.d)) AND NOT ('project = en.zero.voy)) AND NOT ('project = en.zero.b)) AND NOT ('project = en.zero.v)) AND NOT ('project = en.zero.q))
+- RelationV2[project#118, article#119, requests#120, bytes_served#121L] parquet hdfs://training.io:8020/user/training/data/staging_parquet_en_only_clean

== Analyzed Logical Plan ==
project: string, article: string, requests: int, bytes_served: bigint
Filter ((((((((isnotnull(project#118) AND NOT (project#118 = en.zero)) AND NOT (project#118 = en.zero.n)) AND NOT (project#118 = en.zero.s)) AND NOT (project#118 = en.zero.d)) AND NOT (project#118 = en.zero.voy)) AND NOT (project#118 = en.zero.b)) AND NOT (project#118 = en.zero.v)) AND NOT (project#118 = en.zero.q))
+- RelationV2[project#118, article#119, requests#120, bytes_served#121L] parquet hdfs://traini
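
The single-filter version above spells out every comparison by hand. As one possible alternative (an assumption, not part of the original exercise), the same predicate can be generated from a list and handed to `filter(..)` as a SQL expression string. The helper name `not_in_expr` is hypothetical, and the sketch deliberately rejects values containing quotes or backslashes rather than trying to escape them.

```python
excluded_projects = [
    "en.zero", "en.zero.n", "en.zero.s", "en.zero.d",
    "en.zero.voy", "en.zero.b", "en.zero.v", "en.zero.q",
]


def not_in_expr(column, values):
    """Build a SQL predicate that keeps rows whose column is none of `values`."""
    for value in values:
        if "'" in value or "\\" in value:
            # Escaping rules are out of scope for this sketch.
            raise ValueError("unsupported character in value: {!r}".format(value))
    quoted = ", ".join("'{}'".format(value) for value in values)
    return "{0} IS NOT NULL AND {0} NOT IN ({1})".format(column, quoted)


condition = not_in_expr("project", excluded_projects)
print(condition)

# Hypothetical usage, assuming a running SparkSession named `spark`:
# compactDF = (spark.read.parquet("data/staging_parquet_en_only_clean/")
#   .filter(condition))
```

Calling `explain(True)` on such a `DataFrame` would show how Catalyst represents the `NOT IN` predicate; the row count should match the versions above because the logical condition is the same.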

### Example #2: Bad Programmer (The Second One)

This time we are intentionally going to do something **REALLY** bad.

Even if the optimizer combines these filters into a single filter, **we still have five different tests** for any row whose `project` value is not "whatever".

In [7]:
stupidDF = (spark.read.parquet("data/staging_parquet_en_only_clean/")
  .filter( col("project") != "whatever")
  .filter( col("project") != "whatever")
  .filter( col("project") != "whatever")
  .filter( col("project") != "whatever")
  .filter( col("project") != "whatever")
)

stupidDF.explain(True)

== Parsed Logical Plan ==
'Filter NOT ('project = whatever)
+- Filter NOT (project#140 = whatever)
   +- Filter NOT (project#140 = whatever)
      +- Filter NOT (project#140 = whatever)
         +- Filter NOT (project#140 = whatever)
            +- RelationV2[project#140, article#141, requests#142, bytes_served#143L] parquet hdfs://training.io:8020/user/training/data/staging_parquet_en_only_clean

== Analyzed Logical Plan ==
project: string, article: string, requests: int, bytes_served: bigint
Filter NOT (project#140 = whatever)
+- Filter NOT (project#140 = whatever)
   +- Filter NOT (project#140 = whatever)
      +- Filter NOT (project#140 = whatever)
         +- Filter NOT (project#140 = whatever)
            +- RelationV2[project#140, article#141, requests#142, bytes_served#143L] parquet hdfs://training.io:8020/user/training/data/staging_parquet_en_only_clean

== Optimized Logical Plan ==
Filter (isnotnull(project#140) AND NOT (project#140 = whatever))
+- RelationV2[project#140, art

***Note:*** `explain()` is not the only way to get access to this level of detail.<br/>
We can also see it in the **Spark UI**.
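
The Optimized Logical Plan above shows the five identical filters reduced to a single test. The following pure-Python sketch imitates that rewrite on a toy plan tree: it walks a chain of `Filter` nodes and keeps each distinct predicate once. The classes and the `combine_filters` helper are illustrative assumptions, not Spark's implementation, and the sketch does not model the `isnotnull` check that Spark also adds.

```python
from dataclasses import dataclass


@dataclass
class Relation:
    name: str


@dataclass
class Filter:
    condition: str
    child: object  # another Filter or a Relation


def combine_filters(plan):
    """Collapse a chain of Filter nodes into (unique predicates, base relation)."""
    predicates = []
    node = plan
    while isinstance(node, Filter):
        if node.condition not in predicates:  # drop duplicate tests
            predicates.append(node.condition)
        node = node.child
    predicates.reverse()  # report the innermost filter first
    return predicates, node


# Five identical filters stacked on one relation, as in Example #2.
plan = Relation("parquet")
for _ in range(5):
    plan = Filter("NOT (project = whatever)", plan)

predicates, base = combine_filters(plan)
print("Filter ({})".format(" AND ".join(predicates)))  # Filter (NOT (project = whatever))
```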

## Columnar Predicate Pushdown

Predicate pushdown takes place when a filter can be pushed down to the original data source, such as the database server.

For this example, we are going to compare `DataFrames` from two different sources:
* JDBC - where a predicate pushdown **WILL** take place.
* CSV - where a predicate pushdown will **NOT** take place.

In each case, we can see evidence of the pushdown (or lack of it) in the **Physical Plan**.
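
Before looking at Spark's plans, the effect of a pushdown can be sketched without Spark at all. The example below uses Python's built-in `sqlite3` module with a small in-memory table as a stand-in for a real database server (the table contents are made up for illustration). It compares fetching every row and filtering on the client with sending the predicate to the database as a `WHERE` clause.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (order_id INTEGER PRIMARY KEY, order_status TEXT)")

# 100 made-up rows, half of them COMPLETE.
statuses = ["COMPLETE", "PENDING", "CLOSED", "COMPLETE"] * 25
conn.executemany(
    "INSERT INTO orders (order_id, order_status) VALUES (?, ?)",
    [(i + 1, status) for i, status in enumerate(statuses)],
)

# Without pushdown: transfer every row, then filter on the client side.
all_rows = conn.execute("SELECT order_id, order_status FROM orders").fetchall()
client_filtered = [row for row in all_rows if row[1] == "COMPLETE"]

# With pushdown: the predicate travels to the database as a WHERE clause.
pushed_rows = conn.execute(
    "SELECT order_id, order_status FROM orders WHERE order_status = ?",
    ("COMPLETE",),
).fetchall()
conn.close()

print("rows transferred without pushdown:", len(all_rows))
print("rows transferred with pushdown:   ", len(pushed_rows))
```

Both approaches yield the same rows; the difference is how much data leaves the database before the filter is applied.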

### Example #3: JDBC

We will start by initializing the JDBC driver.

Next, we can create a `DataFrame` via JDBC and then filter by **order_status**.

In [8]:
jdbcURL = "jdbc:mysql://localhost/retail_db"

# Username and Password w/read-only rights
connProperties = {
  "user" : "root",
  "password" : "password",
  "driver": "com.mysql.jdbc.Driver"
}

ppExampleThreeDF = (spark.read.jdbc(
    url=jdbcURL,                  # the JDBC URL
    table="orders",               # the name of the table
    column="order_id",            # the name of a column of an integral type that will be used for partitioning
    lowerBound=1,                 # the minimum value of the column, used to decide the partition stride
    upperBound=1000000,           # the maximum value of the column, used to decide the partition stride
    numPartitions=8,              # the number of partitions/connections
    properties=connProperties     # the connection properties
  )
  .filter(col("order_status") == "COMPLETE")   # filter the data by order status
)

With the `DataFrame` that is created, we can ask Spark to `explain()` the **Physical Plan**.

What we are looking for in this Physical Plan is:
* the lack of a **Filter** and
* the presence of **PushedFilters** in the **Scan**

In [9]:
ppExampleThreeDF.explain()

== Physical Plan ==
*(1) Scan JDBCRelation(orders) [numPartitions=8] [order_id#152,order_date#153,order_customer_id#154,order_status#155] PushedFilters: [*IsNotNull(order_status), *EqualTo(order_status,COMPLETE)], ReadSchema: struct<order_id:int,order_date:timestamp,order_customer_id:int,order_status:string>
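
The plan above reports `numPartitions=8`. As a rough illustration of how `lowerBound`, `upperBound` and `numPartitions` can be turned into one range predicate per partition, here is a simplified pure-Python model. It assumes the stride is computed with integer division and that the first and last partitions are left open-ended so no rows are dropped; the exact arithmetic differs between Spark versions, and the function name `jdbc_partition_predicates` is hypothetical.

```python
def jdbc_partition_predicates(column, lower_bound, upper_bound, num_partitions):
    """Return one WHERE-clause fragment per partition (simplified model)."""
    if num_partitions < 2:
        raise ValueError("this sketch only models two or more partitions")
    # Assumption: integer-division stride between consecutive partitions.
    stride = upper_bound // num_partitions - lower_bound // num_partitions
    predicates = []
    current = lower_bound
    for i in range(num_partitions):
        lower = "{} >= {}".format(column, current) if i != 0 else None
        current += stride
        upper = "{} < {}".format(column, current) if i != num_partitions - 1 else None
        if upper is None:
            predicates.append(lower)  # last partition: no upper limit
        elif lower is None:
            # first partition: no lower limit, and NULL keys land here
            predicates.append("{} or {} is null".format(upper, column))
        else:
            predicates.append("{} AND {}".format(lower, upper))
    return predicates


predicates = jdbc_partition_predicates("order_id", 1, 1000000, 8)
for predicate in predicates:
    print(predicate)
```

The bounds only shape the stride; they do not filter rows, which is why the first and last ranges are unbounded on one side.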




This will make a little more sense if we **compare it to some examples** that don't push down the filter.

### Example #4: Cached JDBC

In this example, we are going to cache our data before filtering, which eliminates the possibility of a predicate pushdown:

In [10]:
ppExampleFourCachedDF = (spark.read.jdbc(
    url=jdbcURL,                  # the JDBC URL
    table="orders",               # the name of the table
    column="order_id",            # the name of a column of an integral type that will be used for partitioning
    lowerBound=1,                 # the minimum value of the column, used to decide the partition stride
    upperBound=1000000,           # the maximum value of the column, used to decide the partition stride
    numPartitions=8,              # the number of partitions/connections
    properties=connProperties     # the connection properties
  ))

(ppExampleFourCachedDF
  .cache()                        # cache the data
  .count())                       # materialize the cache

ppExampleFourFilteredDF = (ppExampleFourCachedDF
  .filter(col("order_status") == "COMPLETE"))  # filter the data by order status

We have now cached the data and then filtered it, which eliminates the possibility of benefiting from the predicate pushdown.

And so that it's easier to compare the two examples, we can re-print the physical plan for the previous example too.

In [11]:
print("****Example Three****\n")
ppExampleThreeDF.explain()

print("\n****Example Four****\n")
ppExampleFourFilteredDF.explain()

****Example Three****

== Physical Plan ==
*(1) Scan JDBCRelation(orders) [numPartitions=8] [order_id#152,order_date#153,order_customer_id#154,order_status#155] PushedFilters: [*IsNotNull(order_status), *EqualTo(order_status,COMPLETE)], ReadSchema: struct<order_id:int,order_date:timestamp,order_customer_id:int,order_status:string>



****Example Four****

== Physical Plan ==
*(1) Filter (isnotnull(order_status#163) AND (order_status#163 = COMPLETE))
+- InMemoryTableScan [order_id#160, order_date#161, order_customer_id#162, order_status#163], [isnotnull(order_status#163), (order_status#163 = COMPLETE)]
      +- InMemoryRelation [order_id#160, order_date#161, order_customer_id#162, order_status#163], StorageLevel(disk, memory, deserialized, 1 replicas)
            +- *(1) Scan JDBCRelation(orders) [numPartitions=8] [order_id#160,order_date#161,order_customer_id#162,order_status#163] PushedFilters: [], ReadSchema: struct<order_id:int,order_date:timestamp,order_customer_id:int,order_status

It should be clearer now...

In the first example, we see only the **Scan**, which is the JDBC read.

In the second example, we see the **Scan** with empty **PushedFilters**, then the **InMemoryTableScan**, followed by a **Filter**, which means Spark had to filter ALL the data in RAM instead of in the database.

### Example #5: CSV File

This example is identical to the previous one; the only changes are:
* this is a CSV file instead of a JDBC source
* we are filtering on **site**

In [12]:
schema = StructType(
  [
    StructField("timestamp", StringType(), False),
    StructField("site", StringType(), False),
    StructField("requests", IntegerType(), False)
  ]
)

ppExampleThreeDF = (spark.read
   .option("header", "true")
   .option("sep", "\t")
   .schema(schema)
   .csv("data/pageviews_by_second.tsv")
   .filter(col("site") == "desktop")
)

Once the `DataFrame` is created, we can ask Spark to `explain(..)` the **Physical Plan**.

What we are looking for in the Physical Plan is:
* the presence of a **Filter** and
* the scan of the CSV file below it (a **FileScan csv**, or a **BatchScan ... CSVScan** as in the output below)

Depending on the Spark version, the scan may also list **PushedFilters**, because Spark is *trying* to push the filter down to the CSV file.

But that doesn't work here, so, just like in the last example, we have a **Filter** after the scan, whose location is an **InMemoryFileIndex**.

In [13]:
ppExampleThreeDF.explain()

== Physical Plan ==
*(1) Project [timestamp#317, site#318, requests#319]
+- *(1) Filter (isnotnull(site#318) AND (site#318 = desktop))
   +- BatchScan[timestamp#317, site#318, requests#319] CSVScan Location: InMemoryFileIndex[hdfs://training.io:8020/user/training/data/pageviews_by_second.tsv], ReadSchema: struct<timestamp:string,site:string,requests:int>
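
The reason the **Filter** stays in the plan can be sketched in plain Python: a delimited text file has no index or statistics, so every line has to be read and parsed before the `site` value can be tested. The sample rows below are hypothetical values in the shape of the schema above, and the standard `csv` module stands in for Spark's reader.

```python
import csv
import io

# Hypothetical sample rows: timestamp, site, requests (tab-separated).
tsv_data = (
    "timestamp\tsite\trequests\n"
    "2015-03-16T00:09:55\tmobile\t1595\n"
    "2015-03-16T00:10:39\tmobile\t1544\n"
    "2015-03-16T00:19:39\tdesktop\t2460\n"
    "2015-03-16T00:38:11\tdesktop\t2237\n"
)

rows_parsed = 0
desktop_rows = []
reader = csv.DictReader(io.StringIO(tsv_data), delimiter="\t")
for row in reader:
    rows_parsed += 1               # every row is read and parsed...
    if row["site"] == "desktop":   # ...before the filter can be applied
        desktop_rows.append(row)

print("rows parsed:", rows_parsed)
print("rows kept:  ", len(desktop_rows))
```

Contrast this with the JDBC case, where the database can evaluate the predicate itself and return only the matching rows.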




## End of Exercise