In [0]:
df=spark.read.csv('/FileStore/tables/output_file.csv',header=True,inferSchema=True)

In [0]:
df.show()


+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| NULL|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| NULL|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| NULL|       S|
|          6|       0|     3|    Moran, Mr. James|  male|NULL|    0|    0|      

In [0]:
df.createOrReplaceTempView('Titanic')

In [0]:
spark.sql('select Survived,Name,Sex from Titanic where Survived=1').show()

+--------+--------------------+------+
|Survived|                Name|   Sex|
+--------+--------------------+------+
|       1|Cumings, Mrs. Joh...|female|
|       1|Heikkinen, Miss. ...|female|
|       1|Futrelle, Mrs. Ja...|female|
|       1|Johnson, Mrs. Osc...|female|
|       1|Nasser, Mrs. Nich...|female|
|       1|Sandstrom, Miss. ...|female|
|       1|Bonnell, Miss. El...|female|
|       1|Hewlett, Mrs. (Ma...|female|
|       1|Williams, Mr. Cha...|  male|
|       1|Masselmani, Mrs. ...|female|
|       1|Beesley, Mr. Lawr...|  male|
|       1|"McGowan, Miss. A...|female|
|       1|Sloper, Mr. Willi...|  male|
|       1|Asplund, Mrs. Car...|female|
|       1|"O'Dwyer, Miss. E...|female|
|       1|Spencer, Mrs. Wil...|female|
|       1|Glynn, Miss. Mary...|female|
|       1|    Mamee, Mr. Hanna|  male|
|       1|Nicola-Yarred, Mi...|female|
|       1|Laroche, Miss. Si...|female|
+--------+--------------------+------+
only showing top 20 rows



Caching Data In Memory

In [0]:

# Cache the table
df.cache()


DataFrame[PassengerId: int, Survived: int, Pclass: int, Name: string, Sex: string, Age: double, SibSp: int, Parch: int, Ticket: string, Fare: double, Cabin: string, Embarked: string]

Coalesce

In [0]:
# to get initial number of partitions
print("Initial number of partitions:", df.rdd.getNumPartitions())

Initial number of partitions: 1


In [0]:
# Use of coalesce to reduce the number of partitions
df_coalesced = df.coalesce(2)  # 2 is thedesired number of partitions

In [0]:
# Show the final number of partitions
print("Final number of partitions:", df_coalesced.rdd.getNumPartitions())

#we are already at minimun number

Final number of partitions: 1


Breadcast joins

In [0]:
data=[(1,'not survived'),(2,'survived'),(3,'survived'),(4,'survived'),(5,'not survived'),(6,'not survived'),(7,'not survived'),(8,'not survived'),(9,'survived'),(10,'survived')]
column=['PassengerId','survival']
df2=spark.createDataFrame(data,column)
df2.show()

+-----------+------------+
|PassengerId|    survival|
+-----------+------------+
|          1|not survived|
|          2|    survived|
|          3|    survived|
|          4|    survived|
|          5|not survived|
|          6|not survived|
|          7|not survived|
|          8|not survived|
|          9|    survived|
|         10|    survived|
+-----------+------------+



In [0]:
df2.createOrReplaceTempView('Titanic_survivals')

In [0]:
# Show the execution plan before optimization
spark.sql("SELECT * FROM Titanic t JOIN Titanic_survivals ts ON ts.PassengerId=t.PassengerId").explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- BroadcastHashJoin [cast(PassengerId#369 as bigint)], [PassengerId#3910L], Inner, BuildLeft, false
   :- Exchange SinglePartition, EXECUTOR_BROADCAST, [plan_id=3108]
   :  +- ColumnarToRow
   :     +- PhotonResultStage
   :        +- PhotonFilter isnotnull(PassengerId#369)
   :           +- PhotonRowToColumnar
   :              +- Scan In-memory table Titanic [PassengerId#369, Survived#370, Pclass#371, Name#372, Sex#373, Age#374, SibSp#375, Parch#376, Ticket#377, Fare#378, Cabin#379, Embarked#380], [isnotnull(PassengerId#369)]
   :                    +- InMemoryRelation [PassengerId#369, Survived#370, Pclass#371, Name#372, Sex#373, Age#374, SibSp#375, Parch#376, Ticket#377, Fare#378, Cabin#379, Embarked#380], StorageLevel(disk, memory, deserialized, 1 replicas)
   :                          +- *(1) ColumnarToRow
   :                             +- PhotonResultStage
   :                                +- PhotonRowToColumnar
   : 

In [0]:
spark.sql("SELECT /*+ BROADCAST(Titanic_survivals) */ * FROM Titanic t JOIN Titanic_survivals ts ON ts.PassengerId=t.PassengerId").createOrReplaceTempView("broadcast_table")

In [0]:
# Show the execution plan after optimization
spark.sql("SELECT * FROM broadcast_table").show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+-----------+------------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|PassengerId|    survival|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+-----------+------------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| NULL|       S|          1|not survived|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|          2|    survived|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| NULL|       S|          3|    survived|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|          4|    survived|
|          5|       

In [0]:
# Use a hint to influence the Catalyst optimizer
optimized_df = spark.sql("SELECT /*+ BROADCASTJOIN(Titanic) */ * FROM Titanic WHERE Age > 20")

# Show the execution plan
optimized_df.explain()

== Physical Plan ==
*(1) ColumnarToRow
+- PhotonResultStage
   +- PhotonFilter (isnotnull(Age#374) AND (Age#374 > 20.0))
      +- PhotonRowToColumnar
         +- Scan In-memory table Titanic [PassengerId#369, Survived#370, Pclass#371, Name#372, Sex#373, Age#374, SibSp#375, Parch#376, Ticket#377, Fare#378, Cabin#379, Embarked#380], [isnotnull(Age#374), (Age#374 > 20.0)]
               +- InMemoryRelation [PassengerId#369, Survived#370, Pclass#371, Name#372, Sex#373, Age#374, SibSp#375, Parch#376, Ticket#377, Fare#378, Cabin#379, Embarked#380], StorageLevel(disk, memory, deserialized, 1 replicas)
                     +- *(1) ColumnarToRow
                        +- PhotonResultStage
                           +- PhotonRowToColumnar
                              +- FileScan csv [PassengerId#369,Survived#370,Pclass#371,Name#372,Sex#373,Age#374,SibSp#375,Parch#376,Ticket#377,Fare#378,Cabin#379,Embarked#380] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[d

In [0]:
# Set the autoBroadcastJoinThreshold property
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10m")


Applying intermidiate Cache layer

In [0]:
# Writing DataFrame to Delta Lake table
df.write.format("delta").mode("overwrite").save("/delta//FileStore/tables")

In [0]:
# Reading Delta Lake table into a DataFrame
cached_df = spark.read.format("delta").table("Titanic")

In [0]:
# Performing Spark SQL queries on the cached DataFrame
result = spark.sql("SELECT * FROM Titanic WHERE Age > 25")

In [0]:
# Write the result back to the Delta Lake table to update the cache
result.write.format("delta").mode("overwrite").save("/delta/FileStore/tables")