## Bronze to Silver: Data Cleansing and Transformation

In [0]:
from pyspark.sql.types import StringType, IntegerType, DateType, BooleanType
import pyspark.sql.functions as F

In [0]:
# Catalog holding both the bronze source schema and the silver target schema used in this notebook.
catalog_name = "ecommerce"

In [0]:
# Load the bronze order-items table (all raw columns) and print the first 5 rows.
df = spark.read.table(f"{catalog_name}.bronze.brz_order_items")
df.show(n=5)

+----------+-------------------+----------------+--------+--------+-------------+--------+-------------------+----------+------------+----------+-------+-----------+--------------------+--------------------+
|        dt|           order_ts|     customer_id|order_id|item_seq|   product_id|quantity|unit_price_currency|unit_price|discount_pct|tax_amount|channel|coupon_code|           file_name|    ingest_timestamp|
+----------+-------------------+----------------+--------+--------+-------------+--------+-------------------+----------+------------+----------+-------+-----------+--------------------+--------------------+
|2025-08-01|2025-08-01 22:53:52|CUST000000241190|  643611|       1|2000000028279|       1|                GBP|        11|         10%|         2|    web|       NULL|dbfs:/Volumes/eco...|2025-11-08 06:00:...|
|2025-08-01|2025-08-01 22:53:52|CUST000000241190|  643611|       2|2000000377445|       1|                GBP|         5|          4%|         1|    web|       NULL|dbf

In [0]:
# Inspect the bronze schema: apart from ingest_timestamp, every column arrives as a
# string, which is why the following cells clean and cast them.
df.printSchema()

root
 |-- dt: string (nullable = true)
 |-- order_ts: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- item_seq: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- quantity: string (nullable = true)
 |-- unit_price_currency: string (nullable = true)
 |-- unit_price: string (nullable = true)
 |-- discount_pct: string (nullable = true)
 |-- tax_amount: string (nullable = true)
 |-- channel: string (nullable = true)
 |-- coupon_code: string (nullable = true)
 |-- file_name: string (nullable = true)
 |-- ingest_timestamp: timestamp (nullable = true)



In [0]:
# Transformation: drop duplicate line items.
# (order_id, item_seq) is used as the line-item key. Note that dropDuplicates keeps
# an arbitrary one of the duplicate rows, not e.g. the most recently ingested one.
df = df.dropDuplicates(["order_id", "item_seq"])

# Transformation: convert the spelled-out value 'Two' → 2, then cast to integer.
# Other non-numeric spellings are not mapped here; what their cast yields (NULL or an
# error) depends on the session's ANSI setting — confirm for this runtime.
df = df.withColumn(
    "quantity",
    F.when(F.col("quantity") == "Two", 2).otherwise(F.col("quantity")).cast("int")
)

# Transformation: strip '$' and any other non-numeric symbols from unit_price,
# keeping only digits, the decimal point and a minus sign before casting to double.
# This is the same keep-numeric rule used for tax_amount; previously only '$' was
# removed, so any other symbol (e.g. '£11' or '1,012') turned the price into NULL.
df = df.withColumn(
    "unit_price",
    F.regexp_replace("unit_price", r"[^0-9.\-]", "").cast("double")
)

# Transformation: remove '%' from discount_pct (e.g. '10%' → 10.0) and cast to double.
df = df.withColumn(
    "discount_pct",
    F.regexp_replace("discount_pct", "%", "").cast("double")
)

# Transformation: normalise coupon codes by trimming whitespace and lower-casing.
df = df.withColumn(
    "coupon_code", F.lower(F.trim(F.col("coupon_code")))
)

# Transformation: map channel codes to readable labels ('web' → 'Website',
# 'app' → 'Mobile'); any other value is passed through unchanged.
df = df.withColumn(
    "channel",
    F.when(F.col("channel") == "web", "Website")
    .when(F.col("channel") == "app", "Mobile")
    .otherwise(F.col("channel")),
)

In [0]:

# Transformation: datatype conversions from bronze strings to typed columns, plus an
# audit timestamp. Written as one chain; each step only reads the column it rewrites,
# so the steps are independent of each other.
df = (
    df
    # dt: 'yyyy-MM-dd' string → date
    .withColumn("dt", F.to_date("dt", "yyyy-MM-dd"))
    # order_ts: parse 'yyyy-MM-dd HH:mm:ss' (e.g. 2025-08-01 22:53:52) first, and fall
    # back to 'dd-MM-yyyy HH:mm' (e.g. 01-08-2025 22:53); coalesce keeps the first non-NULL.
    # NOTE(review): the fallback relies on a failed to_timestamp parse returning NULL;
    # with ANSI mode enabled it may raise instead — confirm for this runtime.
    .withColumn(
        "order_ts",
        F.coalesce(
            F.to_timestamp("order_ts", "yyyy-MM-dd HH:mm:ss"),
            F.to_timestamp("order_ts", "dd-MM-yyyy HH:mm"),
        ),
    )
    # item_seq: string → int
    .withColumn("item_seq", F.col("item_seq").cast("int"))
    # tax_amount: keep only digits, '.' and '-', then string → double
    .withColumn(
        "tax_amount",
        F.regexp_replace("tax_amount", r"[^0-9.\-]", "").cast("double"),
    )
    # processed_time: audit column holding the timestamp at which the query executes
    .withColumn("processed_time", F.current_timestamp())
)

In [0]:
# Preview 5 transformed rows with the notebook's display() helper to eyeball the cleansing.
display(df.limit(5))

dt,order_ts,customer_id,order_id,item_seq,product_id,quantity,unit_price_currency,unit_price,discount_pct,tax_amount,channel,coupon_code,file_name,ingest_timestamp,processed_time
2025-08-01,2025-08-01T22:53:52.000Z,CUST000000241190,643611,1,2000000028279,1,GBP,11.0,10.0,2.0,Website,,dbfs:/Volumes/ecommerce/source_data/ecommerce_store/order_items/landing/order_items_2025-08-01.csv,2025-11-08T06:00:07.678Z,2025-11-08T06:29:12.555Z
2025-08-01,2025-08-01T22:53:52.000Z,CUST000000241190,643611,2,2000000377445,1,GBP,5.0,4.0,1.0,Website,,dbfs:/Volumes/ecommerce/source_data/ecommerce_store/order_items/landing/order_items_2025-08-01.csv,2025-11-08T06:00:07.678Z,2025-11-08T06:29:12.555Z
2025-08-01,2025-08-01T09:44:32.000Z,CUST000000239553,643612,1,2000000417639,4,INR,4871.0,8.0,3245.0,Website,fest20,dbfs:/Volumes/ecommerce/source_data/ecommerce_store/order_items/landing/order_items_2025-08-01.csv,2025-11-08T06:00:07.678Z,2025-11-08T06:29:12.555Z
2025-08-01,2025-08-01T04:45:03.000Z,CUST000000175269,643613,1,2000000422664,2,AED,74.0,0.0,18.0,Mobile,,dbfs:/Volumes/ecommerce/source_data/ecommerce_store/order_items/landing/order_items_2025-08-01.csv,2025-11-08T06:00:07.678Z,2025-11-08T06:29:12.555Z
2025-08-01,2025-08-01T04:45:03.000Z,CUST000000175269,643613,2,2000000238159,1,AED,1012.0,11.0,109.0,Mobile,,dbfs:/Volumes/ecommerce/source_data/ecommerce_store/order_items/landing/order_items_2025-08-01.csv,2025-11-08T06:00:07.678Z,2025-11-08T06:29:12.555Z


In [0]:
# Check the final datatypes: dt, order_ts, item_seq, quantity, unit_price, discount_pct
# and tax_amount should no longer be strings, and processed_time should be present.
df.printSchema()

root
 |-- dt: date (nullable = true)
 |-- order_ts: timestamp (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- item_seq: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- unit_price_currency: string (nullable = true)
 |-- unit_price: double (nullable = true)
 |-- discount_pct: double (nullable = true)
 |-- tax_amount: double (nullable = true)
 |-- channel: string (nullable = true)
 |-- coupon_code: string (nullable = true)
 |-- file_name: string (nullable = true)
 |-- ingest_timestamp: timestamp (nullable = true)
 |-- processed_time: timestamp (nullable = false)



In [0]:
# Write the cleansed order items to the silver layer
# (catalog: catalog_name = 'ecommerce', schema: silver, table: slv_order_items).
# mode("overwrite") replaces the table's contents on every run; mergeSchema allows the
# write to add new columns to an existing table's schema.
# NOTE(review): changing the type of an existing column typically needs
# overwriteSchema=true rather than mergeSchema — confirm before altering column types.
(
    df.write.format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .saveAsTable(f"{catalog_name}.silver.slv_order_items")
)