In [1]:
from pyspark.sql import *
from pyspark.sql.functions import spark_partition_id

In [4]:
# Build (or reuse) the Spark session for this notebook, with the
# "spark.sql.ui.explainMode" setting switched to "extended".
spark = (
    SparkSession.builder
    .appName("Spark Optimization")
    .config("spark.sql.ui.explainMode", "extended")
    .getOrCreate()
)

# Load the sales CSV: the first row is the header and column types are inferred.
df = spark.read.csv(path="BigMart Sales.csv", header=True, inferSchema=True)

In [5]:
# Preview the first five rows of the loaded frame.
df.show(n=5)

+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|           Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|
+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+
|          FDA15|        9.3|         Low Fat|    0.016047301|               Dairy|249.8092|           OUT049|                     1999|     Medium|              Tier 1|Supermarket Type1|         3735.138|
|          DRC01|       5.92|         Regular|    0.019278216|         Soft Drinks| 48.2692|           OUT018|                     2009|     Medium|              Tier 3|Superma

In [6]:
# Number of partitions backing the frame as it was first read.
initial_partition_count = df.rdd.getNumPartitions()
initial_partition_count

1

# Changing Partition Size to 128KB

In [7]:
# Cap each file-read partition at 128 KB, expressed in bytes (128 * 1024 == 131072).
spark.conf.set("spark.sql.files.maxPartitionBytes", 128 * 1024)

In [8]:
# Re-read the CSV so the new maxPartitionBytes setting applies to this load,
# then report how many partitions the frame now has.
df = spark.read.csv(path="BigMart Sales.csv", inferSchema=True, header=True)
df.rdd.getNumPartitions()

7

# Repartition

In [9]:
# Redistribute the rows across exactly 10 partitions and confirm the new count.
df = df.repartition(numPartitions=10)
df.rdd.getNumPartitions()

10

In [10]:
# Tag every row with the id of the partition that holds it, then display the result.
df_with_partition_id = df.withColumn("partition_id", spark_partition_id())
df_with_partition_id.show()

+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+------------+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|           Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|partition_id|
+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+------------+
|          FDS52|       NULL|         Low Fat|    0.005448005|        Frozen Foods|102.1016|           OUT027|                     1985|     Medium|              Tier 3|Supermarket Type3|         3542.056|           0|
|          FDL22|      16.85|         Low Fat|    0.036390174|         Snack Foods| 91.4488|           OUT046|              

# Save the data as Parquet

In [11]:
# Persist the repartitioned frame as Parquet; this is the baseline
# (non-partitioned) dataset that is read back and filtered afterwards.
# Use "overwrite" rather than "append": with "append", every re-run of this cell
# adds another full copy of the data to the same directory, so the read-back
# would contain duplicated rows. "overwrite" keeps the cell idempotent and
# matches the save mode used for the partitioned write.
df.write.format("parquet").mode("overwrite") \
.option("path", "outputData/output_parquet").save()

# Read the saved Parquet data back

In [12]:
# Load the baseline Parquet output that was written without partitionBy.
baseline_rows = (
    spark.read
    .format("parquet")
    .load("outputData/output_parquet")
)
# Keep only the rows whose outlet location is 'Tier 1' and display them.
df_new = baseline_rows.where(baseline_rows["Outlet_Location_Type"] == 'Tier 1')
df_new.show()

+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|           Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|
+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+
|          FDK14|       6.98|         Low Fat|    0.041105789|              Canned| 83.4934|           OUT046|                     1997|      Small|              Tier 1|Supermarket Type1|        1555.9746|
|          FDJ08|       NULL|         Low Fat|    0.193772568|Fruits and Vegeta...|190.3846|           OUT019|                     1985|      Small|              Tier 1|    Gro

# Scan optimization

In [None]:
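# The queries in this notebook filter on Outlet_Location_Type.
# If the data is written partitioned by that column, a filter on it lets Spark
# read only the matching partition directory instead of scanning every file.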

In [13]:
# Write the frame as Parquet, split into one sub-directory per
# Outlet_Location_Type value, replacing any output left by a previous run.
df.write.save(
    "outputData/output_parquet_optimized",
    format="parquet",
    mode="overwrite",
    partitionBy="Outlet_Location_Type",
)

In [14]:
# Load the dataset that was written partitioned by Outlet_Location_Type.
partitioned_rows = (
    spark.read
    .format("parquet")
    .load("outputData/output_parquet_optimized")
)
# Filter on the partition column so only the 'Tier 1' rows are kept, then display them.
df_optimized = partitioned_rows.where(partitioned_rows["Outlet_Location_Type"] == 'Tier 1')
df_optimized.show()

+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+-----------------+-----------------+--------------------+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|           Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|      Outlet_Type|Item_Outlet_Sales|Outlet_Location_Type|
+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+-----------------+-----------------+--------------------+
|          FDH28|      15.85|         Regular|    0.110030997|        Frozen Foods| 37.2506|           OUT046|                     1997|      Small|Supermarket Type1|         265.6542|              Tier 1|
|          NCW53|       NULL|         Low Fat|    0.053392944|  Health and Hygiene|193.8162|           OUT019|                     1985|      Small|    Grocery Store|         3