## Caching a DataFrame 

The PySpark `cache()` method caches the intermediate result of a transformation so that other transformations that run on top of the cached result perform faster. Caching reused results is one of the common optimization techniques for improving the performance of long-running PySpark applications and jobs.

`cache()` is lazily evaluated in PySpark: it does not cache anything until you call an action such as `count()` or `show()`.

In [1]:
import findspark
findspark.init()
findspark.find()

'C:\\Spark\\sparkhome'

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("caching_dataset").getOrCreate()

##### Benefits of Caching in PySpark

Caching a DataFrame that is reused across multiple operations can greatly enhance the performance of PySpark jobs. Here are some key benefits of using the `cache()` method:

- **Cost-Efficient:** Spark computations can be costly, so reusing previously computed results helps in reducing overall costs.
- **Time-Efficient:** Avoiding repeated computation of the same intermediate result can save a significant amount of time.
- **Better Cluster Utilization:** Shorter job execution frees cluster resources sooner, leaving capacity to run additional jobs.

To illustrate the importance of caching, consider a scenario where multiple PySpark transformations are applied in a sequence. Using caching for intermediate results can drastically improve the performance of subsequent transformations that depend on these results.

##### Why Use Caching in PySpark?

To understand the need for caching, let’s first observe the performance of transformations without caching. This will help in identifying the performance issues and demonstrate the benefits of caching.

In [3]:
spark_df = spark.read.csv("employees.csv",header=True,inferSchema=True)

In [4]:
spark_df.show()

+-----------+----------+---------+--------+------------+---------+----------+------+--------------+----------+-------------+
|EMPLOYEE_ID|FIRST_NAME|LAST_NAME|   EMAIL|PHONE_NUMBER|HIRE_DATE|    JOB_ID|SALARY|COMMISSION_PCT|MANAGER_ID|DEPARTMENT_ID|
+-----------+----------+---------+--------+------------+---------+----------+------+--------------+----------+-------------+
|        198|    Donald| OConnell|DOCONNEL|650.507.9833|21-JUN-07|  SH_CLERK|  2600|            - |       124|           50|
|        199|   Douglas|    Grant|  DGRANT|650.507.9844|13-JAN-08|  SH_CLERK|  2600|            - |       124|           50|
|        200|  Jennifer|   Whalen| JWHALEN|515.123.4444|17-SEP-03|   AD_ASST|  4400|            - |       101|           10|
|        201|   Michael|Hartstein|MHARTSTE|515.123.5555|17-FEB-04|    MK_MAN| 13000|            - |       100|           20|
|        202|       Pat|      Fay|    PFAY|603.123.6666|17-AUG-05|    MK_REP|  6000|            - |       201|           20|


In [7]:
spark_df.explain(True)

== Parsed Logical Plan ==
Relation [EMPLOYEE_ID#17,FIRST_NAME#18,LAST_NAME#19,EMAIL#20,PHONE_NUMBER#21,HIRE_DATE#22,JOB_ID#23,SALARY#24,COMMISSION_PCT#25,MANAGER_ID#26,DEPARTMENT_ID#27] csv

== Analyzed Logical Plan ==
EMPLOYEE_ID: int, FIRST_NAME: string, LAST_NAME: string, EMAIL: string, PHONE_NUMBER: string, HIRE_DATE: string, JOB_ID: string, SALARY: int, COMMISSION_PCT: string, MANAGER_ID: string, DEPARTMENT_ID: int
Relation [EMPLOYEE_ID#17,FIRST_NAME#18,LAST_NAME#19,EMAIL#20,PHONE_NUMBER#21,HIRE_DATE#22,JOB_ID#23,SALARY#24,COMMISSION_PCT#25,MANAGER_ID#26,DEPARTMENT_ID#27] csv

== Optimized Logical Plan ==
Relation [EMPLOYEE_ID#17,FIRST_NAME#18,LAST_NAME#19,EMAIL#20,PHONE_NUMBER#21,HIRE_DATE#22,JOB_ID#23,SALARY#24,COMMISSION_PCT#25,MANAGER_ID#26,DEPARTMENT_ID#27] csv

== Physical Plan ==
FileScan csv [EMPLOYEE_ID#17,FIRST_NAME#18,LAST_NAME#19,EMAIL#20,PHONE_NUMBER#21,HIRE_DATE#22,JOB_ID#23,SALARY#24,COMMISSION_PCT#25,MANAGER_ID#26,DEPARTMENT_ID#27] Batched: false, DataFilters: [], 

In [8]:
df1 = spark_df.filter(spark_df["HIRE_DATE"]>= "21-SEP-05")

In [9]:
df1.count()

10
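Note that `HIRE_DATE` was inferred as a `string` (see the analyzed plan above), so `>= "21-SEP-05"` compares text character by character rather than comparing dates. The `10` above therefore counts rows whose date *string* sorts at or after `"21-SEP-05"`, which is not the same as counting employees hired on or after 21 September 2005. The plain-Python sketch below, using a few `HIRE_DATE` values from the sample rows, shows the difference; the helper `parse_hire_date` is hypothetical. In PySpark, the usual remedy would be to convert the column first, for example with `pyspark.sql.functions.to_date()` and a pattern such as `dd-MMM-yy` (check the pattern against the datetime pattern documentation for your Spark version).

```python
from datetime import datetime

# A few HIRE_DATE values taken from the sample output above
hire_dates = ["21-JUN-07", "13-JAN-08", "17-SEP-03", "17-FEB-04", "17-AUG-05"]
cutoff = "21-SEP-05"

# String comparison: character by character, as with a string column in Spark
string_matches = [d for d in hire_dates if d >= cutoff]


def parse_hire_date(value):
    """Hypothetical helper: parse 'DD-MON-YY' (month abbreviation, two-digit year)."""
    return datetime.strptime(value, "%d-%b-%y")


# Date comparison: parse both sides first, then compare real dates
date_matches = [d for d in hire_dates if parse_hire_date(d) >= parse_hire_date(cutoff)]

print("string comparison:", string_matches)
print("date comparison:  ", date_matches)
```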

In [13]:
df2 = df1.filter(df1["JOB_ID"]=="AC_ACCOUNT")
df2.count()

0

##### What is the issue with the code above?

Let's assume `employees.csv` contains billions of records. Transformations are lazy and run only when an action is called, so in this example `df1.count()` is the first action: it triggers reading the CSV file and applying `spark_df.filter()`.

The second action, `df2.count()`, triggers the whole lineage again: reading the file, `spark_df.filter()`, and then `df1.filter()`.

Therefore, in this example, the file is read twice and `spark_df.filter()` is applied twice. With a large number of records, this becomes a performance issue. It can be avoided by caching the results of `spark.read.csv()` and `spark_df.filter()`. In the following section, I will explain how to use `cache()` to prevent this double execution.

In [16]:
df2.explain(True)

== Parsed Logical Plan ==
Filter (JOB_ID#23 = AC_ACCOUNT)
+- Filter (HIRE_DATE#22 >= 21-SEP-05)
   +- Relation [EMPLOYEE_ID#17,FIRST_NAME#18,LAST_NAME#19,EMAIL#20,PHONE_NUMBER#21,HIRE_DATE#22,JOB_ID#23,SALARY#24,COMMISSION_PCT#25,MANAGER_ID#26,DEPARTMENT_ID#27] csv

== Analyzed Logical Plan ==
EMPLOYEE_ID: int, FIRST_NAME: string, LAST_NAME: string, EMAIL: string, PHONE_NUMBER: string, HIRE_DATE: string, JOB_ID: string, SALARY: int, COMMISSION_PCT: string, MANAGER_ID: string, DEPARTMENT_ID: int
Filter (JOB_ID#23 = AC_ACCOUNT)
+- Filter (HIRE_DATE#22 >= 21-SEP-05)
   +- Relation [EMPLOYEE_ID#17,FIRST_NAME#18,LAST_NAME#19,EMAIL#20,PHONE_NUMBER#21,HIRE_DATE#22,JOB_ID#23,SALARY#24,COMMISSION_PCT#25,MANAGER_ID#26,DEPARTMENT_ID#27] csv

== Optimized Logical Plan ==
Filter ((isnotnull(HIRE_DATE#22) AND isnotnull(JOB_ID#23)) AND ((HIRE_DATE#22 >= 21-SEP-05) AND (JOB_ID#23 = AC_ACCOUNT)))
+- Relation [EMPLOYEE_ID#17,FIRST_NAME#18,LAST_NAME#19,EMAIL#20,PHONE_NUMBER#21,HIRE_DATE#22,JOB_ID#23,SALA

### Syntax of cache() on a DataFrame

###### Syntax

DataFrame.cache()

Let's add `cache()` to the `spark.read.csv()` and `spark_df.filter()` transformations. Because `cache()` is lazy, nothing is stored yet. When `cache_df.count()` executes, it reads the file, caches the resulting DataFrame, applies the filter, and caches the filtered result as well.

After that, any action on `cache_df`, or on DataFrames derived from it such as the `groupBy()` aggregations below, starts from the cached results instead of re-reading the file and re-applying the filter.

In [19]:
spark_df = spark.read.csv("employees.csv",header=True,inferSchema=True).cache()

In [20]:
cache_df = spark_df.filter(spark_df["HIRE_DATE"] >= "21-SEP-05").cache()

In [21]:
cache_df.count()

10

In [25]:
cache_df.explain(True)

== Parsed Logical Plan ==
Filter (HIRE_DATE#528 >= 21-SEP-05)
+- Relation [EMPLOYEE_ID#523,FIRST_NAME#524,LAST_NAME#525,EMAIL#526,PHONE_NUMBER#527,HIRE_DATE#528,JOB_ID#529,SALARY#530,COMMISSION_PCT#531,MANAGER_ID#532,DEPARTMENT_ID#533] csv

== Analyzed Logical Plan ==
EMPLOYEE_ID: int, FIRST_NAME: string, LAST_NAME: string, EMAIL: string, PHONE_NUMBER: string, HIRE_DATE: string, JOB_ID: string, SALARY: int, COMMISSION_PCT: string, MANAGER_ID: string, DEPARTMENT_ID: int
Filter (HIRE_DATE#528 >= 21-SEP-05)
+- Relation [EMPLOYEE_ID#523,FIRST_NAME#524,LAST_NAME#525,EMAIL#526,PHONE_NUMBER#527,HIRE_DATE#528,JOB_ID#529,SALARY#530,COMMISSION_PCT#531,MANAGER_ID#532,DEPARTMENT_ID#533] csv

== Optimized Logical Plan ==
InMemoryRelation [EMPLOYEE_ID#523, FIRST_NAME#524, LAST_NAME#525, EMAIL#526, PHONE_NUMBER#527, HIRE_DATE#528, JOB_ID#529, SALARY#530, COMMISSION_PCT#531, MANAGER_ID#532, DEPARTMENT_ID#533], StorageLevel(disk, memory, deserialized, 1 replicas)
   +- *(1) Filter (isnotnull(HIRE_DATE#

In [22]:
salary_department_id_df = cache_df.groupBy("DEPARTMENT_ID").agg({'SALARY':'SUM'})

In [23]:
salary_department_id_df.count()

5

In [26]:
salary_department_id_df.explain(True)

== Parsed Logical Plan ==
'Aggregate ['DEPARTMENT_ID], ['DEPARTMENT_ID, 'sum(SALARY#530) AS sum(SALARY)#903]
+- Filter (HIRE_DATE#528 >= 21-SEP-05)
   +- Relation [EMPLOYEE_ID#523,FIRST_NAME#524,LAST_NAME#525,EMAIL#526,PHONE_NUMBER#527,HIRE_DATE#528,JOB_ID#529,SALARY#530,COMMISSION_PCT#531,MANAGER_ID#532,DEPARTMENT_ID#533] csv

== Analyzed Logical Plan ==
DEPARTMENT_ID: int, sum(SALARY): bigint
Aggregate [DEPARTMENT_ID#533], [DEPARTMENT_ID#533, sum(SALARY#530) AS sum(SALARY)#903L]
+- Filter (HIRE_DATE#528 >= 21-SEP-05)
   +- Relation [EMPLOYEE_ID#523,FIRST_NAME#524,LAST_NAME#525,EMAIL#526,PHONE_NUMBER#527,HIRE_DATE#528,JOB_ID#529,SALARY#530,COMMISSION_PCT#531,MANAGER_ID#532,DEPARTMENT_ID#533] csv

== Optimized Logical Plan ==
Aggregate [DEPARTMENT_ID#533], [DEPARTMENT_ID#533, sum(SALARY#530) AS sum(SALARY)#903L]
+- Project [SALARY#530, DEPARTMENT_ID#533]
   +- InMemoryRelation [EMPLOYEE_ID#523, FIRST_NAME#524, LAST_NAME#525, EMAIL#526, PHONE_NUMBER#527, HIRE_DATE#528, JOB_ID#529, SALAR

### Summary of Each Stage

1. **Parsed Logical Plan**:
   - Represents the initial query parsing, showing a filter operation on the data read from a CSV file.

2. **Analyzed Logical Plan**:
   - Displays the schema of the data and confirms the filter operation.

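3. **Optimized Logical Plan**:
   - Replaces the filter over the CSV relation with an `InMemoryRelation`, so the filtered rows are read from the cache instead of re-reading the file. The storage level, `StorageLevel(disk, memory, deserialized, 1 replicas)`, is `MEMORY_AND_DISK`: cached partitions are kept in memory and spill to disk if they do not fit.

4. **Physical Plan** (not shown in the truncated output above):
   - Describes how the query actually executes. When cached data is used, it contains an `InMemoryTableScan` operator instead of a CSV `FileScan`, so subsequent actions reuse the cached data without redundant reads.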

In [27]:
# Perform further transformations or analyses on the cached dataset
# For example: the average salary per job and department
avg_salary_df = cache_df \
    .groupBy("job_id","department_id") \
    .agg({"salary": "avg"})

In [28]:
avg_salary_df.show()

+----------+-------------+-----------+
|    job_id|department_id|avg(salary)|
+----------+-------------+-----------+
|FI_ACCOUNT|          100|     7950.0|
|  ST_CLERK|           50|     2900.0|
|     AD_VP|           90|    17000.0|
|   IT_PROG|           60|     4800.0|
|  PU_CLERK|           30|     2850.0|
+----------+-------------+-----------+



In [29]:
# Unpersist the cached dataset after use
cache_df.unpersist()

DataFrame[EMPLOYEE_ID: int, FIRST_NAME: string, LAST_NAME: string, EMAIL: string, PHONE_NUMBER: string, HIRE_DATE: string, JOB_ID: string, SALARY: int, COMMISSION_PCT: string, MANAGER_ID: string, DEPARTMENT_ID: int]

In [30]:
# Stop the SparkSession
spark.stop()

The PySpark `cache()` method stores the intermediate result of a transformation (in memory, spilling to disk when needed with the default storage level) so that later transformations built on the cached result run faster. Caching is lazy: nothing is stored until an action runs. Applied to DataFrames that are reused across several actions, it is one of the simplest ways to improve the performance of long-running PySpark applications and jobs.