In [2]:
data = [
    ("P101", "Laptop X1", "Electronics", 85000, 15, "S001", 4.5, "2023-06-10", "2026-06-10", "Mumbai"),
    ("P102", "Phone Z5", "Electronics", 60000, 30, "S002", 4.3, "2024-01-05", "2026-01-05", "Delhi"),
    ("P103", "Office Chair", "Furniture", 7000, 45, "S003", 4.2, "2023-10-20", "2027-10-20", "Pune"),
    ("P104", "Coffee Maker", "Kitchen", 4000, 20, "S004", 3.8, "2023-09-01", "2026-09-01", "Chennai"),
    ("P105", "TV 55inch", "Electronics", 95000, 10, "S001", 4.6, "2022-12-15", "2026-12-15", "Mumbai"),
    ("P106", "Water Bottle", "Kitchen", 250, 200, "S005", 4.1, "2024-02-11", "2026-02-11", "Delhi"),
    ("P107", "Desk Lamp", "Furniture", 1500, 70, "S003", 3.9, "2024-03-10", "2026-03-10", "Pune"),
    ("P108", "Gaming Console", "Electronics", 48000, 25, "S006", 4.8, "2024-05-20", "2027-05-20", "Hyderabad"),
    ("P109", "Notebook", "Stationery", 100, 500, "S007", 4.0, "2024-04-15", "2028-04-15", "Bengaluru"),
    ("P110", "Blender Pro", "Kitchen", 5500, 18, "S004", 4.4, "2023-11-10", "2026-11-10", "Chennai")
]

columns = ["product_id", "product_name", "category", "price", "quantity", "supplier_id", "rating",
           "manufacture_date", "expiry_date", "location"]

df = spark.createDataFrame(data, columns)
df.show()

+----------+--------------+-----------+-----+--------+-----------+------+----------------+-----------+---------+
|product_id|  product_name|   category|price|quantity|supplier_id|rating|manufacture_date|expiry_date| location|
+----------+--------------+-----------+-----+--------+-----------+------+----------------+-----------+---------+
|      P101|     Laptop X1|Electronics|85000|      15|       S001|   4.5|      2023-06-10| 2026-06-10|   Mumbai|
|      P102|      Phone Z5|Electronics|60000|      30|       S002|   4.3|      2024-01-05| 2026-01-05|    Delhi|
|      P103|  Office Chair|  Furniture| 7000|      45|       S003|   4.2|      2023-10-20| 2027-10-20|     Pune|
|      P104|  Coffee Maker|    Kitchen| 4000|      20|       S004|   3.8|      2023-09-01| 2026-09-01|  Chennai|
|      P105|     TV 55inch|Electronics|95000|      10|       S001|   4.6|      2022-12-15| 2026-12-15|   Mumbai|
|      P106|  Water Bottle|    Kitchen|  250|     200|       S005|   4.1|      2024-02-11| 2026-

In [3]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import StringType

# Check data
df.show(5)
df.printSchema()


+----------+------------+-----------+-----+--------+-----------+------+----------------+-----------+--------+
|product_id|product_name|   category|price|quantity|supplier_id|rating|manufacture_date|expiry_date|location|
+----------+------------+-----------+-----+--------+-----------+------+----------------+-----------+--------+
|      P101|   Laptop X1|Electronics|85000|      15|       S001|   4.5|      2023-06-10| 2026-06-10|  Mumbai|
|      P102|    Phone Z5|Electronics|60000|      30|       S002|   4.3|      2024-01-05| 2026-01-05|   Delhi|
|      P103|Office Chair|  Furniture| 7000|      45|       S003|   4.2|      2023-10-20| 2027-10-20|    Pune|
|      P104|Coffee Maker|    Kitchen| 4000|      20|       S004|   3.8|      2023-09-01| 2026-09-01| Chennai|
|      P105|   TV 55inch|Electronics|95000|      10|       S001|   4.6|      2022-12-15| 2026-12-15|  Mumbai|
+----------+------------+-----------+-----+--------+-----------+------+----------------+-----------+--------+
only showi

In [6]:
#1. Filter by Category – Electronics with price > 70,000
df.filter((df.category == "Electronics") & (df.price > 70000)).show()


+----------+------------+-----------+-----+--------+-----------+------+----------------+-----------+--------+
|product_id|product_name|   category|price|quantity|supplier_id|rating|manufacture_date|expiry_date|location|
+----------+------------+-----------+-----+--------+-----------+------+----------------+-----------+--------+
|      P101|   Laptop X1|Electronics|85000|      15|       S001|   4.5|      2023-06-10| 2026-06-10|  Mumbai|
|      P105|   TV 55inch|Electronics|95000|      10|       S001|   4.6|      2022-12-15| 2026-12-15|  Mumbai|
+----------+------------+-----------+-----+--------+-----------+------+----------------+-----------+--------+



In [8]:
#2. Add Total Value Column (price × quantity)
df = df.withColumn("total_value", df.price * df.quantity)
df.show()

+----------+--------------+-----------+-----+--------+-----------+------+----------------+-----------+---------+-----------+
|product_id|  product_name|   category|price|quantity|supplier_id|rating|manufacture_date|expiry_date| location|total_value|
+----------+--------------+-----------+-----+--------+-----------+------+----------------+-----------+---------+-----------+
|      P101|     Laptop X1|Electronics|85000|      15|       S001|   4.5|      2023-06-10| 2026-06-10|   Mumbai|    1275000|
|      P102|      Phone Z5|Electronics|60000|      30|       S002|   4.3|      2024-01-05| 2026-01-05|    Delhi|    1800000|
|      P103|  Office Chair|  Furniture| 7000|      45|       S003|   4.2|      2023-10-20| 2027-10-20|     Pune|     315000|
|      P104|  Coffee Maker|    Kitchen| 4000|      20|       S004|   3.8|      2023-09-01| 2026-09-01|  Chennai|      80000|
|      P105|     TV 55inch|Electronics|95000|      10|       S001|   4.6|      2022-12-15| 2026-12-15|   Mumbai|     950000|


In [9]:
#3. Average Price per Category
df.groupBy("category").agg(F.avg("price").alias("avg_price")).show()


+-----------+---------+
|   category|avg_price|
+-----------+---------+
|    Kitchen|   3250.0|
|Electronics|  72000.0|
|  Furniture|   4250.0|
| Stationery|    100.0|
+-----------+---------+



In [10]:
#4. Top Rated Products (Top 3)
df.orderBy(F.col("rating").desc()).show(3)

+----------+--------------+-----------+-----+--------+-----------+------+----------------+-----------+---------+-----------+
|product_id|  product_name|   category|price|quantity|supplier_id|rating|manufacture_date|expiry_date| location|total_value|
+----------+--------------+-----------+-----+--------+-----------+------+----------------+-----------+---------+-----------+
|      P108|Gaming Console|Electronics|48000|      25|       S006|   4.8|      2024-05-20| 2027-05-20|Hyderabad|    1200000|
|      P105|     TV 55inch|Electronics|95000|      10|       S001|   4.6|      2022-12-15| 2026-12-15|   Mumbai|     950000|
|      P101|     Laptop X1|Electronics|85000|      15|       S001|   4.5|      2023-06-10| 2026-06-10|   Mumbai|    1275000|
+----------+--------------+-----------+-----+--------+-----------+------+----------------+-----------+---------+-----------+
only showing top 3 rows



In [11]:
#5. Count Products by Location
df.groupBy("location").count().show()


+---------+-----+
| location|count|
+---------+-----+
|  Chennai|    2|
|   Mumbai|    2|
|     Pune|    2|
|    Delhi|    2|
|Bengaluru|    1|
|Hyderabad|    1|
+---------+-----+



In [12]:
#6. Products Manufactured in 2024
df.filter(F.year("manufacture_date") == 2024).show()

+----------+--------------+-----------+-----+--------+-----------+------+----------------+-----------+---------+-----------+
|product_id|  product_name|   category|price|quantity|supplier_id|rating|manufacture_date|expiry_date| location|total_value|
+----------+--------------+-----------+-----+--------+-----------+------+----------------+-----------+---------+-----------+
|      P102|      Phone Z5|Electronics|60000|      30|       S002|   4.3|      2024-01-05| 2026-01-05|    Delhi|    1800000|
|      P106|  Water Bottle|    Kitchen|  250|     200|       S005|   4.1|      2024-02-11| 2026-02-11|    Delhi|      50000|
|      P107|     Desk Lamp|  Furniture| 1500|      70|       S003|   3.9|      2024-03-10| 2026-03-10|     Pune|     105000|
|      P108|Gaming Console|Electronics|48000|      25|       S006|   4.8|      2024-05-20| 2027-05-20|Hyderabad|    1200000|
|      P109|      Notebook| Stationery|  100|     500|       S007|   4.0|      2024-04-15| 2028-04-15|Bengaluru|      50000|


In [13]:
#7. Handle Missing Prices → Fill with category avg
avg_prices = df.groupBy("category").agg(F.avg("price").alias("avg_price"))
df = df.join(avg_prices, "category", "left") \
       .withColumn("price", F.when(F.col("price").isNull(), F.col("avg_price")).otherwise(F.col("price"))) \
       .drop("avg_price")


In [15]:
#8. Expiring Soon (< 2026-01-01)
df.filter(df.expiry_date < "2026-01-01").show()

+--------+----------+------------+-----+--------+-----------+------+----------------+-----------+--------+-----------+
|category|product_id|product_name|price|quantity|supplier_id|rating|manufacture_date|expiry_date|location|total_value|
+--------+----------+------------+-----+--------+-----------+------+----------------+-----------+--------+-----------+
+--------+----------+------------+-----+--------+-----------+------+----------------+-----------+--------+-----------+



In [16]:
#9. Rename Columns
df = df.withColumnRenamed("product_name", "name") \
       .withColumnRenamed("supplier_id", "supplier")


In [17]:
#10. Supplier-wise Summary
df.groupBy("supplier").agg(
    F.sum("quantity").alias("total_qty"),
    F.avg("rating").alias("avg_rating")
).show()

+--------+---------+----------+
|supplier|total_qty|avg_rating|
+--------+---------+----------+
|    S004|       38|       4.1|
|    S001|       25|      4.55|
|    S002|       30|       4.3|
|    S003|      115|      4.05|
|    S005|      200|       4.1|
|    S006|       25|       4.8|
|    S007|      500|       4.0|
+--------+---------+----------+



In [18]:
#11. Add Category Prefix Code
df = df.withColumn("product_code", F.concat(F.substring("category", 1, 3), F.lit("_"), F.col("product_id")))


In [19]:
#12. Products with Low Stock (< 20)
df.filter(df.quantity < 20).orderBy("quantity").show()


+-----------+----------+-----------+-------+--------+--------+------+----------------+-----------+--------+-----------+------------+
|   category|product_id|       name|  price|quantity|supplier|rating|manufacture_date|expiry_date|location|total_value|product_code|
+-----------+----------+-----------+-------+--------+--------+------+----------------+-----------+--------+-----------+------------+
|Electronics|      P105|  TV 55inch|95000.0|      10|    S001|   4.6|      2022-12-15| 2026-12-15|  Mumbai|     950000|    Ele_P105|
|Electronics|      P101|  Laptop X1|85000.0|      15|    S001|   4.5|      2023-06-10| 2026-06-10|  Mumbai|    1275000|    Ele_P101|
|    Kitchen|      P110|Blender Pro| 5500.0|      18|    S004|   4.4|      2023-11-10| 2026-11-10| Chennai|      99000|    Kit_P110|
+-----------+----------+-----------+-------+--------+--------+------+----------------+-----------+--------+-----------+------------+



In [20]:
#13. Join with Supplier Details
suppliers = [
    ("S001", "Global Electronics", "Mumbai"),
    ("S002", "Phone Hub", "Delhi"),
    ("S003", "Comfort Furnishings", "Pune"),
    ("S004", "Kitchen Plus", "Chennai"),
    ("S005", "Aqua Traders", "Delhi"),
    ("S006", "Game World", "Hyderabad"),
    ("S007", "Paper House", "Bengaluru"),
]
sup_df = spark.createDataFrame(suppliers, ["supplier", "supplier_name", "city"])
df.join(sup_df, "supplier").show()


+--------+-----------+----------+--------------+-------+--------+------+----------------+-----------+---------+-----------+------------+-------------------+---------+
|supplier|   category|product_id|          name|  price|quantity|rating|manufacture_date|expiry_date| location|total_value|product_code|      supplier_name|     city|
+--------+-----------+----------+--------------+-------+--------+------+----------------+-----------+---------+-----------+------------+-------------------+---------+
|    S001|Electronics|      P105|     TV 55inch|95000.0|      10|   4.6|      2022-12-15| 2026-12-15|   Mumbai|     950000|    Ele_P105| Global Electronics|   Mumbai|
|    S001|Electronics|      P101|     Laptop X1|85000.0|      15|   4.5|      2023-06-10| 2026-06-10|   Mumbai|    1275000|    Ele_P101| Global Electronics|   Mumbai|
|    S002|Electronics|      P102|      Phone Z5|60000.0|      30|   4.3|      2024-01-05| 2026-01-05|    Delhi|    1800000|    Ele_P102|          Phone Hub|    Delhi

In [21]:
#14. Discounted Price for Electronics (10%)
df = df.withColumn("discount_price",
                   F.when(df.category == "Electronics", df.price * 0.9).otherwise(df.price))


In [22]:
#15. Window Function – Top 2 Highest-Priced Products per Category
w = Window.partitionBy("category").orderBy(F.col("price").desc())
df.withColumn("rank", F.row_number().over(w)).filter(F.col("rank") <= 2).show()


+-----------+----------+------------+-------+--------+--------+------+----------------+-----------+---------+-----------+------------+--------------+----+
|   category|product_id|        name|  price|quantity|supplier|rating|manufacture_date|expiry_date| location|total_value|product_code|discount_price|rank|
+-----------+----------+------------+-------+--------+--------+------+----------------+-----------+---------+-----------+------------+--------------+----+
|Electronics|      P105|   TV 55inch|95000.0|      10|    S001|   4.6|      2022-12-15| 2026-12-15|   Mumbai|     950000|    Ele_P105|       85500.0|   1|
|Electronics|      P101|   Laptop X1|85000.0|      15|    S001|   4.5|      2023-06-10| 2026-06-10|   Mumbai|    1275000|    Ele_P101|       76500.0|   2|
|  Furniture|      P103|Office Chair| 7000.0|      45|    S003|   4.2|      2023-10-20| 2027-10-20|     Pune|     315000|    Fur_P103|        7000.0|   1|
|  Furniture|      P107|   Desk Lamp| 1500.0|      70|    S003|   3.9|

In [23]:
#16. Average Rating by City (after join)
df.join(sup_df, "supplier").groupBy("city").agg(F.avg("rating")).show()


+---------+-----------------+
|     city|      avg(rating)|
+---------+-----------------+
|  Chennai|              4.1|
|   Mumbai|             4.55|
|     Pune|             4.05|
|    Delhi|4.199999999999999|
|Bengaluru|              4.0|
|Hyderabad|              4.8|
+---------+-----------------+



In [24]:
#17. Extract Brand Name (first word)
df = df.withColumn("brand", F.split(df.name, " ").getItem(0))

In [25]:
#18. Create Expiry Flag
df = df.withColumn("expired", df.expiry_date < F.current_date())


In [26]:
#19. Pivot Monthly Stock
sales = [
    ("P101", "Jan", 5), ("P101", "Feb", 7), ("P102", "Jan", 3),
    ("P103", "Feb", 2), ("P104", "Mar", 4),
]
sales_df = spark.createDataFrame(sales, ["product_id", "month", "sold_quantity"])
sales_df.groupBy("product_id").pivot("month").sum("sold_quantity").show()


+----------+----+----+----+
|product_id| Feb| Jan| Mar|
+----------+----+----+----+
|      P102|NULL|   3|NULL|
|      P103|   2|NULL|NULL|
|      P104|NULL|NULL|   4|
|      P101|   7|   5|NULL|
+----------+----+----+----+



In [27]:
#20. UDF for Price Category
def price_cat(price):
    if price >= 50000: return "High"
    elif price >= 5000: return "Medium"
    else: return "Low"

price_udf = F.udf(price_cat, StringType())
df = df.withColumn("price_category", price_udf(df.price))


In [28]:
#21. Stock Value per Supplier
df.withColumn("stock_value", df.price * df.quantity) \
  .groupBy("supplier").agg(F.sum("stock_value").alias("total_stock_value")).show()


+--------+-----------------+
|supplier|total_stock_value|
+--------+-----------------+
|    S004|         179000.0|
|    S001|        2225000.0|
|    S002|        1800000.0|
|    S005|          50000.0|
|    S006|        1200000.0|
|    S003|         420000.0|
|    S007|          50000.0|
+--------+-----------------+



In [29]:
#22. Products Without Expiry (filter nulls out)
df.filter(df.expiry_date.isNotNull()).show()


+-----------+----------+--------------+-------+--------+--------+------+----------------+-----------+---------+-----------+------------+--------------+--------+-------+--------------+
|   category|product_id|          name|  price|quantity|supplier|rating|manufacture_date|expiry_date| location|total_value|product_code|discount_price|   brand|expired|price_category|
+-----------+----------+--------------+-------+--------+--------+------+----------------+-----------+---------+-----------+------------+--------------+--------+-------+--------------+
|    Kitchen|      P104|  Coffee Maker| 4000.0|      20|    S004|   3.8|      2023-09-01| 2026-09-01|  Chennai|      80000|    Kit_P104|        4000.0|  Coffee|  false|           Low|
|Electronics|      P101|     Laptop X1|85000.0|      15|    S001|   4.5|      2023-06-10| 2026-06-10|   Mumbai|    1275000|    Ele_P101|       76500.0|  Laptop|  false|          High|
|Electronics|      P102|      Phone Z5|60000.0|      30|    S002|   4.3|      20

In [30]:
#23. Write Cleaned Data in Parquet Partitioned by Category
df.write.mode("overwrite").partitionBy("category").parquet("/output/processed_products/")
