In [0]:
%sql
-- Create one volume per medallion layer (bronze, silver, gold) in the
-- workspace.ecommerce schema. IF NOT EXISTS makes the cell safe to re-run.
-- NOTE(review): the Python cells below read and write under
-- /Volumes/workspace/ecommerce/ecommerce_data/<layer>, i.e. inside a volume
-- named ecommerce_data that is not created here -- confirm whether these
-- three volumes are actually the intended targets.
CREATE VOLUME IF NOT EXISTS workspace.ecommerce.bronze;
CREATE VOLUME IF NOT EXISTS workspace.ecommerce.silver;
CREATE VOLUME IF NOT EXISTS workspace.ecommerce.gold;


In [0]:
%sql 
-- List every volume in the workspace.ecommerce schema, as a quick check that
-- the CREATE VOLUME statements in the previous cell took effect.
SHOW VOLUMES IN workspace.ecommerce;

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col, to_timestamp

# Bronze ingestion: raw CSV -> timestamp-typed event_time -> audit column -> Delta.

# Step 1: Load both monthly CSV exports. The first row of each file is the
# header, and Spark infers the column types from the data.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(
        [
            "/Volumes/workspace/ecommerce/ecommerce_data/2019-Oct.csv",
            "/Volumes/workspace/ecommerce/ecommerce_data/2019-Nov.csv",
        ]
    )
)

# Step 2: Replace the existing event_time column with its timestamp-typed
# equivalent (withColumn overwrites a column when the name already exists).
raw = df.withColumn("event_time", F.to_timestamp(F.col("event_time")))

# Step 3: Add a new ingestion_ts column holding the load time, for audit and
# tracking purposes.
bronze = raw.withColumn("ingestion_ts", F.current_timestamp())

# Step 4: Append the result to the Bronze Delta location. Because the mode is
# "append", every run of this cell adds the same source rows again.
(
    bronze.write
    .format("delta")
    .mode("append")
    .save("/Volumes/workspace/ecommerce/ecommerce_data/bronze")
)


In [0]:
from pyspark.sql import functions as F

# Read Bronze layer
bronze = (
    spark.read
    .format("delta")
    .load("/Volumes/workspace/ecommerce/ecommerce_data/bronze")
)

# Silver transformations:
#   1. keep only rows with a usable price (non-null, strictly between 0 and 10000),
#   2. drop duplicate events,
#   3. derive event_date (the calendar date of event_time) and a price_tier label.
silver = (
    bronze
    .filter(F.col("price").isNotNull())
    .filter((F.col("price") > 0) & (F.col("price") < 10000))
    # event_type is part of the key so that two *different* events by the same
    # user on the same product at the same timestamp (e.g. a view and a
    # purchase) are both kept. Without it dropDuplicates keeps one of them
    # arbitrarily, which skews the per-event_type metrics built on this table.
    # Exact re-ingested copies (the Bronze write is append-only) still collapse.
    .dropDuplicates(["user_id", "product_id", "event_time", "event_type"])
    .withColumn("event_date", F.to_date("event_time"))
    .withColumn(
        "price_tier",
        # Tiers: price < 10 -> "budget", 10 <= price < 50 -> "mid", else "premium".
        F.when(F.col("price") < 10, "budget")
         .when(F.col("price") < 50, "mid")
         .otherwise("premium")
    )
)

# Write Silver layer: full overwrite of the Delta location, partitioned by day.
(
    silver.write
    .format("delta")
    .mode("overwrite")
    .partitionBy("event_date")
    .save("/Volumes/workspace/ecommerce/ecommerce_data/silver")
)


In [0]:
from pyspark.sql import functions as F

# Read Silver layer
silver = (
    spark.read
    .format("delta")
    .load("/Volumes/workspace/ecommerce/ecommerce_data/silver")
)

# Gold aggregations (business metrics), one row per brand:
#   views            - number of distinct user_id values with a "view" event
#   purchases        - number of distinct user_id values with a "purchase" event
#   revenue          - sum of price over "purchase" events
#   conversion_rate  - purchases / views as a percentage, 0 when views is 0
# F.when(...) without .otherwise(...) yields null for non-matching rows, and
# both countDistinct and sum ignore nulls, which is what restricts each metric
# to its event type.
product_perf = (
    silver.groupBy("brand")
    .agg(
        F.countDistinct(
            F.when(F.col("event_type") == "view", F.col("user_id"))
        ).alias("views"),

        F.countDistinct(
            F.when(F.col("event_type") == "purchase", F.col("user_id"))
        ).alias("purchases"),

        F.sum(
            F.when(F.col("event_type") == "purchase", F.col("price"))
        ).alias("revenue")
    )
    .withColumn(
        "conversion_rate",
        F.when(
            F.col("views") > 0,
            (F.col("purchases") / F.col("views")) * 100
        ).otherwise(0)
    )
    .orderBy("brand")
)

# Write Gold layer. The chain is wrapped in parentheses rather than continued
# with backslashes: a backslash followed by trailing whitespace is a
# SyntaxError in Python and would stop this whole cell from running.
(
    product_perf.write
    .format("delta")
    .mode("overwrite")
    .save("/Volumes/workspace/ecommerce/ecommerce_data/gold")
)

# Read the table back from storage and render it to verify the write.
gold = spark.read.format("delta").load("/Volumes/workspace/ecommerce/ecommerce_data/gold")
display(gold)