![title](img/this-is-fine-spark.jpeg)

## 🔥 Spark fires 🔥 - caching stopping all the pushdowns

In this scenario, we will demonstrate how caching of dataframes can stop pushdown of partition-pruning, filters, etc, which can have pretty catastrophic impacts on the amount of data you read.

### Bootstrapping

In [None]:
import os

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = (
    SparkSession
    .builder.master("spark://spark:7077")
    # .config("spark.eventLog.enabled", "true")
    # .config("spark.eventLog.dir", "/data/tmp/spark-events")
    .appName("spark-fires-premature-caching")
    .getOrCreate()
)

spark.version

### Let's prep our data

We are going to borrow some test data from the excellent _Spark, The Definitive Guide_ Git repo.

In [None]:
# # comment this section in and execute to fetch the data
# !mkdir -p /data/bike-data
# !wget https://raw.githubusercontent.com/udacity/data-analyst/master/projects/bike_sharing/201508_station_data.csv -P /data/bike-data
# !wget https://raw.githubusercontent.com/udacity/data-analyst/master/projects/bike_sharing/201508_trip_data.csv -P /data/bike-data

In [None]:
!ls /data

In [None]:
input_data_path = '/data/bike-data-partitioned'
output_data_path = '/data/bike-data-partitioned-out'

Next we will create some fake partitioning for demonstration purposes.

In [None]:
if not os.path.exists(input_data_path):
    df = spark.read.option("header", True).csv("/data/bike-data/201508_trip_data.csv")
    df = df.withColumn('start_terminal', F.col('Start Terminal'))
    df.write.format('parquet').partitionBy('start_terminal').save(input_data_path)
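
To sanity-check the layout that `partitionBy` produced, here is a minimal sketch that counts partition directories and Parquet files on disk. It assumes the dataset lives on a local filesystem and that data files end in `.parquet`; the helper name `summarize_partitions` is hypothetical and not part of Spark.

```python
import os


def summarize_partitions(root, column="start_terminal"):
    """Count partition directories and Parquet files under a partitioned dataset."""
    prefix = column + "="
    partitions = 0
    files = 0
    for entry in sorted(os.listdir(root)):
        path = os.path.join(root, entry)
        # partitionBy writes one "<column>=<value>" directory per distinct value;
        # anything else at the top level (e.g. _SUCCESS) is skipped.
        if not (os.path.isdir(path) and entry.startswith(prefix)):
            continue
        partitions += 1
        # Only count data files, not hidden checksum files (which end in ".crc").
        files += sum(1 for name in os.listdir(path) if name.endswith(".parquet"))
    return partitions, files
```

Calling `summarize_partitions(input_data_path)` returns a `(partitions, files)` tuple that can be compared with the "number of partitions read" and "number of files read" figures in the Spark UI.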

### Now let's do some data processing

For this scenario we are only interested in the data from a single partition of _start_terminal_ (terminal 83), which we select in our filter/where clause.

In [None]:
%%time

df = spark.read.parquet(input_data_path)
# df = df.cache()  # -- change 1 - caching here, before the filter, is what blocks the pushdown

out_df = df \
    .filter(F.col('start_terminal') == 83) \
    .select([
        'Trip ID',
        'Start Station',
        'End Station',
        'End Terminal',
        'start_terminal'
    ])
out_df.cache()  # -- change 1 - cache here instead, after the filter and select
out_df.write.mode('overwrite').parquet(output_data_path)

### Putting the fire out  🔥🔥🔥🧯🧯🧯

So when we dig into the Spark UI SQL tab at http://localhost:4040/SQL/execution/?id=2 (your ids may differ, but you want the one whose description starts with *parquet at ...*), we see the following:
 * number of files read: 420
 * scan time total (min, med, max): 57.8 s (0 ms, 506 ms, 7.6 s)
 * dynamic partition pruning time: 0 ms
 * metadata time: 18 ms
 * size of files read: 10.3 MiB
 * number of output rows: 354,152
 * number of partitions read: 70

Oh, how disappointing, we appear to be reading all files within all partitions despite our filter on *start_terminal*, oof! 😞

But it's okay: we can fix this by moving the caching at the `-- change 1` markers, commenting one line out and the other in, so that the cache is applied after the filter. Restart the kernel, recreate the Spark session and run again. Boom, now we are only reading 6 files, down from 420. In terms of bytes, we are reading well under 1% of the data we were originally reading.
 
* number of files read: 6
* scan time total (min, med, max): 8.8 s (100 ms, 1.8 s, 2.0 s)
* dynamic partition pruning time: 0 ms
* metadata time: 29 ms
* size of files read: 28.0 KiB
* number of output rows: 212
* number of partitions read: 1
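
To put the two scans side by side, the following sketch computes what percentage of the original scan is still read after the change. The figures are copied from the Spark UI metrics in this scenario; the helper name `percent_remaining` is made up for illustration.

```python
# Figures copied from the two Spark UI scans in this scenario.
before = {"files": 420, "bytes": 10.3 * 1024 * 1024, "rows": 354_152, "partitions": 70}
after = {"files": 6, "bytes": 28.0 * 1024, "rows": 212, "partitions": 1}


def percent_remaining(before, after):
    """Return, per metric, what percentage of the original scan is still read."""
    return {key: 100.0 * after[key] / before[key] for key in before}


remaining = percent_remaining(before, after)
for key, value in remaining.items():
    print(f"{key}: {value:.2f}% of the original scan")
```

Your own numbers will differ from run to run, so swap in the values from your Spark UI before drawing conclusions.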
 
In our noddy example, with all data accessed locally, **we cut the runtime by about a third**. That's not too shabby. But for real-world examples the impact can be much larger, due to larger datasets and the cost of accessing data over the network from remote file systems or object stores.

Note, we get a dual impact from this anti-pattern:
1. We read unneeded data, and so hit all the associated I/O costs.
2. We hit all the I/O, serialization costs, etc, associated with caching data across our executors.

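It is worth noting that caching affects not only partition pruning but also column pruning and predicate pushdown. Once a dataframe is marked as cached, Spark materializes it in full, and later filters and selects run against the cached copy instead of being pushed down to the file scan. So where possible we want to avoid caching dataframes. Where we do need to cache dataframes, we want to do it after our filters have been applied, to avoid reading and processing unnecessary data.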
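
You can also check for this without opening the Spark UI by inspecting the query plan. The sketch below captures what `df.explain()` prints and looks for two markers in the text. It rests on assumptions: that the file scan node prints a `PartitionFilters: [...]` entry (empty when nothing is pruned) and that reads from a cache show up as `InMemoryTableScan`. The exact plan text varies between Spark versions, and the helper names are hypothetical.

```python
import contextlib
import io


def explain_text(df):
    """Capture what df.explain() prints and return it as a string."""
    buffer = io.StringIO()
    with contextlib.redirect_stdout(buffer):
        df.explain()
    return buffer.getvalue()


def scan_prunes_partitions(plan):
    """Return True if any scan in the plan text has a non-empty PartitionFilters list.

    Assumption: an empty list ("PartitionFilters: []") means all partitions are read.
    """
    marker = "PartitionFilters: ["
    start = plan.find(marker)
    while start != -1:
        # The character right after the opening bracket is "]" only for an empty list.
        if not plan.startswith("]", start + len(marker)):
            return True
        start = plan.find(marker, start + 1)
    return False


def reads_from_cache(plan):
    """Return True if the plan text contains a read from a cached relation."""
    return "InMemoryTableScan" in plan
```

For example, `scan_prunes_partitions(explain_text(out_df))` should come back `False` when the cache sits before the filter and `True` once it is moved after it, provided the plan text matches the assumptions above.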

In [None]:
# spark.stop()