In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("SparkSession") \
    .getOrCreate()

# Caching

**Technical Accomplishments:**
* Understand how caching works
* Explore the various caching mechanisms

In [None]:
from pyspark.sql.functions import col

df = (
    spark.read.option("header", "true").csv("spark-data/ratings.csv")
    .withColumn("rating", col("rating").cast("float"))
    .filter(col("rating") > 3)
    .groupBy("rating")
    .count()
    .cache()
)

df.count()
df.count()
df.write.mode("overwrite").csv("spark-data/output")

## A Fresh Start
Before starting this section, we need to clear the existing cache.

There are several ways to accomplish this:
  * Remove each cache one by one, which is fairly problematic
  * Restart the cluster, which takes a fair while to come back online
  * Just blow the entire cache away, which affects each and every user on the cluster!
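
The first option can be sketched as follows. This is a minimal sketch, assuming you still hold references to the DataFrames you cached. `unpersist_all` is a hypothetical helper name, not part of the Spark API; it only calls `DataFrame.unpersist()` on the objects it is given, so caches belonging to other users are left alone.

```python
def unpersist_all(frames, blocking=True):
    """Drop the cache of each DataFrame in `frames`, one by one.

    Hypothetical helper: it relies only on the unpersist() method
    of the objects passed in and returns how many were released.
    """
    released = 0
    for df in frames:
        # blocking=True waits until the cached blocks are removed
        df.unpersist(blocking)
        released += 1
    return released

# Usage in this notebook (assumes `df` from the first cell is still defined):
# unpersist_all([df])
```

The drawback is the one named above: you have to track every cached DataFrame yourself.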

In [5]:
#!!! DO NOT RUN THIS ON A SHARED CLUSTER !!!

spark.catalog.clearCache()

#!!! It deletes your cached data and your co-workers' cached data as well !!!

In [6]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

ratingsDF = (spark.read
  .option("header","true")
  .csv("spark-data/ratings.csv")
)

The 646 MB of data currently lives in HDFS, which means that each time you scan through it, your Spark cluster has to read all 646 MB remotely over the network.

In [None]:
(ratingsDF
 .count())

The ratings DataFrame contains 25 million rows.

Make a note of how long the previous operation takes.

Re-run it several times to establish an average.
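
Rather than timing by hand, the repeated runs can be averaged with a small helper. This is only a sketch: `average_seconds` is a hypothetical name introduced here, and it measures wall-clock time on the driver, which includes scheduling overhead.

```python
import time

def average_seconds(action, runs=3):
    """Run `action` `runs` times and return the mean wall-clock seconds."""
    if runs < 1:
        raise ValueError("runs must be at least 1")
    timings = []
    for _ in range(runs):
        start = time.perf_counter()
        action()  # e.g. lambda: ratingsDF.count()
        timings.append(time.perf_counter() - start)
    return sum(timings) / len(timings)

# Usage in this notebook (assumes `ratingsDF` is defined as above):
# print(f"{average_seconds(lambda: ratingsDF.count()):.2f} s on average")
```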

Now let's try a slightly more complicated operation, such as sorting, which induces an "expensive" shuffle.

In [None]:
(ratingsDF
 .orderBy("movieId")
 .count())

Every time we re-run these operations, Spark goes all the way back to the original data store.

This requires pulling all the data across the network for every execution.

In most cases, this network IO is the most expensive part of a job.

## cache()

We can avoid all of this overhead by caching the data on the executors.

Just go ahead and run the following command.

Don't forget to make a note of how long it takes to execute.

In [None]:
ratingsDF.cache().count()

This `count()` will take a little longer than normal.

It has to read the data and also do the work of materializing the cache.

Now `ratingsDF` is cached **AND** the cache has been materialized.

Before we rerun our queries, check the **Spark UI** and the **Storage** tab.
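
The cache status can also be checked from code. The sketch below assumes a PySpark DataFrame, which exposes `is_cached` and `storageLevel`; `cache_summary` is a hypothetical helper name. It only reports that the DataFrame is marked for caching and at which level. It does not show the size in memory, so the **Storage** tab is still the place to look for that.

```python
def cache_summary(df):
    """Return a short description of a DataFrame's cache status.

    Reads the `is_cached` and `storageLevel` attributes of the object.
    """
    if not df.is_cached:
        return "not cached"
    return f"cached with storage level: {df.storageLevel}"

# Usage in this notebook (assumes `ratingsDF` is defined as above):
# print(cache_summary(ratingsDF))
```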

Now, run the two queries and compare their execution time to the ones above.

In [None]:
ratingsDF.count()

Was it faster?

All of our data is now stored in the cache on the executors.

We are no longer making network calls. Our plain `count()` should be sub-second.

## Caching Parquet Files

In [11]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

pageViewsDF = (spark.read
  .parquet("spark-data/pageviews_by_second_par")
)

The Parquet files above take up 100 MB on disk. Let's cache them and see how much storage the cache takes.

In [None]:
pageViewsDF.cache().count()

##### Now go to the Storage tab of the Spark UI and check the size in memory

##### Did you notice that the size in memory is 4 times the size of the files on disk?

This is because the `cache()` method saves the data in memory in deserialized format.

## persist()

`cache()` is just an alias for `persist()`.

Let's take a look at the API docs for:

* `Dataset.persist()` - Scala
* `DataFrame.persist()` - Python

`persist()` accepts an additional parameter, the storage level, which indicates how the data is cached:

* DISK_ONLY
* DISK_ONLY_2
* MEMORY_AND_DISK
* MEMORY_AND_DISK_2
* MEMORY_AND_DISK_SER
* MEMORY_AND_DISK_SER_2
* MEMORY_ONLY
* MEMORY_ONLY_2
* MEMORY_ONLY_SER
* MEMORY_ONLY_SER_2
* OFF_HEAP

***Note:*** *The default storage level for:*
* *RDDs is **MEMORY_ONLY**.*
* *DataFrames is **MEMORY_AND_DISK**.*
* *Streaming is **MEMORY_AND_DISK_2**.*
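
Each storage level name is shorthand for a handful of flags. The sketch below spells them out for any object that exposes the attributes PySpark's `StorageLevel` has (`useDisk`, `useMemory`, `useOffHeap`, `deserialized`, `replication`). `describe_level` is a hypothetical helper name, and the wording of its output is an assumption of this sketch, not the label shown in the Spark UI.

```python
def describe_level(level):
    """Spell out the flags of a StorageLevel-like object."""
    places = []
    if level.useMemory:
        places.append("memory")
    if level.useOffHeap:
        places.append("off-heap")
    if level.useDisk:
        places.append("disk")
    where = " + ".join(places) if places else "nowhere"
    form = "deserialized" if level.deserialized else "serialized"
    return f"{where}, {form}, {level.replication}x replicated"

# Usage (assumes PySpark is installed):
# from pyspark import StorageLevel
# print(describe_level(StorageLevel.MEMORY_AND_DISK_2))
```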

In [None]:
from pyspark import StorageLevel
pageviewsParDF = (spark.read
  .parquet("spark-data/pageviews_by_second_par")
)
pageviewsParDF.persist(StorageLevel.MEMORY_AND_DISK).count()

In [None]:
pageviewsParDF.printSchema()

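Go to the Storage tab of the Spark UI again.

##### Did you notice that the size in memory is 2 times the size of the files on disk?
This is because the storage level is now Disk Memory Serialized: in PySpark, `StorageLevel.MEMORY_AND_DISK` stores the data in serialized form.

It's still bigger in memory than on disk! Why? Because of how Java stores string objects.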

<img src="https://files.training.databricks.com/images/tuning/java-string.png" alt="Java String Memory allocation"/><br/>


- A regular 4-byte string ends up taking 48 bytes.
- The diagram shows how 40 of those bytes are allocated; on top of that, byte usage is rounded up to a multiple of 8 due to JVM padding.
- This is a very bloated representation, given that of these 48 bytes we are actually after only 4.

Let's try storing the timestamp as a `long` instead of a string.

In [19]:
newDF = pageviewsParDF.withColumn(
    "timestamp_long",
    to_timestamp("timestamp", "yyyy-MM-dd'T'HH:mm:ss").cast("long")
)

In [None]:
pageviewsParDF.unpersist()

In [None]:
newDF.unpersist()

In [None]:
newDF.persist(StorageLevel.DISK_ONLY).count()

#### Did you notice that just by changing the type from string to long, there is a difference of almost 50 MB?

## RDD Name

If you haven't noticed yet, the **RDD Name** on the **Storage** tab in the **Spark UI** is a big ugly name.

It's a bit hacky, but there is a workaround for assigning a name.
0. Create your `DataFrame`.
0. From that `DataFrame`, create a temporary view with any name.
0. Specifically, cache the table via the `SparkSession` and its `Catalog`.
0. Materialize the cache.

In [None]:

from pyspark import StorageLevel
pageviewsParDF = (spark.read
  .parquet("spark-data/pageviews_by_second_par")
)
pageviewsParDF.createOrReplaceTempView("Pageviews_DF_Python")
spark.catalog.cacheTable("Pageviews_DF_Python")

pageviewsParDF.count()

In [None]:
pageviewsParDF.unpersist()

## Using Off-Heap Memory
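
Spark's documentation notes that the `OFF_HEAP` storage level requires off-heap memory to be enabled. Whether the session in this notebook already has it enabled is not shown, so the sketch below lists the two settings involved. The `1g` size is an arbitrary example value, and `with_settings` is a hypothetical helper name. These are startup settings, so they would need to be applied before any `SparkSession` exists in the process.

```python
# Settings that off-heap storage depends on; the 1g size is only an example.
OFF_HEAP_SETTINGS = {
    "spark.memory.offHeap.enabled": "true",
    "spark.memory.offHeap.size": "1g",
}

def with_settings(builder, settings):
    """Apply each key/value pair to a SparkSession.Builder-like object."""
    for key, value in settings.items():
        builder = builder.config(key, value)
    return builder

# Usage (assumes no SparkSession has been created yet in this process):
# from pyspark.sql import SparkSession
# builder = SparkSession.builder.appName("OffHeap")
# spark = with_settings(builder, OFF_HEAP_SETTINGS).getOrCreate()
```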

In [None]:
from pyspark import StorageLevel
pageviewsParDF = (spark.read
  .parquet("spark-data/pageviews_by_second_par")
)
pageviewsParDF.persist(StorageLevel.OFF_HEAP).count()

In [None]:
pageviewsParDF.unpersist()