In [1]:
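#Locate the local Spark installation before importing pyspark
import findspark
findspark.init()

#Create a Spark Session
from pyspark.sql import SparkSession

#Note that we have set shuffle partitions and default parallelism to 8,
#while the local master runs with 2 cores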
spark = SparkSession\
            .builder\
            .appName("OptimizeProcessingJob")\
            .config("spark.sql.shuffle.partitions", 8)\
            .config("spark.default.parallelism", 8)\
            .config("spark.sql.warehouse.dir", "spark-warehouse") \
            .enableHiveSupport() \
            .master("local[2]")\
            .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/08/13 09:24:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### 05.01 Pushing down Projections
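When downstream queries only need a subset of columns, the Spark optimizer identifies those columns and reads only them into the data frame. With a columnar format such as Parquet, the other columns are never read from disk, which saves I/O and memory. This is called projection pushdown. When building data pipelines, it helps to know how Spark works so you can take advantage of this optimization. In the plan below, ReadSchema lists the columns read from the data files. Product does not appear there because it is a partition column, whose values come from the directory names.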
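To make the I/O saving concrete, here is a toy model in plain Python, not Spark code. It assumes a "file" is a dict mapping each column name to its values, loosely mimicking how Parquet stores each column separately, and counts how many values must be read with and without pushing the projection down.

```python
# Toy model of projection pushdown over a columnar layout.
# Assumption: a "file" is a dict of column name -> list of values,
# loosely mimicking Parquet's per-column storage. This is NOT Spark code.

def read_columns(columnar_file, wanted):
    """Return only the requested columns and how many values were read."""
    projected = {name: columnar_file[name] for name in wanted}
    values_read = sum(len(vals) for vals in projected.values())
    return projected, values_read

# Hypothetical sample data (three orders, four stored columns).
sales_file = {
    "ID": [6, 8, 14],
    "Customer": ["Google", "Google", "Apple"],
    "Quantity": [5, 1, 4],
    "Rate": [40.58, 46.79, 40.27],
}

# Without pushdown, every column is materialized before the select runs.
_, full_cost = read_columns(sales_file, list(sales_file))
# With pushdown, only the selected column is read from storage.
projected, pushed_cost = read_columns(sales_file, ["Quantity"])

print(full_cost, pushed_cost)  # 12 3
```

The saving grows with the number of unused columns, which is why wide tables benefit most.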

In [2]:
sales_data = spark\
                .read\
                .parquet("dummy_hdfs/partitioned_parquet")

#show the execution plan
print("\n--------------------------EXPLAIN--------------------------")
sales_data.select("Product","Quantity").explain()
print("-------------------------END EXPLAIN-----------------------\n")


--------------------------EXPLAIN--------------------------
== Physical Plan ==
*(1) Project [Product#6, Quantity#3]
+- *(1) ColumnarToRow
   +- FileScan parquet [Quantity#3,Product#6] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/linkedin/ExerciseFiles/dummy_hdfs/partitioned_parquet], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Quantity:int>


-------------------------END EXPLAIN-----------------------



### 05.02 Pushing down Filters
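When downstream queries only need a subset of rows, the Spark optimizer pushes the filter condition down to the data source so that less data is read. This saves I/O and memory and is called filter pushdown. It works for both partition columns and non-partition columns. A filter on a partition column prunes whole directories (shown as PartitionFilters in an explain() plan). A filter on a regular column is passed to the Parquet reader (shown as PushedFilters), which can skip row groups whose min/max statistics rule out a match; Spark still re-applies the filter afterwards, because skipping only eliminates groups that cannot match. When building data pipelines, it helps to know how Spark works so you can take advantage of this optimization.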
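The two kinds of pushdown can be sketched as a toy scan in plain Python. This is an illustrative model, not Spark's implementation: it assumes the dataset is a dict of partition directories keyed by Product, each holding hypothetical "row groups" with min/max statistics on Customer. It reports how many row groups were actually read.

```python
# Toy model of filter pushdown. Assumptions (not Spark's implementation):
# - partition directories are keyed by the Product value,
# - each directory holds row groups carrying min/max stats for Customer.
# All data below is hypothetical.

dataset = {
    "Mouse": [
        {"min": "Apple", "max": "Google", "rows": [("Google", 5), ("Apple", 4)]},
        {"min": "LinkedIn", "max": "LinkedIn", "rows": [("LinkedIn", 4)]},
    ],
    "Webcam": [
        {"min": "Apple", "max": "Apple", "rows": [("Apple", 2)]},
    ],
}

def scan(dataset, product=None, customer=None):
    """Return matching rows and the number of row groups actually read."""
    groups_read = 0
    result = []
    for part_value, row_groups in dataset.items():
        # Partition filter: skip whole directories that cannot match.
        if product is not None and part_value != product:
            continue
        for rg in row_groups:
            # Pushed data filter: skip row groups whose stats rule out a match.
            if customer is not None and not (rg["min"] <= customer <= rg["max"]):
                continue
            groups_read += 1
            # Stats only prove absence, so surviving rows are still checked.
            result.extend(
                (part_value, c, q)
                for c, q in rg["rows"]
                if customer is None or c == customer
            )
    return result, groups_read

rows, read = scan(dataset, product="Mouse")
print(len(rows), read)  # 3 2
rows, read = scan(dataset, customer="Google")
print(rows, read)       # [('Mouse', 'Google', 5)] 1
```

Filtering on the partition column never opens the Webcam directory, while filtering on Customer opens only the one row group whose range can contain "Google".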

In [3]:
from pyspark.sql.functions import col

#Use a partition attribute for filtering
mouse_df = sales_data.where(col("Product") == 'Mouse')
mouse_df.show(5)

#show the execution plan
print("\n--------------------------EXPLAIN--------------------------")
mouse_df.explain()
print("-------------------------END EXPLAIN-----------------------\n")


+---+--------+----------+--------+-----+--------------------+-------+
| ID|Customer|      Date|Quantity| Rate|                Tags|Product|
+---+--------+----------+--------+-----+--------------------+-------+
|  6|  Google|2019/11/23|       5|40.58|                NULL|  Mouse|
|  8|  Google|2019/11/13|       1|46.79|Urgent:Discount:P...|  Mouse|
| 14|   Apple|2019/11/09|       4|40.27|            Discount|  Mouse|
| 15|   Apple|2019/11/25|       5|38.89|                NULL|  Mouse|
| 20|LinkedIn|2019/11/25|       4|36.77|       Urgent:Pickup|  Mouse|
+---+--------+----------+--------+-----+--------------------+-------+
only showing top 5 rows


--------------------------EXPLAIN--------------------------
== Physical Plan ==
*(1) ColumnarToRow
+- FileScan parquet [ID#0,Customer#1,Date#2,Quantity#3,Rate#4,Tags#5,Product#6] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/linkedin/ExerciseFiles/dummy_hdfs/partitioned_parquet], PartitionFi

In [4]:
#Use a non-partition attribute for filtering
google_df = sales_data.where(col("Customer") == 'Google')
google_df.show(5)

#show the execution plan
print("\n--------------------------EXPLAIN--------------------------")
google_df.explain()
print("-------------------------END EXPLAIN-----------------------\n")

+---+--------+----------+--------+-----+--------------------+-------+
| ID|Customer|      Date|Quantity| Rate|                Tags|Product|
+---+--------+----------+--------+-----+--------------------+-------+
|  6|  Google|2019/11/23|       5|40.58|                NULL|  Mouse|
|  8|  Google|2019/11/13|       1|46.79|Urgent:Discount:P...|  Mouse|
| 35|  Google|2019/11/17|       2|49.33|              Pickup|  Mouse|
| 51|  Google|2019/11/27|       4| 32.8|              Urgent|  Mouse|
| 57|  Google|2019/11/21|       5| 32.0|              Pickup|  Mouse|
+---+--------+----------+--------+-----+--------------------+-------+
only showing top 5 rows


--------------------------EXPLAIN--------------------------
== Physical Plan ==
*(1) Filter (isnotnull(Customer#1) AND (Customer#1 = Google))
+- *(1) ColumnarToRow
   +- FileScan parquet [ID#0,Customer#1,Date#2,Quantity#3,Rate#4,Tags#5,Product#6] Batched: true, DataFilters: [isnotnull(Customer#1), (Customer#1 = Google)], Format: Parquet, Loca

### 05.03 Partitioning and coalescing
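After a shuffle (for example, during a join or an aggregation), Spark SQL creates results with the number of partitions set by spark.sql.shuffle.partitions, which defaults to 200 in both local and cluster mode. The separate spark.default.parallelism setting, used by RDD operations, defaults to the number of available cores (in local mode, the cores assigned to the master, such as 2 for local[2]). 200 partitions can be too many when the cluster has far fewer cores or the data is small, because every partition becomes a task with its own scheduling overhead. Repartitioning helps set a more suitable number of partitions. In this notebook, both settings were set to 8 when the session was created.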
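To see why a large partition count can be wasteful for small data, here is a toy hash shuffle in plain Python. It assumes Python's built-in hash() as a stand-in for Spark's Murmur3 hash (only the "same key goes to the same partition" idea carries over) and uses hypothetical data: 1,000 rows with just 4 distinct product ids.

```python
# Toy model of a shuffle: each row goes to partition hash(key) % n.
# Assumption: Python's hash() stands in for Spark's Murmur3 hash. For small
# non-negative ints Python's hash(k) == k, which keeps this deterministic.

def hash_partition(keys, num_partitions):
    """Group keys into num_partitions buckets the way a hash shuffle would."""
    partitions = [[] for _ in range(num_partitions)]
    for key in keys:
        partitions[hash(key) % num_partitions].append(key)
    return partitions

# 1,000 rows but only 4 distinct product ids (hypothetical data).
product_ids = [i % 4 for i in range(1000)]

for n in (8, 200):
    parts = hash_partition(product_ids, n)
    non_empty = sum(1 for p in parts if p)
    print(f"{n} partitions: {non_empty} non-empty, {n - non_empty} empty")
# 8 partitions: 4 non-empty, 4 empty
# 200 partitions: 4 non-empty, 196 empty
```

In this toy, raising the partition count adds no useful parallelism once every key already has its own bucket; it only adds near-empty tasks.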

Repartition performs a full shuffle and can be used to increase or decrease the number of partitions.

Coalesce merges existing partitions and avoids a full shuffle. It can only decrease the number of partitions.

Repartition and coalesce are not free: repartition shuffles all the data, and coalesce can reduce the parallelism of the stage it is merged into. Use them only if multiple downstream steps will benefit.
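The difference in data movement can be sketched with partitions modeled as plain Python lists. This is a toy, not Spark's placement logic: repartition here deals rows round-robin (the plan later in this notebook shows RoundRobinPartitioning for repartition(5)), while coalesce only concatenates neighbouring partitions. The sample mirrors the 1 → 5 → 3 progression in the cell below, using hypothetical rows 0 to 9.

```python
# Toy model of repartition (full shuffle) vs coalesce (merge, no shuffle).
# Assumption: partitions are plain lists; Spark's real placement differs,
# but the data-movement pattern is the point here.

def repartition(partitions, n):
    """Deal every row round-robin into n new partitions (rows move freely)."""
    out = [[] for _ in range(n)]
    i = 0
    for part in partitions:
        for row in part:
            out[i % n].append(row)
            i += 1
    return out

def coalesce(partitions, n):
    """Merge neighbouring partitions into n groups without redistributing rows."""
    if n >= len(partitions):
        return [list(p) for p in partitions]  # coalesce cannot increase the count
    out = [[] for _ in range(n)]
    for idx, part in enumerate(partitions):
        out[idx * n // len(partitions)].extend(part)
    return out

raw = [list(range(10))]          # one partition, like the raw CSV read
five = repartition(raw, 5)
three = coalesce(five, 3)

print([len(p) for p in five])    # [2, 2, 2, 2, 2]
print([len(p) for p in three])   # [4, 4, 2]
print(len(coalesce(raw, 5)))     # 1
```

Note that coalescing can leave partitions uneven ([4, 4, 2] here), which is the trade-off for avoiding a shuffle.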

In [5]:
print("Default parallelism from Spark Session :", 
      spark.sparkContext.defaultParallelism)

#Read a file with default parallelism
raw_sales_data = spark\
                .read\
                .option("inferSchema", "true")\
                .option("header", "true")\
                .csv("datasets/sales_orders.csv")

#Partitions in sales data partitioned by product (read previously)
#With small files, this typically ends up as one input partition per file
print("\nPartitions in hdfs data with parquet : ",
      sales_data.rdd.getNumPartitions())

#Raw partition count
print("\nPartitions in data frame for raw CSV read : ", 
      raw_sales_data.rdd.getNumPartitions())

#Repartition to 5 partitions
partitioned_sales_data = raw_sales_data.repartition(5)

print("\nPartitions in raw data frame after repartitioning : ", 
      partitioned_sales_data.rdd.getNumPartitions())

#coalesce to 3 partitions
coalesced_sales_data = partitioned_sales_data.coalesce(3)

print("\nPartitions in raw data frame after coalesce : ", 
      coalesced_sales_data.rdd.getNumPartitions())

Default parallelism from Spark Session : 8

Partitions in hdfs data with parquet :  4

Partitions in data frame for raw CSV read :  1

Partitions in raw data frame after repartitioning :  5

Partitions in raw data frame after coalesce :  3


### 05.04 Optimizing Joins
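By default, joining two data frames requires a lot of shuffling: rows from both sides are redistributed so that matching keys end up in the same partition. If one data frame is considerably smaller, a better option is to broadcast it to all the executors and join locally against those copies, so the larger data frame is not shuffled at all. The Spark optimizer chooses broadcast joins when possible: data frames whose estimated size is below spark.sql.autoBroadcastJoinThreshold (10 MB by default) are broadcast automatically. The broadcast() function used below explicitly marks a data frame for broadcasting.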
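The idea behind a broadcast hash join can be sketched in plain Python. This is a toy, not Spark's implementation: it assumes each "executor" is simply one partition of the large table, and broadcasting means every partition probes the same read-only dict built from the small table. The order rows are hypothetical, apart from mirroring the shape of the sales data.

```python
# Toy model of a broadcast hash join (inner join on Product).
# Assumptions: each partition of the large side plays the role of an executor,
# and "broadcasting" means every partition probes the same lookup dict.

product_vendor = [("Mouse", "Logitech"), ("Keyboard", "Microsoft"),
                  ("Webcam", "Logitech"), ("Headset", "Plantronics")]

# Large side stays where it is, split across partitions (hypothetical rows).
sales_partitions = [
    [(6, "Google", "Mouse"), (8, "Google", "Mouse")],
    [(31, "Apple", "Keyboard"), (40, "Apple", "Tablet")],
]

def broadcast_hash_join(large_partitions, small_rows):
    # Build the hash table once from the small side...
    lookup = dict(small_rows)
    joined = []
    # ...then each partition probes it locally; no large-side rows move.
    for part in large_partitions:
        for order_id, customer, product in part:
            vendor = lookup.get(product)
            if vendor is not None:  # inner join: unmatched rows are dropped
                joined.append((product, order_id, customer, vendor))
    return joined

result = broadcast_hash_join(sales_partitions, product_vendor)
print(result)
```

The "Tablet" order is dropped because it has no matching vendor, matching the inner-join behaviour of `join(..., "Product")`. The design only pays off while the small side fits comfortably in each executor's memory, which is what the threshold guards.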

In [6]:
from pyspark.sql.functions import broadcast

product_data = spark\
                .read\
                .option("inferSchema", "true")\
                .option("header", "true")\
                .csv("datasets/product_vendor.csv")
product_data.show(5)

#Broadcast product data
broadcast_product=broadcast(product_data)

#Join with broadcasted local copy of product data
joined_data = sales_data.join(broadcast_product,"Product")

joined_data.show(5)

#show the execution plan
print("\n--------------------------EXPLAIN--------------------------")
joined_data.explain()
print("-------------------------END EXPLAIN-----------------------\n")

+--------+-----------+
| Product|     Vendor|
+--------+-----------+
|   Mouse|   Logitech|
|Keyboard|  Microsoft|
|  Webcam|   Logitech|
| Headset|Plantronics|
+--------+-----------+

+-------+---+--------+----------+--------+-----+--------------------+--------+
|Product| ID|Customer|      Date|Quantity| Rate|                Tags|  Vendor|
+-------+---+--------+----------+--------+-----+--------------------+--------+
|  Mouse|  6|  Google|2019/11/23|       5|40.58|                NULL|Logitech|
|  Mouse|  8|  Google|2019/11/13|       1|46.79|Urgent:Discount:P...|Logitech|
|  Mouse| 14|   Apple|2019/11/09|       4|40.27|            Discount|Logitech|
|  Mouse| 15|   Apple|2019/11/25|       5|38.89|                NULL|Logitech|
|  Mouse| 20|LinkedIn|2019/11/25|       4|36.77|       Urgent:Pickup|Logitech|
+-------+---+--------+----------+--------+-----+--------------------+--------+
only showing top 5 rows


--------------------------EXPLAIN--------------------------
== Physical Plan =

### 05.05 Storing Intermediate Results
By default, every time an action is performed, Spark re-executes all the previous steps, starting from the data read. This can be very expensive, especially when using Spark in development or interactive mode. A better option is to cache intermediate results. Spark can cache data in memory, or persist it to both memory and disk. Persisted data is kept on the executors (in memory and on their local disks), not in HDFS; to save intermediate results to reliable storage such as HDFS, use checkpointing instead. Caching is lazy: the data is stored the first time an action computes it.
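The recomputation cost and the effect of persisting can be sketched with a toy lazy pipeline in plain Python. This mirrors the idea only; ToyFrame and its methods are hypothetical names, not Spark's API. Each "action" re-runs the chain from the source unless a persisted result has already been filled in by an earlier action.

```python
# Toy model of lazy evaluation and caching. Assumption: a "frame" is a chain
# of Python functions re-run from the source on every action unless a cached
# result exists. ToyFrame is hypothetical and only mirrors the idea.

class ToyFrame:
    source_reads = 0  # how many times the "file" was scanned

    def __init__(self, compute):
        self._compute = compute
        self._cached = None
        self._persist = False

    @staticmethod
    def read(rows):
        def compute():
            ToyFrame.source_reads += 1
            return list(rows)
        return ToyFrame(compute)

    def where(self, pred):
        return ToyFrame(lambda: [r for r in self._rows() if pred(r)])

    def persist(self):
        # Like Spark, persisting is lazy: nothing is computed yet.
        self._persist = True
        return self

    def _rows(self):
        if self._cached is not None:
            return self._cached
        rows = self._compute()
        if self._persist:
            self._cached = rows  # filled in by the first action
        return rows

    def count(self):  # an "action"
        return len(self._rows())

sales = ToyFrame.read(["Google", "Apple", "Google", "LinkedIn"])
sales.where(lambda c: c == "Google").count()
sales.where(lambda c: c == "Apple").count()
print(ToyFrame.source_reads)  # 2: each action re-read the source

ToyFrame.source_reads = 0
sales.persist()
sales.count()  # first action reads the source and fills the cache
sales.where(lambda c: c == "Google").count()
print(ToyFrame.source_reads)  # 1
```

After persisting, later actions reuse the stored rows instead of going back to the source, which is the same reason the cached plan below starts from an InMemoryTableScan.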

In [7]:
print("Plan before caching intermediate results:\n-------------------------")
data_before = coalesced_sales_data.where(col("Customer") == 'Google')
data_before.explain()

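#Mark intermediate results for persistence (memory and disk by default for data frames)
#This is lazy: the data is stored when the next action runs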
coalesced_sales_data.persist()

print("Plan after caching intermediate results:\n-------------------------")
data_after = coalesced_sales_data.where(col("Customer") == 'Google')
data_after.explain()


Plan before caching intermediate results:
-------------------------
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Coalesce 3
   +- Exchange RoundRobinPartitioning(5), REPARTITION_BY_NUM, [plan_id=270]
      +- Filter (isnotnull(Customer#93) AND (Customer#93 = Google))
         +- FileScan csv [ID#92,Customer#93,Product#94,Date#95,Quantity#96,Rate#97,Tags#98] Batched: false, DataFilters: [isnotnull(Customer#93), (Customer#93 = Google)], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/Users/linkedin/ExerciseFiles/datasets/sales_orders.csv], PartitionFilters: [], PushedFilters: [IsNotNull(Customer), EqualTo(Customer,Google)], ReadSchema: struct<ID:int,Customer:string,Product:string,Date:string,Quantity:int,Rate:double,Tags:string>


Plan after caching intermediate results:
-------------------------
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Filter (isnotnull(Customer#93) AND (Customer#93 = Google))
   +- InMemoryTableScan [ID#92, Customer#93, Product#9