In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, sum as _sum, rank, desc, expr
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("Retail360").getOrCreate()

In [0]:
# Customer data
customers = [
    (1, "Arjun Rao", "North", "arjun@example.com"),
    (2, "Sneha Patel", "South", "sneha@example.com"),
    (3, "Rahul Sharma", "East", "rahul@example.com"),
    (4, "Neha Iyer", "West", "neha@example.com")
]
customers_df = spark.createDataFrame(customers, ["customer_id", "name", "region", "email"])

In [0]:

# Orders Day 1
orders_day1 = [
    (1001, 1, "Laptop", 2, 55000, "Completed", "2024-01-15"),
    (1002, 2, "Mobile", 3, 25000, "Completed", "2024-01-16"),
    (1003, 3, "Book", 10, 700, "Pending", "2024-01-16"),
    (1004, 1, "Headphones", 5, 3000, "Completed", "2024-01-17")
]
orders_day1_df = spark.createDataFrame(orders_day1, ["order_id","customer_id","product","quantity","price","status","order_date"])

In [0]:
# Product data
products = [
    ("P001", "Laptop", "Electronics"),
    ("P002", "Mobile", "Electronics"),
    ("P003", "Book", "Stationery"),
    ("P004", "Headphones", "Accessories")
]
products_df = spark.createDataFrame(products, ["product_id","product_name","category"])

In [0]:

bronze_path = "/mnt/retail360/bronze/"

customers_df.write.format("delta").mode("overwrite").save(bronze_path + "customers")
orders_day1_df.write.format("delta").mode("overwrite").save(bronze_path + "orders")
products_df.write.format("delta").mode("overwrite").save(bronze_path + "products")

In [0]:
# Load from Bronze
bronze_customers = spark.read.format("delta").load(bronze_path + "customers")
bronze_orders = spark.read.format("delta").load(bronze_path + "orders")
bronze_products = spark.read.format("delta").load(bronze_path + "products")

In [0]:
clean_orders = bronze_orders.filter((col("status") == "Completed") & (col("price").isNotNull()))

In [0]:
clean_orders = clean_orders.withColumn("total_amount", col("quantity") * col("price"))

In [0]:
silver_orders = (
    clean_orders.join(bronze_customers, "customer_id", "inner")
    .join(bronze_products, clean_orders.product == bronze_products.product_name, "inner")
    .select("order_id","name","region","product_name","category","quantity","price","total_amount","order_date")
)

silver_path = "/mnt/retail360/silver/"
silver_orders.write.format("delta").mode("overwrite").save(silver_path + "orders")


In [0]:
silver_orders_df = spark.read.format("delta").load(silver_path + "orders")

In [0]:

# Total revenue by region
revenue_by_region = silver_orders_df.groupBy("region").agg(_sum("total_amount").alias("total_revenue"))

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, desc

# Assuming your cleaned Silver layer DataFrame is called silver_orders_df
windowSpec = Window.partitionBy("category").orderBy(desc("total_amount"))

top_products = silver_orders_df.withColumn("rank", rank().over(windowSpec))

display(top_products)


order_id,name,region,product_name,category,quantity,price,total_amount,order_date,rank
1004,Arjun Rao,North,Headphones,Accessories,5,3000,15000,2024-01-17,1
1001,Arjun Rao,North,Laptop,Electronics,2,55000,110000,2024-01-15,1
1002,Sneha Patel,South,Mobile,Electronics,3,25000,75000,2024-01-16,2
1005,Sneha Patel,South,Laptop,Electronics,1,55000,55000,2024-01-18,3
1006,Rahul Sharma,East,Book,Stationery,5,700,3500,2024-01-19,1


In [0]:
orders_day2 = [
    (1005, 2, "Laptop", 1, 55000, "Completed", "2024-01-18"),
    (1006, 3, "Book", 5, 700, "Completed", "2024-01-19")
]
orders_day2_df = spark.createDataFrame(orders_day2, ["order_id","customer_id","product","quantity","price","status","order_date"])


In [0]:
# Write incremental into Bronze
orders_day2_df.write.format("delta").mode("append").save(bronze_path + "orders")

# MERGE into Silver (Upsert)
from delta.tables import DeltaTable

silver_table = DeltaTable.forPath(spark, silver_path + "orders")

clean_day2 = orders_day2_df.filter(col("status") == "Completed").withColumn("total_amount", col("quantity") * col("price"))

updated_df = (
    clean_day2.join(bronze_customers, "customer_id", "inner")
    .join(bronze_products, clean_day2.product == bronze_products.product_name, "inner")
    .select("order_id","name","region","product_name","category","quantity","price","total_amount","order_date")
)

silver_table.alias("tgt").merge(
    updated_df.alias("src"),
    "tgt.order_id = src.order_id"
).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()


DataFrame[num_affected_rows: bigint, num_updated_rows: bigint, num_deleted_rows: bigint, num_inserted_rows: bigint]

In [0]:
# Show historical version
gold_df_v1 = spark.read.format("delta").option("versionAsOf", 0).load(
    gold_path + "top_products"
)
display(gold_df_v1)

# Rollback / Time Travel
display(
    spark.sql(f"SELECT * FROM delta.`{gold_path}top_products` VERSION AS OF 0")
)

# VACUUM safely
spark.sql(f"VACUUM delta.`{gold_path}top_products` RETAIN 168 HOURS")  # 7 days retention


product_name,total_sales,rank
Laptop,110000,1
Mobile,75000,2
Headphones,15000,3


product_name,total_sales,rank
Laptop,110000,1
Mobile,75000,2
Headphones,15000,3


DataFrame[path: string]

In [0]:
from pyspark.sql.functions import col, sum as _sum

# Load bronze tables
bronze_orders = spark.read.format("delta").load("/mnt/retail360/bronze/orders")
bronze_customers = spark.read.format("delta").load("/mnt/retail360/bronze/customers")

# Join orders with customers to get the region column
orders_with_region = bronze_orders.join(
    bronze_customers, on="customer_id", how="inner"
)

# Filter completed orders and compute total_amount
silver_orders = orders_with_region.filter(col("status") == "Completed") \
    .withColumn("total_amount", col("quantity") * col("price"))

# Aggregate revenue by region
gold_sales_summary = silver_orders.groupBy("region") \
    .agg(_sum("total_amount").alias("total_revenue"))

display(gold_sales_summary)


region,total_revenue
North,125000
South,130000
East,3500


In [0]:
#Optimize and Z-Order for performance
spark.sql(f"OPTIMIZE delta.`{silver_path}orders` ZORDER BY (region, product_name)")

print("✅ Retail360 Azure Databricks Pipeline Executed Successfully!")

✅ Retail360 Azure Databricks Pipeline Executed Successfully!
