### Creating Medallion Layer Schemas

In this step, we create separate schemas for the three Medallion Architecture layers: Bronze, Silver, and Gold.

Keeping raw, cleaned, and aggregated data in separate schemas keeps the tables organized and makes the pipeline easier to maintain.

In [0]:
%sql
CREATE SCHEMA IF NOT EXISTS ecom_bronze;
CREATE SCHEMA IF NOT EXISTS ecom_silver;
CREATE SCHEMA IF NOT EXISTS ecom_gold;
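You might prefer to create the schemas from a Python cell, for example to keep the layer names in one list that later cells can reuse. A minimal sketch could build the same three statements. LAYERS and schema_statements are hypothetical names introduced here for illustration.

```python
# Medallion layers used throughout this notebook.
LAYERS = ("bronze", "silver", "gold")


def schema_statements(prefix="ecom"):
    """Build one idempotent CREATE SCHEMA statement per Medallion layer."""
    return [f"CREATE SCHEMA IF NOT EXISTS {prefix}_{layer}" for layer in LAYERS]


for stmt in schema_statements():
    # In a Databricks Python cell you could run it instead: spark.sql(stmt)
    print(stmt)
```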

### Load Source Dataset


Next, we load the source dataset from the existing table default.ecommerce_transactions.

We also preview a few records to understand the data structure and confirm the available columns.

In [0]:
events = spark.table("default.ecommerce_transactions")
display(events.limit(5))

| Transaction_ID | User_Name | Age | Country | Product_Category | Purchase_Amount | Payment_Method | Transaction_Date |
|---|---|---|---|---|---|---|---|
| 1 | Ava Hall | 63 | Mexico | Clothing | 780.69 | Debit Card | 2023-04-14 |
| 2 | Sophia Hall | 59 | India | Beauty | 738.56 | PayPal | 2023-07-30 |
| 3 | Elijah Thompson | 26 | France | Books | 178.34 | Credit Card | 2023-09-17 |
| 4 | Elijah White | 43 | Mexico | Sports | 401.09 | UPI | 2023-06-21 |
| 5 | Ava Harris | 48 | Germany | Beauty | 594.83 | Net Banking | 2024-10-29 |


### Inspecting Schema and Row Count

In this step, we validate the source dataset by checking:

- the total number of rows
- the column names
- the data types (schema)

In [0]:
print(events.count())
print(events.columns)
events.printSchema()

50000
['Transaction_ID', 'User_Name', 'Age', 'Country', 'Product_Category', 'Purchase_Amount', 'Payment_Method', 'Transaction_Date']
root
 |-- Transaction_ID: long (nullable = true)
 |-- User_Name: string (nullable = true)
 |-- Age: long (nullable = true)
 |-- Country: string (nullable = true)
 |-- Product_Category: string (nullable = true)
 |-- Purchase_Amount: double (nullable = true)
 |-- Payment_Method: string (nullable = true)
 |-- Transaction_Date: date (nullable = true)
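Printing the columns is useful interactively, but a scheduled run benefits from failing fast if the source shape changes. Here is a minimal sketch of such a check, assuming the column list printed above is the expected contract. EXPECTED_COLUMNS and column_drift are hypothetical names.

```python
# Expected source columns, copied from the events.columns output above.
EXPECTED_COLUMNS = [
    "Transaction_ID", "User_Name", "Age", "Country",
    "Product_Category", "Purchase_Amount", "Payment_Method", "Transaction_Date",
]


def column_drift(actual, expected=EXPECTED_COLUMNS):
    """Return (missing, unexpected) column names, each sorted."""
    actual_set, expected_set = set(actual), set(expected)
    missing = sorted(expected_set - actual_set)
    unexpected = sorted(actual_set - expected_set)
    return missing, unexpected


# Simulate a source where Transaction_Date was renamed to Order_Date.
missing, unexpected = column_drift(EXPECTED_COLUMNS[:-1] + ["Order_Date"])
print("missing:", missing)        # missing: ['Transaction_Date']
print("unexpected:", unexpected)  # unexpected: ['Order_Date']
```

In the notebook, column_drift(events.columns) should return two empty lists for the current source. The same idea could extend to types by comparing against events.dtypes, which lists (name, type) pairs.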



### BRONZE Layer: Raw Ingestion

This step creates the Bronze layer table, which stores the raw data exactly as received from the source.

We also add ingestion metadata columns (ingest_ts, source_name, and ingest_date) for traceability and debugging.

In [0]:
from pyspark.sql.functions import current_timestamp, lit, to_date

events = spark.table("default.ecommerce_transactions")

# Keep the source columns unchanged and append ingestion metadata
bronze_df = (
    events
    .withColumn("ingest_ts", current_timestamp())                      # load time of this run
    .withColumn("source_name", lit("default.ecommerce_transactions"))  # where the rows came from
    .withColumn("ingest_date", to_date(current_timestamp()))           # load date, e.g. for filtering by run
)

# Overwrite the Bronze Delta table on each run
bronze_df.write.format("delta").mode("overwrite").saveAsTable("ecom_bronze.transactions_bronze")

In [0]:
bronze = spark.table("ecom_bronze.transactions_bronze")
print("Bronze count:", bronze.count())
display(bronze.limit(5))

Bronze count: 50000


| Transaction_ID | User_Name | Age | Country | Product_Category | Purchase_Amount | Payment_Method | Transaction_Date | ingest_ts | source_name | ingest_date |
|---|---|---|---|---|---|---|---|---|---|---|
| 1 | Ava Hall | 63 | Mexico | Clothing | 780.69 | Debit Card | 2023-04-14 | 2026-01-14T04:10:55.799Z | default.ecommerce_transactions | 2026-01-14 |
| 2 | Sophia Hall | 59 | India | Beauty | 738.56 | PayPal | 2023-07-30 | 2026-01-14T04:10:55.799Z | default.ecommerce_transactions | 2026-01-14 |
| 3 | Elijah Thompson | 26 | France | Books | 178.34 | Credit Card | 2023-09-17 | 2026-01-14T04:10:55.799Z | default.ecommerce_transactions | 2026-01-14 |
| 4 | Elijah White | 43 | Mexico | Sports | 401.09 | UPI | 2023-06-21 | 2026-01-14T04:10:55.799Z | default.ecommerce_transactions | 2026-01-14 |
| 5 | Ava Harris | 48 | Germany | Beauty | 594.83 | Net Banking | 2024-10-29 | 2026-01-14T04:10:55.799Z | default.ecommerce_transactions | 2026-01-14 |
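The preview shows the same ingest_ts on every row because Spark evaluates current_timestamp() once per query, not once per row. The plain-Python sketch below mirrors that stamping logic for rows held as dicts. add_ingest_metadata is a hypothetical helper, not part of the notebook.

```python
from datetime import datetime, timezone


def add_ingest_metadata(rows, source_name, now=None):
    """Return copies of rows with ingest_ts, source_name and ingest_date added.

    The timestamp is captured once per batch, mirroring how Spark evaluates
    current_timestamp() once per query, so every row gets the same value.
    """
    ingest_ts = now if now is not None else datetime.now(timezone.utc)
    return [
        {
            **row,
            "ingest_ts": ingest_ts,
            "source_name": source_name,
            # UTC date here; Spark's to_date() uses the session time zone.
            "ingest_date": ingest_ts.date(),
        }
        for row in rows
    ]


raw = [{"Transaction_ID": 1}, {"Transaction_ID": 2}]
stamped = add_ingest_metadata(raw, "default.ecommerce_transactions")
print(len({r["ingest_ts"] for r in stamped}))  # 1
```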


### Data Quality Checks on Bronze

In this step, we run basic data quality checks on the Bronze layer.

We verify the row count, the uniqueness of Transaction_ID, nulls in the key columns (Transaction_ID, Purchase_Amount, Transaction_Date), and invalid (zero or negative) purchase amounts.

This tells us how clean the raw data is before we transform it in Silver.

In [0]:
from pyspark.sql import functions as F

bronze = spark.table("ecom_bronze.transactions_bronze")

display(
    bronze.select(
        F.count("*").alias("rows"),
        # Equals "rows" when every Transaction_ID is unique and non-null
        F.countDistinct("Transaction_ID").alias("distinct_txn_id"),
        # Null counts for the key columns
        F.sum(F.col("Transaction_ID").isNull().cast("int")).alias("null_txn_id"),
        F.sum(F.col("Purchase_Amount").isNull().cast("int")).alias("null_purchase_amount"),
        F.sum(F.col("Transaction_Date").isNull().cast("int")).alias("null_transaction_date"),
        # Zero or negative amounts are treated as invalid
        F.sum((F.col("Purchase_Amount") <= 0).cast("int")).alias("invalid_purchase_amount"),
    )
)

| rows | distinct_txn_id | null_txn_id | null_purchase_amount | null_transaction_date | invalid_purchase_amount |
|---|---|---|---|---|---|
| 50000 | 50000 | 0 | 0 | 0 | 0 |
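The same checks can be expressed in plain Python, which is handy for unit-testing the rules on a few hand-made rows without a Spark cluster. This is a minimal sketch: quality_metrics is a hypothetical helper and the sample rows are invented. A fully clean result, like the one above, has rows equal to distinct_txn_id and zeros in every other column.

```python
def quality_metrics(rows):
    """Compute the Bronze data quality metrics over a list of dict rows."""
    ids = [r.get("Transaction_ID") for r in rows]
    amounts = [r.get("Purchase_Amount") for r in rows]
    return {
        "rows": len(rows),
        # Like F.countDistinct, nulls are ignored when counting distinct IDs.
        "distinct_txn_id": len({i for i in ids if i is not None}),
        "null_txn_id": sum(i is None for i in ids),
        "null_purchase_amount": sum(a is None for a in amounts),
        "null_transaction_date": sum(r.get("Transaction_Date") is None for r in rows),
        # A null amount is not counted as invalid: in Spark, NULL <= 0 is NULL
        # and F.sum skips it.
        "invalid_purchase_amount": sum(a is not None and a <= 0 for a in amounts),
    }


sample = [
    {"Transaction_ID": 1, "Purchase_Amount": 780.69, "Transaction_Date": "2023-04-14"},
    {"Transaction_ID": 1, "Purchase_Amount": -5.0, "Transaction_Date": None},
    {"Transaction_ID": None, "Purchase_Amount": None, "Transaction_Date": "2023-07-30"},
]
print(quality_metrics(sample))
```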


### SILVER Layer: Clean and Standardize Data

This step creates the Silver layer table by cleaning and standardizing the Bronze data.

We apply a basic validation rule, trim string columns, add reusable derived columns (year and month), and remove duplicates to produce a trusted dataset for analytics.

In [0]:
from pyspark.sql import functions as F
bronze = spark.table("ecom_bronze.transactions_bronze")
silver_df = (
    bronze
    # 1) basic validity filter, kept even though Bronze currently has 0 invalid amounts (it also drops null amounts)
    .filter(F.col("Purchase_Amount") > 0)
    # 2) standardizing strings using trim()
    .withColumn("User_Name", F.trim(F.col("User_Name")))
    .withColumn("Country", F.trim(F.col("Country")))
    .withColumn("Product_Category", F.trim(F.col("Product_Category")))
    .withColumn("Payment_Method", F.trim(F.col("Payment_Method")))
    # 3) derived columns for analytics - year and month from Transaction_Date
    .withColumn("transaction_year", F.year("Transaction_Date"))
    .withColumn("transaction_month", F.month("Transaction_Date"))
    # 4) dedupe safety - drop duplicates
    .dropDuplicates(["Transaction_ID"])
)
# Write Silver table
silver_df.write.format("delta").mode("overwrite").saveAsTable("ecom_silver.transactions_silver")
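Here is a plain-Python sketch of the same four Silver rules, useful for testing them on hand-made rows. to_silver is a hypothetical helper. The sketch assumes rows are dicts with Transaction_Date already parsed into a datetime.date.

```python
from datetime import date

STRING_COLUMNS = ("User_Name", "Country", "Product_Category", "Payment_Method")


def to_silver(rows):
    """Apply the Silver rules to Bronze rows (dicts) and return new dicts."""
    seen_ids = set()
    silver = []
    for row in rows:
        amount = row.get("Purchase_Amount")
        # 1) Validity: keep positive amounts only. Like the Spark filter,
        #    this also drops rows whose amount is null.
        if amount is None or amount <= 0:
            continue
        # 4) Dedupe on Transaction_ID. This sketch keeps the first valid row seen;
        #    Spark's dropDuplicates does not guarantee which duplicate survives.
        if row["Transaction_ID"] in seen_ids:
            continue
        seen_ids.add(row["Transaction_ID"])
        clean = dict(row)
        # 2) Trim leading/trailing space characters, as F.trim does.
        for col in STRING_COLUMNS:
            if isinstance(clean.get(col), str):
                clean[col] = clean[col].strip(" ")
        # 3) Derived year/month columns (None when the date is missing).
        txn_date = clean.get("Transaction_Date")
        clean["transaction_year"] = txn_date.year if txn_date else None
        clean["transaction_month"] = txn_date.month if txn_date else None
        silver.append(clean)
    return silver


bronze_rows = [
    {"Transaction_ID": 1, "User_Name": "  Ava Hall ", "Country": "Mexico",
     "Product_Category": "Clothing", "Purchase_Amount": 780.69,
     "Payment_Method": "Debit Card", "Transaction_Date": date(2023, 4, 14)},
    # Duplicate ID: dropped
    {"Transaction_ID": 1, "User_Name": "Ava Hall", "Country": "Mexico",
     "Product_Category": "Clothing", "Purchase_Amount": 780.69,
     "Payment_Method": "Debit Card", "Transaction_Date": date(2023, 4, 14)},
    # Zero amount: dropped by the validity rule
    {"Transaction_ID": 2, "User_Name": "Sophia Hall", "Country": "India",
     "Product_Category": "Beauty", "Purchase_Amount": 0.0,
     "Payment_Method": "PayPal", "Transaction_Date": date(2023, 7, 30)},
]
result = to_silver(bronze_rows)
print(len(result), result[0]["User_Name"], result[0]["transaction_month"])  # 1 Ava Hall 4
```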

### Validate Silver Table

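After creating the Silver table, we validate the transformation by checking the row count and previewing key columns.

The count should still be 50,000, because the Bronze checks found no invalid amounts or duplicate Transaction_IDs. This confirms the cleaned dataset is ready for Gold-level aggregations.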

In [0]:
silver = spark.table("ecom_silver.transactions_silver")
print("Silver count:", silver.count())
display(silver.select("Transaction_ID", "User_Name", "Purchase_Amount",
                      "Transaction_Date", "transaction_year", "transaction_month").limit(5))

Silver count: 50000


| Transaction_ID | User_Name | Purchase_Amount | Transaction_Date | transaction_year | transaction_month |
|---|---|---|---|---|---|
| 271 | Noah Thompson | 97.84 | 2025-01-04 | 2025 | 1 |
| 1785 | Emma Harris | 55.06 | 2023-08-04 | 2023 | 8 |
| 1898 | Sophia Thompson | 323.11 | 2024-06-18 | 2024 | 6 |
| 2851 | James Clark | 179.78 | 2024-05-21 | 2024 | 5 |
| 7312 | Elijah Rodriguez | 117.59 | 2024-02-07 | 2024 | 2 |
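Beyond eyeballing the count, a scheduled run could assert that Silver reconciles with Bronze. A minimal sketch of such a guard follows. reconcile is a hypothetical helper, and max_drop_ratio is an illustrative threshold, not a value taken from this pipeline.

```python
def reconcile(bronze_count, silver_count, max_drop_ratio=0.01):
    """Return how many rows the Silver rules removed from Bronze.

    Raises ValueError if Silver has more rows than Bronze (filtering and
    deduplication can only remove rows) or if the dropped share exceeds
    max_drop_ratio.
    """
    dropped = bronze_count - silver_count
    if dropped < 0:
        raise ValueError(f"Silver has {-dropped} more rows than Bronze")
    if bronze_count and dropped / bronze_count > max_drop_ratio:
        raise ValueError(f"{dropped} of {bronze_count} rows dropped")
    return dropped


# In the notebook: reconcile(bronze.count(), silver.count())
print(reconcile(50000, 50000))  # 0
```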


### GOLD Layer: Daily Revenue

This Gold table aggregates the Silver data by Transaction_Date.

It calculates key daily metrics: total revenue, total transactions, and unique users (counted as distinct User_Name values).


In [0]:
from pyspark.sql import functions as F

silver = spark.table("ecom_silver.transactions_silver")

# 1) Daily Revenue
gold_daily_revenue = (
    silver.groupBy("Transaction_Date")
    .agg(
        F.sum("Purchase_Amount").alias("total_revenue"),
        F.count("*").alias("total_transactions"),
        F.countDistinct("User_Name").alias("unique_users")
    )
    .orderBy("Transaction_Date")
)

gold_daily_revenue.write.format("delta").mode("overwrite").saveAsTable("ecom_gold.daily_revenue")

print("daily_revenue:", spark.table("ecom_gold.daily_revenue").count())
display(spark.table("ecom_gold.daily_revenue").orderBy("Transaction_Date").limit(5))

daily_revenue: 731


| Transaction_Date | total_revenue | total_transactions | unique_users |
|---|---|---|---|
| 2023-03-09 | 34211.91999999999 | 64 | 44 |
| 2023-03-10 | 35190.869999999995 | 75 | 54 |
| 2023-03-11 | 33670.81999999999 | 72 | 51 |
| 2023-03-12 | 40101.44 | 74 | 48 |
| 2023-03-13 | 32031.16000000001 | 68 | 50 |
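The total_revenue values above (for example 34211.91999999999) carry floating-point noise because Purchase_Amount is a double. In PySpark, one option is to cast Purchase_Amount to a decimal type, for example F.col("Purchase_Amount").cast("decimal(12,2)"), before summing. Another is to wrap the sum in F.round(..., 2). The plain-Python sketch below shows the same daily aggregation with exact Decimal arithmetic. daily_revenue is a hypothetical helper and the rows are invented for illustration.

```python
from collections import defaultdict
from decimal import Decimal


def daily_revenue(rows):
    """Group rows by Transaction_Date into revenue, transaction and user metrics."""
    groups = defaultdict(lambda: {"total_revenue": Decimal("0"),
                                  "total_transactions": 0,
                                  "users": set()})
    for row in rows:
        g = groups[row["Transaction_Date"]]
        # Convert via str() so 0.1 becomes Decimal("0.1"), not its binary approximation.
        g["total_revenue"] += Decimal(str(row["Purchase_Amount"]))
        g["total_transactions"] += 1
        if row["User_Name"] is not None:  # countDistinct ignores nulls
            g["users"].add(row["User_Name"])
    return [
        {"Transaction_Date": day,
         "total_revenue": g["total_revenue"],
         "total_transactions": g["total_transactions"],
         "unique_users": len(g["users"])}
        for day, g in sorted(groups.items())
    ]


rows = [
    {"Transaction_Date": "2023-03-09", "Purchase_Amount": 0.1, "User_Name": "Ava Hall"},
    {"Transaction_Date": "2023-03-09", "Purchase_Amount": 0.2, "User_Name": "Ava Hall"},
    {"Transaction_Date": "2023-03-10", "Purchase_Amount": 5.0, "User_Name": "Emma Harris"},
]
result = daily_revenue(rows)
print(0.1 + 0.2)                   # 0.30000000000000004
print(result[0]["total_revenue"])  # 0.3
```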


### GOLD Layer: Revenue by Product Category

In [0]:
# 2) Revenue by Product Category
gold_category_revenue = (
    silver.groupBy("Product_Category")
    .agg(
        F.sum("Purchase_Amount").alias("total_revenue"),
        F.count("*").alias("total_transactions"),
        F.countDistinct("User_Name").alias("unique_users")
    )
    .orderBy(F.desc("total_revenue"))
)

gold_category_revenue.write.format("delta").mode("overwrite").saveAsTable("ecom_gold.category_revenue")

print("category_revenue:", spark.table("ecom_gold.category_revenue").count())
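# Row order of a stored table is not guaranteed, so sort again when reading
display(spark.table("ecom_gold.category_revenue").orderBy(F.desc("total_revenue")).limit(5))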

category_revenue: 8


| Product_Category | total_revenue | total_transactions | unique_users |
|---|---|---|---|
| Sports | 3195335.9 | 6312 | 100 |
| Toys | 3185652.36 | 6392 | 100 |
| Books | 3181897.3 | 6253 | 100 |
| Clothing | 3171225.9599999995 | 6224 | 100 |
| Electronics | 3133965.039999998 | 6320 | 100 |


### GOLD Layer: Revenue by Country

In [0]:
# 3) Revenue by Country
gold_country_revenue = (
    silver.groupBy("Country")
    .agg(
        F.sum("Purchase_Amount").alias("total_revenue"),
        F.count("*").alias("total_transactions"),
        F.countDistinct("User_Name").alias("unique_users")
    )
    .orderBy(F.desc("total_revenue"))
)
gold_country_revenue.write.format("delta").mode("overwrite").saveAsTable("ecom_gold.country_revenue")
print("country_revenue:", spark.table("ecom_gold.country_revenue").count())
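# Row order of a stored table is not guaranteed, so sort again when reading
display(spark.table("ecom_gold.country_revenue").orderBy(F.desc("total_revenue")).limit(5))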


country_revenue: 10


| Country | total_revenue | total_transactions | unique_users |
|---|---|---|---|
| France | 2545739.1900000004 | 4993 | 100 |
| Canada | 2544335.12 | 5082 | 100 |
| USA | 2541220.22 | 4979 | 100 |
| Mexico | 2534475.6699999995 | 5059 | 100 |
| Australia | 2514911.65 | 4985 | 100 |
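The Gold cells for category and country repeat one aggregation with a different grouping column. Here is a sketch of a reusable helper, written in plain Python so it can be tested without Spark. revenue_by is a hypothetical name. In PySpark, the equivalent could be a small function that takes the column name and passes it to silver.groupBy(column) with the same .agg(...) call.

```python
from collections import defaultdict


def revenue_by(rows, key):
    """Aggregate revenue metrics by any grouping column, highest revenue first."""
    totals = defaultdict(float)
    counts = defaultdict(int)
    users = defaultdict(set)
    for row in rows:
        k = row[key]
        totals[k] += row["Purchase_Amount"]
        counts[k] += 1
        if row["User_Name"] is not None:
            users[k].add(row["User_Name"])
    result = [
        {key: k,
         "total_revenue": round(totals[k], 2),  # round to cents for display
         "total_transactions": counts[k],
         "unique_users": len(users[k])}
        for k in totals
    ]
    # Sort by revenue descending, then by key so ties have a stable order.
    result.sort(key=lambda r: (-r["total_revenue"], r[key]))
    return result


rows = [
    {"Country": "France", "Product_Category": "Books", "Purchase_Amount": 100.0, "User_Name": "Ava Hall"},
    {"Country": "Canada", "Product_Category": "Books", "Purchase_Amount": 250.5, "User_Name": "Ava Hall"},
    {"Country": "France", "Product_Category": "Toys", "Purchase_Amount": 200.25, "User_Name": "Noah Thompson"},
]
by_country = revenue_by(rows, "Country")
by_category = revenue_by(rows, "Product_Category")
print([r["Country"] for r in by_country])  # ['France', 'Canada']
```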
