In [0]:
#importing the data
sales_transactions_raw = spark.read.option("header", True) \
    .csv("/Volumes/main/default/data_files_scenario-3/sales_transactions_jan_2025.csv")
product_master_raw = spark.read.option("header", True) \
    .csv("/Volumes/main/default/data_files_scenario-3/product_master.csv")
store_master_raw = spark.read.option("header", True) \
    .csv("/Volumes/main/default/data_files_scenario-3/store_master.csv")


In [0]:
display(sales_transactions_raw.head())
display(product_master_raw.head())
display(store_master_raw.head())

Row(transaction_id='6e8944c8-2358-4066-b47a-f9fb5cfaaca8', transaction_timestamp='2025-01-01 06:32:00', store_id='STR007', product_id='PRD104', quantity='2', unit_price='979.15', discount='0', total_amount='1958.3', currency='EUR')

Row(product_id='PRD101', product_name='iPhone 14', category='Electronics', brand='Apple', standard_price='999.0')

Row(store_id='STR001', store_name='Hyderabad Central', region='South', country='India')

In [0]:
#takes the raw csv data and creates delta table
#Bronze layer
sales_transactions_raw.write.format("delta").mode("overwrite") \
    .saveAsTable("main.default.bronze_sales_s3")

product_master_raw.write.format("delta").mode("overwrite") \
    .saveAsTable("main.default.bronze_product_s3")

store_master_raw.write.format("delta").mode("overwrite") \
    .saveAsTable("main.default.bronze_store_s3")

In [0]:
#Read the bronze tables
from pyspark.sql import functions as F
from pyspark.sql.window import Window

bronze_sales = spark.table("main.default.bronze_sales_s3")
bronze_products = spark.table("main.default.bronze_product_s3")
bronze_stores = spark.table("main.default.bronze_store_s3")


In [0]:
#Adding the watermark logic so we dont have to reprocess the data every time a new transaction adds
from pyspark.sql import functions as F

last_processed_ts = None

if spark.catalog.tableExists("main.default.silver_sales_s3"):
    last_processed_ts = spark.sql("""
        SELECT max(transaction_timestamp)
        FROM main.default.silver_sales_s3
    """).collect()[0][0]
if last_processed_ts:
    bronze_sales = bronze_sales.filter(
        F.to_timestamp("transaction_timestamp") > F.lit(last_processed_ts)
    )


In [0]:
from pyspark.sql import functions as F

silver_casted = (
    bronze_sales
    .withColumn("quantity", F.col("quantity").cast("int"))
    .withColumn("unit_price", F.col("unit_price").cast("double"))
    .withColumn("discount", F.col("discount").cast("double"))
    .withColumn("total_amount", F.col("total_amount").cast("double"))
    .withColumn(
        "transaction_timestamp",
        F.to_timestamp("transaction_timestamp")
    )
)


In [0]:
#Handlng the null values
silver_casted = (
    silver_casted.filter(
        F.col("transaction_id").isNotNull() &
        F.col("transaction_timestamp").isNotNull() &
        F.col("store_id").isNotNull() &
        F.col("product_id").isNotNull()
    )
)

In [0]:
#SILVER LAYER
#finding the total_amount
silver_prepared = (
    silver_casted
    .withColumn(
        "transaction_timestamp",
        F.to_utc_timestamp("transaction_timestamp", "UTC")
    )
    .withColumn(
        "total_amount",
        F.round(
            F.col("quantity") * F.col("unit_price") - F.col("discount"), 2
        )
    )
)


In [0]:
#DQ quarantine code
dq_invalid = silver_prepared.filter(
    (F.col("quantity") <= 0) |
    (F.col("unit_price") <= 0) |
    (F.col("total_amount") < 0)
)


In [0]:
#removing invalid data
dq_valid = silver_prepared.filter(
    (F.col("quantity") > 0) &
    (F.col("unit_price") > 0) &
    (F.col("total_amount") >= 0)
)

invalid_store = (
    dq_valid
    .join(
        bronze_stores.select("store_id"),
        on="store_id",
        how="left_anti"
    )
)



In [0]:
#removes invalid store_id (in the data we have STR999 which is an invalid store, so we remove it)
silver_valid = (
    dq_valid
    .join(
        bronze_products.select("product_id"),
        on="product_id",
        how="inner"
    )
    .join(
        bronze_stores.select("store_id", "region", "country"),
        on="store_id",
        how="inner"
    )
)

In [0]:
display(silver_valid)

store_id,product_id,transaction_id,transaction_timestamp,quantity,unit_price,discount,total_amount,currency,region,country
STR007,PRD104,6e8944c8-2358-4066-b47a-f9fb5cfaaca8,2025-01-01T06:32:00.000Z,2,979.15,0.0,1958.3,EUR,Europe,Germany
STR006,PRD102,5a54ddad-1d0a-441b-8cc9-7e309c409dc6,2025-01-01T04:53:00.000Z,5,533.05,0.0,2665.25,EUR,Europe,UK
STR005,PRD107,c930ad8a-b0e4-4f8c-9142-c985aea6f06b,2025-01-01T12:47:00.000Z,4,1189.88,0.0,4759.52,INR,West,USA
STR001,PRD105,64110360-7a32-461d-b0cd-34a010873ff1,2025-01-01T20:31:00.000Z,2,1176.87,20.0,2333.74,EUR,South,India
STR006,PRD109,c58bbeb5-5b7a-4ee8-85ba-0e8a27322a8c,2025-01-01T14:41:00.000Z,3,133.43,20.0,380.29,INR,Europe,UK
STR001,PRD105,b05f7cf2-d5d7-409e-9db7-63f9dfd16895,2025-01-01T16:49:00.000Z,3,838.26,0.0,2514.78,INR,South,India
STR008,PRD103,980273d3-85ff-4af0-b450-68e00a86740d,2025-01-01T10:09:00.000Z,5,1425.77,50.0,7078.85,EUR,APAC,Singapore
STR008,PRD101,ed6d7c72-ec1a-437f-9adf-5d000b2f2c9e,2025-01-01T12:02:00.000Z,2,573.7,10.0,1137.4,USD,APAC,Singapore
STR003,PRD106,1006b00d-8252-4ba6-9309-60de4a2809c8,2025-01-01T03:51:00.000Z,2,332.5,10.0,655.0,USD,West,India
STR006,PRD108,92eb1964-df8f-4f43-a5a1-6a56f39f4e42,2025-01-01T05:44:00.000Z,5,941.54,50.0,4657.7,EUR,Europe,UK


In [0]:
#removing duplicates of each repeated transaction and use only its recent transaction data
window_spec = (
    Window
    .partitionBy("transaction_id")
    .orderBy(F.col("transaction_timestamp").desc())
)

silver_deduped = (
    silver_valid
    .withColumn("row_num", F.row_number().over(window_spec))
    .filter(F.col("row_num") == 1)
    .drop("row_num")
)

In [0]:
silver_deduped.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("main.default.silver_sales_s3")


In [0]:
#writing quarantine tables
dq_invalid.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("main.default.silver_sales_dq_quarantine_s3")

invalid_store.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("main.default.silver_sales_invalid_store_s3")

In [0]:
display(spark.table("main.default.silver_sales_dq_quarantine_s3"))

transaction_id,transaction_timestamp,store_id,product_id,quantity,unit_price,discount,total_amount,currency
88e76830-9a92-4228-8db7-b76e05817f60,2025-01-01T11:53:00.000Z,STR999,PRD109,-1,1453.23,10.0,-1463.23,INR
79408888-ce1c-4d4d-aaee-179c69614c18,2025-01-01T20:18:00.000Z,STR999,PRD101,-1,1380.77,20.0,-1400.77,USD
7e556afc-cd98-4f43-a0e6-de0b20979521,2025-01-01T07:48:00.000Z,STR010,PRD105,-2,1468.15,10.0,-2946.3,EUR
8be760bf-e7bd-486a-83fe-7f31443d15d0,2025-01-01T22:38:00.000Z,STR003,PRD105,-3,383.66,20.0,-1170.98,INR
065ec6a1-a5bc-4c34-b08a-1af04134f633,2025-01-01T09:50:00.000Z,STR008,PRD107,-1,805.73,20.0,-825.73,USD
da726b3c-ce6f-439f-a014-f302a574db2d,2025-01-01T04:54:00.000Z,STR003,PRD110,-4,987.03,50.0,-3998.12,INR
383f8ae3-fee3-4c42-8757-88c7e90fced5,2025-01-01T03:38:00.000Z,STR006,PRD101,-5,930.45,10.0,-4662.25,USD
f687df60-7113-49df-a8a9-89b9675e94ea,2025-01-01T09:22:00.000Z,STR002,PRD107,-5,1100.16,50.0,-5550.8,EUR
e1494b5d-d340-4e76-8b20-c64c30f7077c,2025-01-01T02:54:00.000Z,STR004,PRD109,-1,76.95,10.0,-86.95,USD
556778eb-1caa-4676-8c08-08f662d6343e,2025-01-01T03:55:00.000Z,STR002,PRD101,-5,167.0,0.0,-835.0,EUR


In [0]:
#Gold layer
#analysis about daily sales
silver_sales = spark.table("main.default.silver_sales_s3")

gold_daily_sales = (
    silver_sales
    .withColumn("sales_date", F.to_date("transaction_timestamp"))
    .groupBy("sales_date", "region", "country")
    .agg(
        F.sum("total_amount").alias("total_revenue"),
        F.countDistinct("transaction_id").alias("transactions"),
        F.sum("quantity").alias("units_sold")
    )
    .withColumn(
        "avg_order_value",
        F.round(F.col("total_revenue") / F.col("transactions"), 2)
    )
)

gold_daily_sales.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("main.default.gold_daily_sales_s3")


In [0]:
display(silver_sales)

store_id,product_id,transaction_id,transaction_timestamp,quantity,unit_price,discount,total_amount,currency,region,country
STR002,PRD104,0009c93e-ce32-486b-8640-c36e31749af8,2025-01-22T21:32:00.000Z,2,1275.03,50.0,2500.06,INR,South,India
STR002,PRD104,000abdb8-45af-4cff-8f60-2cca9dafe67d,2025-01-20T11:36:00.000Z,5,1366.49,20.0,6812.45,INR,South,India
STR010,PRD107,000ee135-f3df-43f2-ab5c-125dd611f344,2025-01-21T02:56:00.000Z,3,1086.96,10.0,3250.88,USD,APAC,Japan
STR010,PRD105,000f460e-3689-4773-92ad-78d5299e4126,2025-01-30T06:28:00.000Z,5,1118.42,50.0,5542.1,EUR,APAC,Japan
STR009,PRD106,0015c76c-a3f2-47c1-ad4f-1c3273b660ef,2025-01-04T09:32:00.000Z,3,504.81,10.0,1504.43,USD,APAC,Australia
STR001,PRD108,00169841-3d76-4a4d-89e0-8476f2bc0545,2025-01-09T12:10:00.000Z,5,98.77,10.0,483.85,INR,South,India
STR008,PRD105,00195996-a825-4444-9640-db8a0d3aff55,2025-01-27T20:15:00.000Z,4,676.38,20.0,2685.52,INR,APAC,Singapore
STR007,PRD108,001e31f1-c07e-420d-8e71-5196a357565a,2025-01-13T18:30:00.000Z,3,785.23,0.0,2355.69,INR,Europe,Germany
STR001,PRD101,002266b5-3bea-4c60-bc6e-f019d108ca42,2025-01-17T14:53:00.000Z,2,477.95,0.0,955.9,EUR,South,India
STR007,PRD110,0029d0a1-fe2b-41ce-96d4-33b86e8fd64d,2025-01-06T01:41:00.000Z,2,264.82,0.0,529.64,EUR,Europe,Germany


In [0]:
display(gold_daily_sales)

sales_date,region,country,total_revenue,transactions,units_sold,avg_order_value
2025-01-22,South,India,363837.6499999997,152,463,2393.67
2025-01-20,South,India,356941.44000000006,152,457,2348.3
2025-01-21,APAC,Japan,188628.41999999995,78,247,2418.31
2025-01-30,APAC,Japan,139447.30000000002,64,191,2178.86
2025-01-04,APAC,Australia,162506.4100000001,70,211,2321.52
2025-01-09,South,India,365979.67999999993,152,466,2407.76
2025-01-27,APAC,Singapore,144371.12000000002,67,204,2154.79
2025-01-13,Europe,Germany,172302.83999999997,67,212,2571.68
2025-01-17,South,India,380191.8399999999,152,470,2501.26
2025-01-06,Europe,Germany,161827.1,71,209,2279.25


In [0]:
#analysis about monthly product performance
silver_sales = spark.table("main.default.silver_sales_s3")
product_master = spark.table("main.default.bronze_product_s3")

gold_monthly_product_performance = (
    silver_sales
    .join(
        product_master.select("product_id", "product_name", "category"),
        on="product_id",
        how="inner"
    )
    .withColumn("month", F.date_format("transaction_timestamp", "yyyy-MM"))
    .groupBy("month", "product_id", "product_name", "category")
    .agg(
        F.sum("total_amount").alias("revenue"),
        F.sum("quantity").alias("units_sold")
    )
    .withColumn(
        "avg_selling_price",
        F.round(F.col("revenue") / F.col("units_sold"), 2)
    )
)

gold_monthly_product_performance.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("main.default.gold_monthly_product_performance_s3")


In [0]:
display(spark.table("main.default.gold_monthly_product_performance_s3"))

month,product_id,product_name,category,revenue,units_sold,avg_selling_price
2025-01,PRD109,HP Pavilion,Electronics,4947584.149999999,6287,786.95
2025-01,PRD108,Dell XPS 13,Electronics,4656984.38000001,6123,760.57
2025-01,PRD106,Levi's Jeans,Apparel,5124241.010000004,6542,783.28
2025-01,PRD104,Nike Air Max,Sportswear,4889665.359999999,6232,784.61
2025-01,PRD107,Sony Headphones,Electronics,5039678.880000005,6492,776.29
2025-01,PRD105,Adidas Ultraboost,Sportswear,4803211.660000002,6321,759.88
2025-01,PRD110,Apple Watch,Wearables,5030212.23,6589,763.43
2025-01,PRD103,AirPods Pro,Accessories,4847516.060000007,6327,766.16
2025-01,PRD102,Galaxy S23,Electronics,5002204.8199999975,6568,761.6
2025-01,PRD101,iPhone 14,Electronics,5003427.279999996,6459,774.64


In [0]:
#revenue by regional sales summary
from pyspark.sql import functions as F

silver_sales = spark.table("main.default.silver_sales_s3")
gold_region_sales_summary = (
    silver_sales
    .groupBy("region", "country")
    .agg(
        F.sum("total_amount").alias("total_revenue"),
        F.countDistinct("transaction_id").alias("total_transactions"),
        F.sum("quantity").alias("total_units_sold")
    )
    .withColumn(
        "avg_transaction_value",
        F.round(
            F.col("total_revenue") / F.col("total_transactions"), 2
        )
    )
)

gold_region_sales_summary.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("main.default.gold_region_sales_summary_s3")

In [0]:
display(spark.table("main.default.gold_region_sales_summary_s3"))

region,country,total_revenue,total_transactions,total_units_sold,avg_transaction_value
West,India,5013076.549999993,2159,6507,2321.94
South,India,9918290.199999962,4337,13023,2286.9
APAC,Australia,4720895.249999995,2039,6128,2315.3
APAC,Japan,4835172.909999994,2088,6257,2315.7
East,USA,5149536.399999999,2200,6687,2340.7
West,USA,4877316.310000004,2121,6317,2299.54
Europe,Germany,4870998.239999991,2091,6320,2329.51
APAC,Singapore,4957390.429999991,2159,6455,2296.15
Europe,UK,5002049.540000001,2082,6246,2402.52


In [0]:
%sql
--demonstrating time travel
DESCRIBE HISTORY main.default.silver_sales_s3;

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
2,2025-12-21T18:00:31.000Z,70588280163463,aspranavi1110@gmail.com,CREATE OR REPLACE TABLE AS SELECT,"Map(partitionBy -> [], clusterBy -> [], description -> null, isManaged -> true, properties -> {""delta.enableDeletionVectors"":""true""}, statsOnLoad -> true)",,List(4054221922185729),1221-175710-7z9hfgf3-v2n,1.0,WriteSerializable,False,"Map(numFiles -> 1, numRemovedFiles -> 1, numRemovedBytes -> 1140886, numDeletionVectorsRemoved -> 0, numOutputRows -> 21276, numOutputBytes -> 1140886)",,Databricks-Runtime/17.3.x-aarch64-photon-scala2.13
1,2025-12-21T14:50:34.000Z,70588280163463,aspranavi1110@gmail.com,CREATE OR REPLACE TABLE AS SELECT,"Map(partitionBy -> [], clusterBy -> [], description -> null, isManaged -> true, properties -> {""delta.enableDeletionVectors"":""true""}, statsOnLoad -> true)",,List(2076262709341777),1221-141552-yk1k5gcs-v2n,0.0,WriteSerializable,False,"Map(numFiles -> 1, numRemovedFiles -> 1, numRemovedBytes -> 1140886, numDeletionVectorsRemoved -> 0, numOutputRows -> 21276, numOutputBytes -> 1140886)",,Databricks-Runtime/17.3.x-aarch64-photon-scala2.13
0,2025-12-21T14:42:33.000Z,70588280163463,aspranavi1110@gmail.com,CREATE OR REPLACE TABLE AS SELECT,"Map(partitionBy -> [], clusterBy -> [], description -> null, isManaged -> true, properties -> {""delta.enableDeletionVectors"":""true""}, statsOnLoad -> true)",,List(2076262709341777),1221-141552-yk1k5gcs-v2n,,WriteSerializable,False,"Map(numFiles -> 1, numRemovedFiles -> 0, numRemovedBytes -> 0, numDeletionVectorsRemoved -> 0, numOutputRows -> 21276, numOutputBytes -> 1140886)",,Databricks-Runtime/17.3.x-aarch64-photon-scala2.13


In [0]:
%sql
--demonstrating version rollback
RESTORE TABLE main.default.silver_sales_s3
TO VERSION AS OF 2;

In [0]:
%sql
--verifying the rollback
SELECT COUNT(*) FROM main.default.silver_sales_s3;


COUNT(*)
21276


In [0]:
%sql
--optimizing table
OPTIMIZE main.default.silver_sales_s3;

path,metrics
,"List(0, 0, List(null, null, 0.0, 0, 0), List(null, null, 0.0, 0, 0), 0, null, null, 0, 0, 1, 1, true, 0, 0, 1766341273742, 1766341274592, 8, 0, null, List(0, 0), null, 11, 11, 0, 0, null)"


In [0]:
#logging and error handling(error handling has already been implemented in the previous code)
from pyspark.sql import Row
from pyspark.sql import functions as F
from datetime import datetime

def log_pipeline_run(
    pipeline_name,
    layer,
    start_time,
    end_time,
    source_count,
    output_count,
    rejected_count,
    status,
    error_message=None
):
    log_row = [
        Row(
            pipeline_name=pipeline_name,
            layer=layer,
            start_time=start_time,
            end_time=end_time,
            source_count=source_count,
            output_count=output_count,
            rejected_count=rejected_count,
            status=status,
            error_message=error_message,
            run_timestamp=datetime.now()
        )
    ]

    spark.createDataFrame(log_row).write \
        .format("delta") \
        .mode("append") \
        .saveAsTable("main.default.pipeline_logs")
log_pipeline_run(
    pipeline_name="Retail_Analytics_Pipeline",
    layer="Silver",
    start_time=datetime.now(),
    end_time=datetime.now(),
    source_count=spark.table("main.default.bronze_sales_s3").count(),
    output_count=spark.table("main.default.silver_sales_s3").count(),
    rejected_count=spark.table("main.default.silver_sales_dq_quarantine_s3").count(),
    status="SUCCESS"
)



[0;31m---------------------------------------------------------------------------[0m
[0;31mPySparkValueError[0m                         Traceback (most recent call last)
File [0;32m<command-6707673468359202>, line 36[0m
[1;32m     17[0m     log_row [38;5;241m=[39m [
[1;32m     18[0m         Row(
[1;32m     19[0m             pipeline_name[38;5;241m=[39mpipeline_name,
[0;32m   (...)[0m
[1;32m     29[0m         )
[1;32m     30[0m     ]
[1;32m     32[0m     spark[38;5;241m.[39mcreateDataFrame(log_row)[38;5;241m.[39mwrite \
[1;32m     33[0m         [38;5;241m.[39mformat([38;5;124m"[39m[38;5;124mdelta[39m[38;5;124m"[39m) \
[1;32m     34[0m         [38;5;241m.[39mmode([38;5;124m"[39m[38;5;124mappend[39m[38;5;124m"[39m) \
[1;32m     35[0m         [38;5;241m.[39msaveAsTable([38;5;124m"[39m[38;5;124mmain.default.pipeline_logs[39m[38;5;124m"[39m)
[0;32m---> 36[0m log_pipeline_run(
[1;32m     37[0m     pipeline_name[38;5;241m=[39m[38

In [0]:
%sql
--we can now verify that the logs are now added into our pipeline_logs
SELECT * FROM main.default.pipeline_logs;


pipeline_name,layer,start_time,end_time,source_count,output_count,rejected_count,status,error_message,run_timestamp
Retail_Silver_Pipeline,Silver,2025-12-21T18:31:16.449Z,2025-12-21T18:31:27.739Z,0,0,0,FAILED,[CANNOT_DETERMINE_TYPE] Some of types cannot be determined after inferring.,2025-12-21T18:31:27.739Z
