#### PySpark Configurations ####

In [1]:
# Import Libraries
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.storagelevel import StorageLevel

In [3]:
# Initialize the Spark session (master at spark://spark-master:7077, UI on port 4040)
spark = (
    SparkSession.builder
    .master("spark://spark-master:7077")
    .appName("Ansh-Lamba-Apache-Spark")
    .config("spark.ui.port", "4040")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/01/19 13:38:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# Adaptive Query Execution - AQE
# spark.conf.set("spark.sql.adaptive.enabled", False)    # Disable AQE; it is enabled by default
# print('Adaptive Query Execution (AQE) enabled:', spark.conf.get("spark.sql.adaptive.enabled"))   # Check if AQE is enabled

' \nAdaptive Query Execution (AQE) is an optimization technique in Spark SQL that makes use of the runtime statistics to choose the most efficient query execution \nplan, which is enabled by default since Apache Spark 3.2.0.\n'

#### Reading data from CSV file ####

In [4]:
# Root directory that holds the input data files (path constant only; nothing is created here)
INPUT_DATA_ROOT = "/opt/spark-data/input/ansh-lamba"

In [5]:
# Read the MegaMart CSV file (first row is the header) and let Spark infer the column types
df_mega_mart = (
    spark.read.format("csv")
    .option("inferSchema", True)
    .option("header", True)
    .load(f"{INPUT_DATA_ROOT}/MegaMart.csv")
)

                                                                                

In [6]:
# Preview the first 5 records without truncating long cell values
df_mega_mart.show(n=5, truncate=False)

+--------+-------+----------+----------+----------------+------------+--------+--------------+--------------+------------+
|order_id|user_id|order_date|product_id|product_category|product_name|quantity|price_per_unit|payment_method|order_status|
+--------+-------+----------+----------+----------------+------------+--------+--------------+--------------+------------+
|1001    |U188   |2025-04-20|P940      |Fashion         |Sneakers    |2       |58.53         |PayPal        |Cancelled   |
|1002    |U062   |2025-04-16|P794      |Fashion         |T-Shirt     |3       |83.76         |UPI           |Returned    |
|1003    |U058   |2025-04-18|P326      |Fashion         |Sunglasses  |2       |78.85         |PayPal        |Processing  |
|1004    |U011   |2025-04-10|P574      |Fashion         |Sunglasses  |5       |46.49         |PayPal        |Delivered   |
|1005    |U003   |2025-04-19|P988      |Home Decor      |Photo Frame |2       |78.61         |PayPal        |Returned    |
+--------+------

In [7]:
# Print the dataframe schema (column names, inferred types and nullability)
df_mega_mart.printSchema()

root
 |-- order_id: integer (nullable = true)
 |-- user_id: string (nullable = true)
 |-- order_date: date (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- price_per_unit: double (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- order_status: string (nullable = true)



#### Transformations - Narrow & Wide ####

In [8]:
# Keep only the records whose product_category is 'Fashion' (filter() is a narrow transformation)
df_fashion = df_mega_mart.filter(col("product_category") == "Fashion")

In [9]:
# Preview the first 5 'Fashion' records without truncating long cell values
df_fashion.show(n=5, truncate=False)

+--------+-------+----------+----------+----------------+------------+--------+--------------+--------------+------------+
|order_id|user_id|order_date|product_id|product_category|product_name|quantity|price_per_unit|payment_method|order_status|
+--------+-------+----------+----------+----------------+------------+--------+--------------+--------------+------------+
|1001    |U188   |2025-04-20|P940      |Fashion         |Sneakers    |2       |58.53         |PayPal        |Cancelled   |
|1002    |U062   |2025-04-16|P794      |Fashion         |T-Shirt     |3       |83.76         |UPI           |Returned    |
|1003    |U058   |2025-04-18|P326      |Fashion         |Sunglasses  |2       |78.85         |PayPal        |Processing  |
|1004    |U011   |2025-04-10|P574      |Fashion         |Sunglasses  |5       |46.49         |PayPal        |Delivered   |
|1012    |U148   |2025-04-24|P315      |Fashion         |Sunglasses  |5       |69.14         |Credit Card   |Processing  |
+--------+------

In [10]:
# Print the physical execution plan of the filter (narrow transformation)
df_fashion.explain()

== Physical Plan ==
*(1) Filter (isnotnull(product_category#21) AND (product_category#21 = Fashion))
+- FileScan csv [order_id#17,user_id#18,order_date#19,product_id#20,product_category#21,product_name#22,quantity#23,price_per_unit#24,payment_method#25,order_status#26] Batched: false, DataFilters: [isnotnull(product_category#21), (product_category#21 = Fashion)], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/opt/spark-data/input/ansh-lamba/MegaMart.csv], PartitionFilters: [], PushedFilters: [IsNotNull(product_category), EqualTo(product_category,Fashion)], ReadSchema: struct<order_id:int,user_id:string,order_date:date,product_id:string,product_category:string,prod...




In [11]:
# Count the records per product_category (groupby() is a wide transformation)
df_product_category_summary = (
    df_mega_mart
    .groupby("product_category")
    .agg(count(col("order_id")).alias("total_records"))
)

In [12]:
# Preview the first 5 rows of the per-category summary without truncation
df_product_category_summary.show(n=5, truncate=False)

[Stage 4:>                                                          (0 + 1) / 1]

+----------------+-------------+
|product_category|total_records|
+----------------+-------------+
|Kitchen         |203          |
|Fashion         |188          |
|Electronics     |223          |
|Books           |193          |
|Home Decor      |193          |
+----------------+-------------+



                                                                                

In [13]:
# Print the physical execution plan of the aggregation (wide transformation)
df_product_category_summary.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[product_category#21], functions=[count(order_id#17)])
   +- Exchange hashpartitioning(product_category#21, 200), ENSURE_REQUIREMENTS, [plan_id=108]
      +- HashAggregate(keys=[product_category#21], functions=[partial_count(order_id#17)])
         +- FileScan csv [order_id#17,product_category#21] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/opt/spark-data/input/ansh-lamba/MegaMart.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<order_id:int,product_category:string>




#### Transformations - Repartition VS. Coalesce ####

In [14]:
# Number of partitions of the aggregated dataframe before any repartitioning
df_product_category_summary.rdd.getNumPartitions()

1

In [15]:
# Redistribute the dataframe into 3 partitions using repartition()
df_product_category_summary = df_product_category_summary.repartition(3)

In [16]:
# Number of partitions after repartition(3)
df_product_category_summary.rdd.getNumPartitions()

3

In [17]:
# Print the physical execution plan after repartition(3)
df_product_category_summary.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   ShuffleQueryStage 1
   +- Exchange RoundRobinPartitioning(3), REPARTITION_BY_NUM, [plan_id=185]
      +- *(2) HashAggregate(keys=[product_category#21], functions=[count(order_id#17)])
         +- AQEShuffleRead coalesced
            +- ShuffleQueryStage 0
               +- Exchange hashpartitioning(product_category#21, 200), ENSURE_REQUIREMENTS, [plan_id=162]
                  +- *(1) HashAggregate(keys=[product_category#21], functions=[partial_count(order_id#17)])
                     +- FileScan csv [order_id#17,product_category#21] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/opt/spark-data/input/ansh-lamba/MegaMart.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<order_id:int,product_category:string>
+- == Initial Plan ==
   Exchange RoundRobinPartitioning(3), REPARTITION_BY_NUM, [plan_id=150]
   +- HashAggregate(keys=[product_category#21], func

In [18]:
# Reduce the dataframe back to 1 partition using coalesce()
df_product_category_summary = df_product_category_summary.coalesce(1)

In [19]:
# Number of partitions after coalesce(1)
df_product_category_summary.rdd.getNumPartitions()

1

In [20]:
# Print the physical execution plan after coalesce(1)
df_product_category_summary.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   Coalesce 1
   +- ShuffleQueryStage 1
      +- Exchange RoundRobinPartitioning(3), REPARTITION_BY_NUM, [plan_id=255]
         +- *(2) HashAggregate(keys=[product_category#21], functions=[count(order_id#17)])
            +- AQEShuffleRead coalesced
               +- ShuffleQueryStage 0
                  +- Exchange hashpartitioning(product_category#21, 200), ENSURE_REQUIREMENTS, [plan_id=228]
                     +- *(1) HashAggregate(keys=[product_category#21], functions=[partial_count(order_id#17)])
                        +- FileScan csv [order_id#17,product_category#21] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/opt/spark-data/input/ansh-lamba/MegaMart.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<order_id:int,product_category:string>
+- == Initial Plan ==
   Coalesce 1
   +- Exchange RoundRobinPartitioning(3), REPARTITION_BY_NUM, [plan_id=21

#### Executor Out of Memory (OOM) - Caching, Persist ####

In [21]:
# Dummy dataset: one tuple per sibling -> (Id, Name, Age, Gender, Salary)
siblings = [
    (1, "Kwaku Jude", 40, "M", 10_090.50),
    (2, "Yaw David", 36, "M", 9_001.10),
    (3, "Kofi Baffuor", 34, "M", 8_200.99),
    (4, "Abena Salo", 32, "F", 7_905.00),
    (5, "Abena Pat", 30, "F", 7_005.19),
]

In [22]:
# Schema definition as a single "name TYPE, name TYPE, ..." string, one entry per column
siblings_schema = ", ".join(
    [
        "Id INT",
        "Name STRING",
        "Age INT",
        "Gender STRING",
        "Salary DOUBLE",
    ]
)

In [23]:
# Build a Spark DataFrame from the dummy rows using the schema string defined above
df_siblings = spark.createDataFrame(siblings, schema=siblings_schema)

In [24]:
# Display the siblings dataframe (show() with its default row limit)
df_siblings.show()

                                                                                

+---+------------+---+------+-------+
| Id|        Name|Age|Gender| Salary|
+---+------------+---+------+-------+
|  1|  Kwaku Jude| 40|     M|10090.5|
|  2|   Yaw David| 36|     M| 9001.1|
|  3|Kofi Baffuor| 34|     M|8200.99|
|  4|  Abena Salo| 32|     F| 7905.0|
|  5|   Abena Pat| 30|     F|7005.19|
+---+------------+---+------+-------+



In [25]:
# Add a Country column derived from Name: two explicit cases, everything else defaults to 'GHANA'
df_siblings = df_siblings.withColumn(
    "Country",
    when(col("Name") == "Kwaku Jude", "USA")
    .when(col("Name") == "Yaw David", "N/A")
    .otherwise("GHANA"),
)

In [26]:
# Display the siblings dataframe including the new Country column
df_siblings.show()

+---+------------+---+------+-------+-------+
| Id|        Name|Age|Gender| Salary|Country|
+---+------------+---+------+-------+-------+
|  1|  Kwaku Jude| 40|     M|10090.5|    USA|
|  2|   Yaw David| 36|     M| 9001.1|    N/A|
|  3|Kofi Baffuor| 34|     M|8200.99|  GHANA|
|  4|  Abena Salo| 32|     F| 7905.0|  GHANA|
|  5|   Abena Pat| 30|     F|7005.19|  GHANA|
+---+------------+---+------+-------+-------+



In [27]:
# Cache data frame
# NB: Only Cache smaller sized data frames
# df_siblings.cache()

# Uncache data frame (DataFrame has no uncache() method; unpersist() releases a cached data frame)
# df_siblings.unpersist()

In [28]:
# Persist the data frame with an explicit storage level (memory only).
# persist() returns the DataFrame, which is why the cell echoes its repr.
df_siblings.persist(StorageLevel.MEMORY_ONLY)


# Un-persist data frame (kept disabled so the persisted data stays available below)
# df_siblings.unpersist()

DataFrame[Id: int, Name: string, Age: int, Gender: string, Salary: double, Country: string]

In [29]:
# Keep only the records where Gender == 'M' (filter() retains the rows that match the condition)
df_male_siblings = df_siblings.filter(col('Gender') == 'M')

In [30]:
# Display the male siblings
df_male_siblings.show()

+---+------------+---+------+-------+-------+
| Id|        Name|Age|Gender| Salary|Country|
+---+------------+---+------+-------+-------+
|  1|  Kwaku Jude| 40|     M|10090.5|    USA|
|  2|   Yaw David| 36|     M| 9001.1|    N/A|
|  3|Kofi Baffuor| 34|     M|8200.99|  GHANA|
+---+------------+---+------+-------+-------+



In [31]:
# Print the physical execution plan of the filter on the persisted data frame
df_male_siblings.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Filter (isnotnull(Gender#188) AND (Gender#188 = M))
   +- InMemoryTableScan [Id#185, Name#186, Age#187, Gender#188, Salary#189, Country#216], [isnotnull(Gender#188), (Gender#188 = M)]
         +- InMemoryRelation [Id#185, Name#186, Age#187, Gender#188, Salary#189, Country#216], StorageLevel(memory, 1 replicas)
               +- *(1) Project [Id#185, Name#186, Age#187, Gender#188, Salary#189, CASE WHEN (Name#186 = Kwaku Jude) THEN USA WHEN (Name#186 = Yaw David) THEN N/A ELSE GHANA END AS Country#216]
                  +- *(1) Scan ExistingRDD[Id#185,Name#186,Age#187,Gender#188,Salary#189]




#### Writing data to Parquet files - Partition pruning & Dynamic Partition pruning ####

In [32]:
# Output locations (path constants only; nothing is created on disk here)
OUTPUT_DATA_ROOT = "/opt/spark-data/output/ansh-lamba"
# Derived from OUTPUT_DATA_ROOT so the path used for reading cannot drift from
# the "<root>/siblings-dataset.parquet" path the dataset is written to.
OUTPUT_DATA_ALT = f"{OUTPUT_DATA_ROOT}/siblings-dataset.parquet"

MODE = "OVERWRITE"  # MODES = APPEND, OVERWRITE, ERROR, IGNORE

In [33]:
# Save the dataframe as Parquet, partitioned by the columns listed below
partition_by_columns = ["Gender"]

(
    df_siblings.write
    .format("parquet")
    .partitionBy(*partition_by_columns)
    .mode(MODE)
    .save(f"{OUTPUT_DATA_ROOT}/siblings-dataset.parquet")
)

                                                                                

In [34]:
# Read the whole dataset back from disk - no partition filter applied.
# The data on disk is still partitioned by Gender (see the write above); "no partition"
# only means that every partition is read.
# The 'inferSchema' and 'header' options were removed: they are CSV reader options,
# and Parquet files carry their own schema, so they had no effect here.
df_siblings_no_partition = spark.read.format("parquet").load(OUTPUT_DATA_ALT)

In [35]:
# Display the full dataset read back from the Parquet files
df_siblings_no_partition.show()

                                                                                

+---+------------+---+-------+-------+------+
| Id|        Name|Age| Salary|Country|Gender|
+---+------------+---+-------+-------+------+
|  3|Kofi Baffuor| 34|8200.99|  GHANA|     M|
|  4|  Abena Salo| 32| 7905.0|  GHANA|     F|
|  5|   Abena Pat| 30|7005.19|  GHANA|     F|
|  1|  Kwaku Jude| 40|10090.5|    USA|     M|
|  2|   Yaw David| 36| 9001.1|    N/A|     M|
+---+------------+---+-------+-------+------+



In [36]:
# Read the dataset back from disk with a filter on the partition column (Gender),
# which is the case partition pruning applies to.
# The 'inferSchema' and 'header' options were removed: they are CSV reader options,
# and Parquet files carry their own schema, so they had no effect here.
df_siblings_partitions = (
    spark.read.format("parquet")
    .load(OUTPUT_DATA_ALT)
    .filter(col("Gender") == "F")
)

In [37]:
# Display the records read with the Gender == 'F' filter
df_siblings_partitions.show()

+---+----------+---+-------+-------+------+
| Id|      Name|Age| Salary|Country|Gender|
+---+----------+---+-------+-------+------+
|  4|Abena Salo| 32| 7905.0|  GHANA|     F|
|  5| Abena Pat| 30|7005.19|  GHANA|     F|
+---+----------+---+-------+-------+------+



In [38]:
# Join df_siblings_no_partition & df_siblings_partitions data frames
# The join is kept disabled inside a string literal; the bare string is echoed as the cell output.
# Fixed: the snippet had one closing parenthesis too many before `, how='inner'`,
# which made it a SyntaxError as soon as it was enabled.
"""
df_siblings_join = df_siblings_no_partition \
                    .join(df_siblings_partitions, (df_siblings_no_partition['Name'] == df_siblings_partitions['Name']) & (df_siblings_no_partition['Gender'] == df_siblings_partitions['Gender']), how='inner')

"""

"\ndf_siblings_join = df_siblings_no_partition                     .join(df_siblings_partitions, (df_siblings_no_partition['Name'] == df_siblings_partitions['Name']) & (df_siblings_no_partition['Gender'] == df_siblings_partitions['Gender'])), how='inner')\n\n"