In [0]:
data = [
    (1001, "Electronics", "Mobile", 25000, "Mumbai", "2026-01-01"),
    (1002, "Electronics", "Laptop", 65000, "Delhi", "2026-01-02"),
    (1003, "Fashion", "Shoes", 3000, "Bangalore", "2026-01-02"),
    (1004, "Home", "Mixer", 4500, "Mumbai", "2026-01-03"),
    (1005, "Fashion", "T-Shirt", 1500, "Delhi", "2026-01-03"),
    (1006, "Electronics", "Headphones", 2000, "Pune", "2026-01-04"),
    (1007, "Home", "Vacuum Cleaner", 12000, "Bangalore", "2026-01-04")
]

In [0]:
columns = ["order_id", "category", "product", "price", "city", "order_date"]
df_orders= spark.createDataFrame(data, columns)
#df_orders.createOrReplaceTempView("orders")
display(df_orders)

order_id,category,product,price,city,order_date
1001,Electronics,Mobile,25000,Mumbai,2026-01-01
1002,Electronics,Laptop,65000,Delhi,2026-01-02
1003,Fashion,Shoes,3000,Bangalore,2026-01-02
1004,Home,Mixer,4500,Mumbai,2026-01-03
1005,Fashion,T-Shirt,1500,Delhi,2026-01-03
1006,Electronics,Headphones,2000,Pune,2026-01-04
1007,Home,Vacuum Cleaner,12000,Bangalore,2026-01-04


In [0]:
category_data = [
    ("Electronics", "High Value"),
    ("Fashion", "Medium Value"),
    ("Home", "Medium Value")
]

category_columns = ["category", "category_type"]

df_category = spark.createDataFrame(category_data, category_columns)
#df_category.createOrReplaceTempView("category")
display(df_category)

category,category_type
Electronics,High Value
Fashion,Medium Value
Home,Medium Value


In [0]:
#innerjoin
df_orders.join(df_category, df_orders.category == df_category.category, "inner").display()
#leftjoin
#df_orders.join(df_category)

order_id,category,product,price,city,order_date,category.1,category_type
1001,Electronics,Mobile,25000,Mumbai,2026-01-01,Electronics,High Value
1002,Electronics,Laptop,65000,Delhi,2026-01-02,Electronics,High Value
1003,Fashion,Shoes,3000,Bangalore,2026-01-02,Fashion,Medium Value
1004,Home,Mixer,4500,Mumbai,2026-01-03,Home,Medium Value
1005,Fashion,T-Shirt,1500,Delhi,2026-01-03,Fashion,Medium Value
1006,Electronics,Headphones,2000,Pune,2026-01-04,Electronics,High Value
1007,Home,Vacuum Cleaner,12000,Bangalore,2026-01-04,Home,Medium Value


In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import sum, row_number, rank
window_spec=Window.partitionBy("city").orderBy("order_date").rowsBetween(Window.unboundedPreceding, Window.currentRow)
df_orders.withColumn("row_number", row_number().over(window_spec)).display()
from pyspark.sql.functions import sum, row_number

order_id,category,product,price,city,order_date,row_number
1003,Fashion,Shoes,3000,Bangalore,2026-01-02,1
1007,Home,Vacuum Cleaner,12000,Bangalore,2026-01-04,2
1002,Electronics,Laptop,65000,Delhi,2026-01-02,1
1005,Fashion,T-Shirt,1500,Delhi,2026-01-03,2
1001,Electronics,Mobile,25000,Mumbai,2026-01-01,1
1004,Home,Mixer,4500,Mumbai,2026-01-03,2
1006,Electronics,Headphones,2000,Pune,2026-01-04,1


In [0]:
rank_window=Window.partitionBy("category").orderBy(df_orders.price.desc())
df_orders.withColumn("rank", rank().over(rank_window)).display()
from pyspark.sql.functions import sum, row_number

order_id,category,product,price,city,order_date,rank
1002,Electronics,Laptop,65000,Delhi,2026-01-02,1
1001,Electronics,Mobile,25000,Mumbai,2026-01-01,2
1006,Electronics,Headphones,2000,Pune,2026-01-04,3
1003,Fashion,Shoes,3000,Bangalore,2026-01-02,1
1005,Fashion,T-Shirt,1500,Delhi,2026-01-03,2
1007,Home,Vacuum Cleaner,12000,Bangalore,2026-01-04,1
1004,Home,Mixer,4500,Mumbai,2026-01-03,2


In [0]:
from pyspark.sql.functions import when
df_orders.withColumn(
    "price_bucket",
    when(df_orders.price > 20000, "High")
    .when(df_orders.price >= 5000, "Medium")
    .otherwise("Low")
).display()

order_id,category,product,price,city,order_date,price_bucket
1001,Electronics,Mobile,25000,Mumbai,2026-01-01,High
1002,Electronics,Laptop,65000,Delhi,2026-01-02,High
1003,Fashion,Shoes,3000,Bangalore,2026-01-02,Low
1004,Home,Mixer,4500,Mumbai,2026-01-03,Low
1005,Fashion,T-Shirt,1500,Delhi,2026-01-03,Low
1006,Electronics,Headphones,2000,Pune,2026-01-04,Low
1007,Home,Vacuum Cleaner,12000,Bangalore,2026-01-04,Medium


In [0]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def city_region(city):
    if city in ["Mumbai", "Delhi", "Pune"]:
        return "North/West"
    else:
        return "South"

city_region_udf = udf(city_region, StringType())

In [0]:
df_orders.withColumn(
    "region",
    city_region_udf(df_orders.city)
).show()

+--------+-----------+--------------+-----+---------+----------+----------+
|order_id|   category|       product|price|     city|order_date|    region|
+--------+-----------+--------------+-----+---------+----------+----------+
|    1001|Electronics|        Mobile|25000|   Mumbai|2026-01-01|North/West|
|    1002|Electronics|        Laptop|65000|    Delhi|2026-01-02|North/West|
|    1003|    Fashion|         Shoes| 3000|Bangalore|2026-01-02|     South|
|    1004|       Home|         Mixer| 4500|   Mumbai|2026-01-03|North/West|
|    1005|    Fashion|       T-Shirt| 1500|    Delhi|2026-01-03|North/West|
|    1006|Electronics|    Headphones| 2000|     Pune|2026-01-04|North/West|
|    1007|       Home|Vacuum Cleaner|12000|Bangalore|2026-01-04|     South|
+--------+-----------+--------------+-----+---------+----------+----------+

