### Cache And Persist Techniques

In [2]:
# Spark Session
from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder
    .appName("Understand Caching")
    .master("local[*]")
    .config("spark.executor.memory", "512M")
    .getOrCreate()
)

spark

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/02/21 12:42:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
df_movies = spark.read.format("csv").option("header",True).load("data/ImdbMovieDataset.csv")

In [4]:
df_movies.count()

1048575

In [5]:
df_movies.show()

+------+--------------------+------------+----------+--------+------------+----------+-------+-----+---------+---------+-----------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|    id|               title|vote_average|vote_count|  status|release_date|   revenue|runtime|adult|   budget|  imdb_id|original_language|      original_title|            overview|          popularity|             tagline|              genres|production_companies|production_countries|    spoken_languages|            keywords|
+------+--------------------+------------+----------+--------+------------+----------+-------+-----+---------+---------+-----------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
| 27205|        

In [6]:
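from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType

# With non-ANSI casting (the default before Spark 4.0), casting a string such as "8.5" to IntegerType
# drops the fractional part, so `vote_average > 8` below matches only ratings of 9 and above
df_movies_cast = df_movies.withColumn("vote_average", df_movies["vote_average"].cast(IntegerType()))
# Alternative that keeps the fractional part (withColumn takes the column name, then the new column):
# df_movies_cast = df_movies.withColumn("vote_average", col("vote_average").cast("decimal(38,6)"))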

In [8]:
df_movies_cast.where(col("vote_average") > 8).show()

+------+--------------------+------------+----------+--------+------------+--------+-------+-----+-------+----------+-----------------+-----------------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|    id|               title|vote_average|vote_count|  status|release_date| revenue|runtime|adult| budget|   imdb_id|original_language|               original_title|            overview|          popularity|             tagline|              genres|production_companies|production_countries|    spoken_languages|            keywords|
+------+--------------------+------------+----------+--------+------------+--------+-------+-----+-------+----------+-----------------+-----------------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------

In [9]:
# Cache the dataset. cache() is lazy, so an action is needed to fill the cache.
# count() is a good choice because it scans every partition; show() or take()
# may compute (and therefore cache) only the first few partitions.
df_movies_cast.cache().count()

25/02/21 12:43:00 WARN MemoryStore: Not enough space to cache rdd_33_2 in memory! (computed 106.1 MiB so far)
25/02/21 12:43:00 WARN BlockManager: Persisting block rdd_33_2 to disk instead.
25/02/21 12:43:01 WARN MemoryStore: Not enough space to cache rdd_33_2 in memory! (computed 106.1 MiB so far)
25/02/21 12:43:02 WARN MemoryStore: Not enough space to cache rdd_33_2 in memory! (computed 106.1 MiB so far)

1048575

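Go to the **Storage** tab of the Spark UI and check the cache:

**Storage Level**: Disk Memory Deserialized 1x Replicated

**Size in Memory** and **Size on Disk**: the partition that did not fit in memory (see the `MemoryStore` warnings above) was written to disk instead.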

In [10]:
# Again running the filter
df_movies_cast.where(col("vote_average") > 8).show()

# This ran much faster because the data is cached. 2s vs 0.3 ms

+------+--------------------+------------+----------+--------+------------+--------+-------+-----+-------+----------+-----------------+-----------------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|    id|               title|vote_average|vote_count|  status|release_date| revenue|runtime|adult| budget|   imdb_id|original_language|               original_title|            overview|          popularity|             tagline|              genres|production_companies|production_countries|    spoken_languages|            keywords|
+------+--------------------+------------+----------+--------+------------+--------+-------+-----+-------+----------+-----------------+-----------------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------

In the **SQL / DataFrame** tab, the DAG for this query no longer reads the CSV file; it reads from an in-memory table scan (**InMemoryTableScan**) of the cached data.

The default storage level of `cache()` is MEMORY_AND_DISK for DataFrames (stored deserialized, as the Storage tab shows) and MEMORY_ONLY for RDDs.

**Remove from Cache and then perform filter again**

In [11]:
df_movies_cast.unpersist()

DataFrame[id: string, title: string, vote_average: int, vote_count: string, status: string, release_date: string, revenue: string, runtime: string, adult: string, budget: string, imdb_id: string, original_language: string, original_title: string, overview: string, popularity: string, tagline: string, genres: string, production_companies: string, production_countries: string, spoken_languages: string, keywords: string]

In [12]:
# Create new dataset df_cache from df_movies_cast
df_cache = df_movies_cast.cache()

In [13]:
df_cache.count()

25/02/21 12:43:39 WARN MemoryStore: Not enough space to cache rdd_51_2 in memory! (computed 106.1 MiB so far)
25/02/21 12:43:39 WARN BlockManager: Persisting block rdd_51_2 to disk instead.
25/02/21 12:43:41 WARN MemoryStore: Not enough space to cache rdd_51_2 in memory! (computed 106.1 MiB so far)
25/02/21 12:43:41 WARN MemoryStore: Not enough space to cache rdd_51_2 in memory! (computed 106.1 MiB so far)

1048575

In [14]:
# Read from the original dataset df_movies_cast
df_movies_cast.where("vote_average > 8").show()

+------+--------------------+------------+----------+--------+------------+--------+-------+-----+-------+----------+-----------------+-----------------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|    id|               title|vote_average|vote_count|  status|release_date| revenue|runtime|adult| budget|   imdb_id|original_language|               original_title|            overview|          popularity|             tagline|              genres|production_companies|production_countries|    spoken_languages|            keywords|
+------+--------------------+------------+----------+--------+------------+--------+-------+-----+-------+----------+-----------------+-----------------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------

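The result came back instantly: even though the query uses `df_movies_cast`, it is served from the cache. This is expected, because `cache()` marks the DataFrame it is called on as cached and returns that same DataFrame, so `df_cache` and `df_movies_cast` are the same object.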

### Caching data after filter

In [15]:
df_cache.unpersist()

DataFrame[id: string, title: string, vote_average: int, vote_count: string, status: string, release_date: string, revenue: string, runtime: string, adult: string, budget: string, imdb_id: string, original_language: string, original_title: string, overview: string, popularity: string, tagline: string, genres: string, production_companies: string, production_countries: string, spoken_languages: string, keywords: string]

In [16]:
df_cache_filter = df_movies_cast.where(col("vote_average") > 8).cache()

In [17]:
df_cache_filter.count()

35373

Note that only the filtered rows have been cached here: the count (35373) is the number of matching rows, not the total row count (1048575).

Now filter on a different condition:

In [18]:
df_movies_cast.where(col("vote_average") < 6).show()  # show() returns None, so there is nothing useful to assign

+------+--------------------+------------+----------+--------+------------+----------+-------+-----+---------+---------+-----------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|    id|               title|vote_average|vote_count|  status|release_date|   revenue|runtime|adult|   budget|  imdb_id|original_language|      original_title|            overview|          popularity|             tagline|              genres|production_companies|production_countries|    spoken_languages|            keywords|
+------+--------------------+------------+----------+--------+------------+----------+-------+-----+---------+---------+-----------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|297761|       S

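If you check the DAG now, this filter reads the original CSV file instead of the cached data: the cache holds only the rows with `vote_average > 8`, so it cannot answer a query for `vote_average < 6`.

This affects performance, so be careful about what you cache: cache the DataFrame that later queries will actually reuse.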

### Persist

In [24]:
# Remove cache
spark.catalog.clearCache()

In [22]:
# PySpark storage levels include MEMORY_ONLY, MEMORY_ONLY_2, MEMORY_AND_DISK, MEMORY_AND_DISK_2, DISK_ONLY (there are no *_SER variants)
import pyspark

df_persist = df_movies.persist(pyspark.StorageLevel.MEMORY_ONLY)

In [23]:
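# The "noop" sink runs the full job without writing any output, which materializes the persisted data
df_persist.write.format("noop").mode("overwrite").save()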

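Now check the **Storage** tab. The entire dataset is held in memory, nothing has spilled to disk, and the storage level reads 'Memory Serialized 1x Replicated': unlike `cache()`, which stored the data deserialized, PySpark's MEMORY_ONLY stores it serialized.

Serialized data is more compact, which is why the whole dataset fits in memory here. With MEMORY_ONLY, a partition that does not fit is not spilled to disk; it is left uncached and recomputed when needed.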

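PySpark has no `_SER` variants such as **MEMORY_ONLY_SER** or **MEMORY_AND_DISK_SER**: all of PySpark's predefined levels (including **MEMORY_AND_DISK**) are already serialized, so MEMORY_ONLY in PySpark uses the same flags as MEMORY_ONLY_SER in Scala.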

**MEMORY_ONLY_2**: like MEMORY_ONLY, but each cached partition is stored on two different executors (two copies in total), so a copy survives if one executor is lost.

In [25]:
# PySpark storage levels include MEMORY_ONLY, MEMORY_ONLY_2, MEMORY_AND_DISK, MEMORY_AND_DISK_2, DISK_ONLY (there are no *_SER variants)
import pyspark

df_persist = df_movies.persist(pyspark.StorageLevel.MEMORY_ONLY_2)

In [26]:
df_persist.write.format("noop").mode("overwrite").save()

25/02/21 13:00:35 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
25/02/21 13:00:35 WARN BlockManager: Block rdd_92_0 replicated to only 0 peer(s) instead of 1 peers
25/02/21 13:00:35 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
25/02/21 13:00:35 WARN BlockManager: Block rdd_92_1 replicated to only 0 peer(s) instead of 1 peers
25/02/21 13:00:43 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
25/02/21 13:00:43 WARN BlockManager: Block rdd_92_2 replicated to only 0 peer(s) instead of 1 peers

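The Storage tab now shows 'Memory Serialized 2x Replicated'. In local mode there is only one executor, so the second replica cannot be placed anywhere; that is what the `RandomBlockReplicationPolicy` and `BlockManager` warnings above report.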

In [27]:
spark.stop()