## Bronze to Silver: Data Cleansing and Transformation

In [0]:
from pyspark.sql.types import StringType, IntegerType, DateType, BooleanType
import pyspark.sql.functions as F
from delta.tables import DeltaTable

#### Create Widgets

In [0]:
# Register the notebook parameters as text widgets: (name, default value, UI label).
for _widget_name, _widget_default, _widget_label in (
    ("catalog_name", "ecommerce", "Catalog Name"),
    ("storage_account_name", "ecommcistorage05", "Storage Account Name"),
    ("container_name", "ecomm-raw-data", "Container Name"),
):
    dbutils.widgets.text(_widget_name, _widget_default, _widget_label)

In [0]:
# Read the widget values back; later cells use them to build table names and storage paths.
catalog_name, storage_account_name, container_name = (
    dbutils.widgets.get(widget)
    for widget in ("catalog_name", "storage_account_name", "container_name")
)

### Read the Bronze Table as a Streaming DataFrame

In [0]:
# Incrementally read the Bronze order-items Delta table as a stream.
df = (
    spark.readStream
    .format("delta")
    .table(f"{catalog_name}.bronze.brz_order_items")
)

# Preview the first 20 raw rows.
display(df.limit(20))

dt,order_ts,customer_id,order_id,item_seq,product_id,quantity,unit_price_currency,unit_price,discount_pct,tax_amount,channel,coupon_code,_rescued_data,ingest_timestamp,source_file
2025-08-30,2025-08-30T12:46:50.000Z,CUST000000114495,676395,1,2000000136295,1,GBP,13,16%,1,app,NEW10,,2025-10-14T17:10:28.503Z,abfss://ecomm-raw-data@stgcodebadlsdevus001.dfs.core.windows.net/order_items/landing/order_items_2025-08-30.csv
2025-08-30,2025-08-30T19:00:46.000Z,CUST000000167574,676396,1,2000000125442,1,AUD,23,7%,3,web,PRIME5,,2025-10-14T17:10:28.503Z,abfss://ecomm-raw-data@stgcodebadlsdevus001.dfs.core.windows.net/order_items/landing/order_items_2025-08-30.csv
2025-08-30,2025-08-30T19:00:46.000Z,CUST000000167574,676396,2,2000000319490,1,AUD,487,4%,24,web,PRIME5,,2025-10-14T17:10:28.503Z,abfss://ecomm-raw-data@stgcodebadlsdevus001.dfs.core.windows.net/order_items/landing/order_items_2025-08-30.csv
2025-08-30,2025-08-30T21:50:52.000Z,CUST000000091703,676397,1,2000000213705,1,INR,3199,2%,158,web,FEST20,,2025-10-14T17:10:28.503Z,abfss://ecomm-raw-data@stgcodebadlsdevus001.dfs.core.windows.net/order_items/landing/order_items_2025-08-30.csv
2025-08-30,2025-08-30T23:23:09.000Z,CUST000000027113,676398,1,2000000383569,1,INR,1696,16%,72,app,,,2025-10-14T17:10:28.503Z,abfss://ecomm-raw-data@stgcodebadlsdevus001.dfs.core.windows.net/order_items/landing/order_items_2025-08-30.csv
2025-08-30,2025-08-30T04:52:11.000Z,CUST000000128388,676399,1,2000000333991,1,INR,31798,3%,5566,app,,,2025-10-14T17:10:28.503Z,abfss://ecomm-raw-data@stgcodebadlsdevus001.dfs.core.windows.net/order_items/landing/order_items_2025-08-30.csv
2025-08-30,2025-08-30T16:11:21.000Z,CUST000000105911,676400,1,2000000017921,1,INR,12964,17%,1930,app,,,2025-10-14T17:10:28.503Z,abfss://ecomm-raw-data@stgcodebadlsdevus001.dfs.core.windows.net/order_items/landing/order_items_2025-08-30.csv
2025-08-30,2025-08-30T17:17:40.000Z,CUST000000273320,676401,1,2000000275246,Two,GBP,294,0%,30,web,,,2025-10-14T17:10:28.503Z,abfss://ecomm-raw-data@stgcodebadlsdevus001.dfs.core.windows.net/order_items/landing/order_items_2025-08-30.csv
2025-08-30,2025-08-30T10:26:35.000Z,CUST000000012580,676402,1,2000000013053,Two,INR,2171,13%,455,web,,,2025-10-14T17:10:28.503Z,abfss://ecomm-raw-data@stgcodebadlsdevus001.dfs.core.windows.net/order_items/landing/order_items_2025-08-30.csv
2025-08-30,2025-08-30T10:26:35.000Z,CUST000000012580,676402,2,2000000005201,1,INR,1680,6%,286,web,,,2025-10-14T17:10:28.503Z,abfss://ecomm-raw-data@stgcodebadlsdevus001.dfs.core.windows.net/order_items/landing/order_items_2025-08-30.csv


### Perform Transformations and Cleaning

In [0]:
# Keep one row per order line, keyed by (order_id, item_seq).
# NOTE(review): on a streaming DataFrame without a watermark, dropDuplicates keeps every
# key seen so far in the checkpointed state store, so that state grows without bound.
df = df.dropDuplicates(["order_id", "item_seq"])

# quantity: Bronze contains spelled-out values such as 'Two'. Map them to 2 regardless of
# case or surrounding whitespace, and cast every other value to int.
# Both branches are int, so no implicit string/int coercion is needed.
df = df.withColumn(
    "quantity",
    F.when(F.lower(F.trim(F.col("quantity"))) == "two", F.lit(2))
    .otherwise(F.col("quantity").cast("int")),
)

# unit_price: strip '$' and any other non-numeric characters (currency symbols, thousands
# separators, spaces), keeping digits, the decimal point and a minus sign, then cast to double.
df = df.withColumn(
    "unit_price",
    F.regexp_replace(F.col("unit_price"), r"[^0-9.\-]", "").cast("double"),
)

# discount_pct: drop the '%' suffix (e.g. '16%' -> 16.0) and cast to double.
df = df.withColumn(
    "discount_pct",
    F.regexp_replace("discount_pct", "%", "").cast("double"),
)

# coupon_code: trim whitespace and normalise to lower case (e.g. 'NEW10' -> 'new10').
df = df.withColumn(
    "coupon_code", F.lower(F.trim(F.col("coupon_code")))
)

# channel: expand the short codes to readable names; other values pass through unchanged.
df = df.withColumn(
    "channel",
    F.when(F.col("channel") == "web", "Website")
    .when(F.col("channel") == "app", "Mobile")
    .otherwise(F.col("channel")),
)

# Record when each row was processed into Silver.
df = df.withColumn(
    "processed_time", F.current_timestamp()
)


In [0]:
# Preview the first 20 cleaned rows before writing to Silver.
display(df.limit(num=20))

dt,order_ts,customer_id,order_id,item_seq,product_id,quantity,unit_price_currency,unit_price,discount_pct,tax_amount,channel,coupon_code,_rescued_data,ingest_timestamp,source_file,processed_time
2025-08-30,2025-08-30T10:26:35.000Z,CUST000000012580,676402,1,2000000013053,2,INR,2171.0,13.0,455,Website,,,2025-10-14T17:10:28.503Z,abfss://ecomm-raw-data@stgcodebadlsdevus001.dfs.core.windows.net/order_items/landing/order_items_2025-08-30.csv,2025-10-14T20:11:23.810Z
2025-08-30,2025-08-30T09:23:27.000Z,CUST000000170265,676557,1,2000000111933,2,INR,1475.0,18.0,121,Mobile,,,2025-10-14T17:10:28.503Z,abfss://ecomm-raw-data@stgcodebadlsdevus001.dfs.core.windows.net/order_items/landing/order_items_2025-08-30.csv,2025-10-14T20:11:23.810Z
2025-08-30,2025-08-30T22:53:14.000Z,CUST000000264382,676571,1,2000000073903,3,AED,143.0,9.0,48,Website,save50,,2025-10-14T17:10:28.503Z,abfss://ecomm-raw-data@stgcodebadlsdevus001.dfs.core.windows.net/order_items/landing/order_items_2025-08-30.csv,2025-10-14T20:11:23.810Z
2025-08-30,2025-08-30T07:10:14.000Z,CUST000000076239,676584,2,2000000251424,1,INR,7499.0,3.0,871,Website,,,2025-10-14T17:10:28.503Z,abfss://ecomm-raw-data@stgcodebadlsdevus001.dfs.core.windows.net/order_items/landing/order_items_2025-08-30.csv,2025-10-14T20:11:23.810Z
2025-08-30,2025-08-30T06:25:37.000Z,CUST000000146122,676760,1,2000000194394,1,USD,58.0,6.0,10,Website,fest20,,2025-10-14T17:10:28.503Z,abfss://ecomm-raw-data@stgcodebadlsdevus001.dfs.core.windows.net/order_items/landing/order_items_2025-08-30.csv,2025-10-14T20:11:23.810Z
2025-08-30,2025-08-30T11:42:12.000Z,CUST000000272446,677021,2,2000000008127,1,AED,812.0,7.0,38,Mobile,,,2025-10-14T17:10:28.503Z,abfss://ecomm-raw-data@stgcodebadlsdevus001.dfs.core.windows.net/order_items/landing/order_items_2025-08-30.csv,2025-10-14T20:11:23.810Z
2025-08-30,2025-08-30T11:22:17.000Z,CUST000000212361,677092,2,2000000499420,3,USD,51.0,19.0,7,Mobile,prime5,,2025-10-14T17:10:28.503Z,abfss://ecomm-raw-data@stgcodebadlsdevus001.dfs.core.windows.net/order_items/landing/order_items_2025-08-30.csv,2025-10-14T20:11:23.810Z
2025-08-30,2025-08-30T08:30:43.000Z,CUST000000052514,677140,1,2000000135441,1,GBP,44.0,20.0,5,Mobile,,,2025-10-14T17:10:28.503Z,abfss://ecomm-raw-data@stgcodebadlsdevus001.dfs.core.windows.net/order_items/landing/order_items_2025-08-30.csv,2025-10-14T20:11:23.810Z
2025-08-30,2025-08-30T08:48:50.000Z,CUST000000235908,677156,2,2000000168272,1,INR,3980.0,7.0,186,Website,fest20,,2025-10-14T17:10:28.503Z,abfss://ecomm-raw-data@stgcodebadlsdevus001.dfs.core.windows.net/order_items/landing/order_items_2025-08-30.csv,2025-10-14T20:11:23.810Z
2025-08-30,2025-08-30T05:14:30.000Z,CUST000000191667,677331,3,2000000308555,1,USD,2.0,16.0,1,Website,,,2025-10-14T17:10:28.503Z,abfss://ecomm-raw-data@stgcodebadlsdevus001.dfs.core.windows.net/order_items/landing/order_items_2025-08-30.csv,2025-10-14T20:11:23.810Z


### Save to Silver Table

In [0]:
# Checkpoint location for the Bronze -> Silver order-items stream (ADLS Gen2 abfss URI).
silver_checkpoint_path = (
    "abfss://" + container_name + "@" + storage_account_name
    + ".dfs.core.windows.net/checkpoint/silver/fact_order_items/"
)
print(silver_checkpoint_path)

abfss://ecomm-raw-data@stgcodebadlsdevus001.dfs.core.windows.net/checkpoint/silver/fact_order_items/


In [0]:
def upsert_to_silver(microBatchDF, batchId):
    """Upsert one streaming micro-batch into the Silver order-items Delta table.

    Intended for ``DataStreamWriter.foreachBatch``. On the first batch the target table
    does not exist, so the batch is written with ``saveAsTable`` and Change Data Feed is
    enabled on the new table. On later batches the rows are MERGEd on the
    (order_id, item_seq) key: matching rows are updated, new rows are inserted.

    Args:
        microBatchDF: micro-batch DataFrame supplied by Structured Streaming.
        batchId: micro-batch id supplied by Structured Streaming (not used).
    """
    table_name = f"{catalog_name}.silver.slv_order_items"

    # Use the session that owns this batch instead of the notebook-global `spark`.
    # Inside foreachBatch (notably on Spark Connect / shared clusters) the global
    # session is not guaranteed to be the one executing the batch.
    session = microBatchDF.sparkSession

    # MERGE fails when several source rows match the same target row, so make the merge
    # key unique within the batch regardless of any upstream deduplication.
    batch_df = microBatchDF.dropDuplicates(["order_id", "item_seq"])

    if not session.catalog.tableExists(table_name):
        print("creating new table")
        batch_df.write.format("delta").mode("overwrite").saveAsTable(table_name)
        # NOTE(review): CDF is enabled in a commit after the initial write, so the rows
        # from this first write are not part of the change feed. Confirm downstream
        # consumers do not read CDF from version 0.
        session.sql(
            f"ALTER TABLE {table_name} SET TBLPROPERTIES (delta.enableChangeDataFeed = true)"
        )
        return

    (
        DeltaTable.forName(session, table_name)
        .alias("silver_table")
        .merge(
            batch_df.alias("batch_table"),
            "silver_table.order_id = batch_table.order_id AND silver_table.item_seq = batch_table.item_seq",
        )
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

    



In [0]:
# Run the Bronze -> Silver stream as an incremental batch job:
# - reads the rows added to the Bronze stream `df` since the last checkpoint,
# - hands each micro-batch to upsert_to_silver (update if the key exists, insert if not),
# - records progress in silver_checkpoint_path so the next run resumes from there,
# - uses Trigger.AvailableNow: processes all currently available data, then stops.
#
# Only one trigger is set, because a later .trigger() call replaces an earlier one
# (Trigger.Once is deprecated in favour of availableNow).
# No .format()/.option("mergeSchema") here: foreachBatch performs the write itself, so
# those settings would not reach the MERGE, and in classic PySpark a .format() call
# after .foreachBatch() replaces the foreachBatch sink.
# NOTE(review): schema evolution for the MERGE is governed by Delta's
# spark.databricks.delta.schema.autoMerge.enabled setting, not by this writer.
silver_query = (
    df.writeStream
    .foreachBatch(upsert_to_silver)
    .option("checkpointLocation", silver_checkpoint_path)
    .outputMode("update")
    .trigger(availableNow=True)
    .start()
)
silver_query.awaitTermination()

25/10/14 20:24:07 Spark Server has not sent updates for Streaming Query 3cb575de-b7ad-4ff9-8de0-e9c4450b804b in60 seconds, but the query is still active. Marking query as in-progress. Spark Session ID is 949c92af-6a66-4e1a-ac3e-94306f9d5465. This is typically not a problem.
25/10/14 20:24:09 Spark Server has not sent updates for Streaming Query 3cb575de-b7ad-4ff9-8de0-e9c4450b804b in60 seconds, but the query is still active. Marking query as in-progress. Spark Session ID is 949c92af-6a66-4e1a-ac3e-94306f9d5465. This is typically not a problem.
25/10/14 20:24:09 Spark Server has not sent updates for Streaming Query 3cb575de-b7ad-4ff9-8de0-e9c4450b804b in60 seconds, but the query is still active. Marking query as in-progress. Spark Session ID is 949c92af-6a66-4e1a-ac3e-94306f9d5465. This is typically not a problem.
25/10/14 20:24:10 Spark Server has not sent updates for Streaming Query 3cb575de-b7ad-4ff9-8de0-e9c4450b804b in60 seconds, but the query is still active. Marking query as in-p