In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window


In [0]:
bronze_path_customers = "<location>"

# Bronze customers table (Delta format), read as-is; all cleansing happens in later cells.
df_bronze_customers = spark.read.load(bronze_path_customers, format="delta")


In [0]:
# Identity of this feed and the column that uniquely identifies a customer.
source_system, business_key = "customers", "customer_id"

# This is a Column expression, not a concrete value: Spark evaluates it when the query
# that uses it runs, and all current_timestamp() calls within one query share one value.
load_timestamp = F.current_timestamp()


In [0]:
# Target Spark SQL type for every contracted silver column, in the order the cast cell
# applies them. Columns not listed here pass through with their bronze type.
schema_contract = dict(
    customer_id="int",
    first_name="string",
    last_name="string",
    email="string",
    phone="string",
    city="string",
    state="string",
    signup_date="date",
)


In [0]:
# Accepted signup_date layouts, tried left to right. The first layout that parses wins;
# a value matching none of them becomes NULL (try_to_date returns NULL instead of failing).
# NOTE(review): month-first layouts come before day-first ones, so an ambiguous value such
# as "03/04/2024" resolves to March 4; day-first only applies when month-first fails
# (e.g. a day above 12). Confirm this matches the upstream source systems.
signup_date_formats = ("yyyy-MM-dd", "MM/dd/yyyy", "M/d/yyyy", "dd/MM/yyyy", "d/M/yyyy")


def _trimmed_rule(case_fn=None):
    """Build a rule that trims whitespace from column `c`, then applies `case_fn` if given."""
    def rule(c):
        cleaned = F.trim(F.col(c))
        return cleaned if case_fn is None else case_fn(cleaned)
    return rule


def _date_rule(c):
    """Parse column `c` with each layout in `signup_date_formats`; keep the first non-NULL."""
    return F.coalesce(*[F.try_to_date(F.col(c), fmt) for fmt in signup_date_formats])


# Column-level standardisation: column name -> function(column name) -> Column expression.
# NOTE(review): initcap also lowercases the rest of each word ("McDonald" -> "Mcdonald").
mapping_rules = {
    "first_name":  _trimmed_rule(F.initcap),
    "last_name":   _trimmed_rule(F.initcap),
    "email":       _trimmed_rule(F.lower),
    "phone":       _trimmed_rule(),
    "city":        _trimmed_rule(F.initcap),
    "state":       _trimmed_rule(F.upper),
    "signup_date": _date_rule,
}



In [0]:
# Apply every standardisation rule in a single projection. Calling withColumn once per
# column in a loop adds one projection per call, which the PySpark docs warn can bloat
# the query plan. withColumns does the same work in one step. Each rule reads only its
# own column, so the result is identical to applying the rules one after another.
df_transformed = df_bronze_customers.withColumns(
    {col_name: rule(col_name) for col_name, rule in mapping_rules.items()}
)


In [0]:
# Enforce the schema contract: cast every contracted column to its declared type in a
# single projection, instead of one withColumn projection per column. Columns outside the
# contract (e.g. the raw_* lineage columns) pass through unchanged.
# NOTE(review): a value that cannot be cast becomes NULL when ANSI mode is off but raises
# when ANSI mode is on. Confirm which mode this pipeline runs under.
df_transformed = df_transformed.withColumns(
    {
        col_name: F.col(col_name).cast(data_type)
        for col_name, data_type in schema_contract.items()
    }
)


In [0]:
# Keep one row per business key: the most recently ingested version wins.
# raw_file_path is a secondary sort key. Rows sharing the same ingestion timestamp
# (e.g. several versions of one customer landing in the same load) then resolve the
# same way on every run, instead of row_number picking an arbitrary winner.
# NOTE(review): rows that tie on both columns (duplicates within one file) are still
# resolved arbitrarily; add a source-order column upstream if that matters.
window_spec = (
    Window
    .partitionBy(business_key)
    .orderBy(F.col("raw_ingestion_timestamp").desc(), F.col("raw_file_path").desc())
)

# NULL business keys (e.g. missing in bronze, or nulled by a failed non-ANSI cast) would
# all fall into one NULL partition and collapse into a single arbitrary survivor, so
# they are excluded from silver instead.
df_deduped = (
    df_transformed
    .filter(F.col(business_key).isNotNull())
    .withColumn("rn", F.row_number().over(window_spec))
    .filter(F.col("rn") == 1)
    .drop("rn")
)


In [0]:
# Stamp silver lineage: when this load ran and which source system the rows came from.
df_silver_customers = df_deduped.withColumns({
    "silver_load_timestamp": load_timestamp,
    "silver_source_system": F.lit(source_system),
})


In [0]:
silver_path_customers = "<location>"

# Full refresh: overwrite mode replaces the silver table's current contents on every run.
(
    df_silver_customers.write
    .format("delta")
    .mode("overwrite")
    .save(silver_path_customers)
)


In [0]:
spark.read.load(silver_path_customers, format="delta").show()  # read back what was just written

+-----------+----------+---------+--------------------+--------------------+---------------+-----+-----------+--------------------+-----------------------+---------------------+--------------------+
|customer_id|first_name|last_name|               email|               phone|           city|state|signup_date|       raw_file_path|raw_ingestion_timestamp|silver_load_timestamp|silver_source_system|
+-----------+----------+---------+--------------------+--------------------+---------------+-----+-----------+--------------------+-----------------------+---------------------+--------------------+
|          1|    Andrew|    Smith|                NULL|          5486653565|   Nicholasland|   MP| 2024-10-04|abfss://chatgptpr...|   2026-02-04 23:47:...| 2026-02-05 00:04:...|           customers|
|          2|      Ruth|    Walsh|  mary50@example.net|001-898-507-5175x609|   Port Antonio|   IA| 2023-03-14|abfss://chatgptpr...|   2026-02-04 23:47:...| 2026-02-05 00:04:...|           customers|
|    