In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# ============================================================
# 1. READ RAW FILE
# ============================================================
df_raw = (
    spark.read
        .text("<file_location>Customers.csv")
        .withColumnRenamed("value", "raw_line")
)

# ============================================================
# 2. NORMALIZE DELIMITERS → ALWAYS SEMICOLON
# ============================================================
df_norm = (
    df_raw
        .withColumn("step1", F.regexp_replace("raw_line", r"\t", ";"))
        .withColumn("step2", F.regexp_replace("step1", r"([A-Za-z]+ \d{1,2}), (\d{4})", r"\1@@\2"))
        .withColumn("step3", F.regexp_replace("step2", r",", ";"))
        .withColumn("step4", F.regexp_replace("step3", r"@@", ", "))
        .withColumn("step5", F.regexp_replace("step4", r"[^\x00-\x7F]", ""))
        .withColumn("step6", F.regexp_replace("step5", r";{2,}", ";"))
        .withColumn("clean_line", F.trim("step6"))
        .select("clean_line")
)

# ============================================================
# 3. SPLIT INTO FIELDS SAFELY
# ============================================================
df_split = df_norm.withColumn("fields", F.split("clean_line", ";"))

df_final = df_split.select(
    F.expr("get(fields, 0)").alias("customer_id"),
    F.expr("get(fields, 1)").alias("first_name"),
    F.expr("get(fields, 2)").alias("last_name"),
    F.expr("get(fields, 3)").alias("email"),
    F.expr("get(fields, 4)").alias("phone"),
    F.expr("get(fields, 5)").alias("country"),
    F.expr("get(fields, 6)").alias("created_date"),
    F.expr("get(fields, 7)").alias("status")
)

# Remove header row
df_final = df_final.filter(F.col("customer_id") != "customer_id")

# ============================================================
# 4. CLEAN + NORMALIZE ALL COLUMNS
# ============================================================
df_clean = (
    df_final
        .withColumn("email_clean", F.lower(F.trim(F.regexp_replace("email", r"[^A-Za-z0-9@._-]", ""))))
        .withColumn("phone_clean", F.regexp_replace("phone", r"[^0-9]", ""))
        .withColumn("country_clean", F.upper(F.trim("country")))
        .withColumn("status_clean", F.lower(F.trim("status")))
        .withColumn(
            "date_clean",
            F.coalesce(
                F.try_to_date("created_date", "yyyy-MM-dd"),
                F.try_to_date("created_date", "yyyy/MM/dd"),
                F.try_to_date("created_date", "dd/MM/yyyy"),
                F.try_to_date("created_date", "MM-dd-yy"),
                F.try_to_date("created_date", "MMMM dd, yyyy")
            )
        )
)

# Replace dirty created_date with clean one
df_clean = df_clean.drop("created_date").withColumnRenamed("date_clean", "created_date")

# ============================================================
# 5. SURVIVORSHIP SCORING
# ============================================================
df_scored = (
    df_clean
        .withColumn("score_email", F.when(F.col("email_clean").contains("@"), 1).otherwise(0))
        .withColumn("score_phone", F.when(F.length("phone_clean") >= 10, 1).otherwise(0))
        .withColumn("score_date", F.when(F.col("created_date").isNotNull(), 1).otherwise(0))
        .withColumn("score_status", F.when(F.col("status_clean").isin("active", "inactive"), 1).otherwise(0))
        .withColumn(
            "survivor_score",
            F.col("score_email") +
            F.col("score_phone") +
            F.col("score_date") +
            F.col("score_status")
        )
)

# ============================================================
# 6. DEDUPE → KEEP BEST ROW PER CUSTOMER
# ============================================================
w = Window.partitionBy("customer_id").orderBy(F.col("survivor_score").desc())

df_survivor = (
    df_scored
        .withColumn("rank", F.row_number().over(w))
        .filter("rank = 1")
        .drop("rank")
)

# ============================================================
# 7. RESEQUENCE CUSTOMER_ID (Option A: ALWAYS START AT C001)
# ============================================================
w2 = Window.orderBy(F.monotonically_increasing_id())

df_silver = (
    df_survivor
        .withColumn("seq", F.row_number().over(w2))
        .withColumn("customer_id", F.concat(F.lit("C"), F.lpad("seq", 3, "0")))
        .drop("seq")
)

# ============================================================
# 8. WRITE SILVER TO DELTA
# ============================================================
df_silver.write.format("delta").mode("overwrite").save(
    "<output_location>silver/customers"
)


In [0]:
%sql
SELECT * FROM workspace.default.silver_customers;


customer_id,first_name,last_name,email,phone,country,status,email_clean,phone_clean,country_clean,status_clean,created_date,date_clean
C001,xiXMfv,dJDwCL,1234567890,Canada,2026/01/26,,1234567890,,2026/01/26,,,
C002,XVywTd,uahNkT,xvywtd.uahnkt@example.com,India,2026-01-07,,xvywtd.uahnkt@example.com,,2026-01-07,,,
C003,LxhcHu,ntYxWc,invalid_email,123 456 7890,Canada,unknown,invalid_email,1234567890.0,CANADA,unknown,,
C004,ZIuxJZ,QUyhSv,invalid_email,1234567890,Canada,unknown,invalid_email,1234567890.0,CANADA,unknown,,
C005,uCoXDT,ZrLBCJ,test@example.com,+1 123 456 7890,USA,Active,test@example.com,11234567890.0,USA,active,,
C006,GvDQkh,pJDYKh,+1 123 456 7890,Canada,01-16-26,,11234567890,,01-16-26,,,
C007,YcmxLW,KhLCFq,invalid_email,UK,01-17-26,,invalid_email,,01-17-26,,,
C008,mDFmjr,eufzck,mdfmjr.eufzck@example.com,UK,2026/01/03,,mdfmjr.eufzck@example.com,,2026/01/03,,,
C009,rVJnsl,nFqBLk,123 456 7890,Canada,"1, 2",,1234567890,,"1, 2",,,
C010,uuxjzW,hGNVuK,uuxjzw.hgnvuk@example.com,123 456 7890,UK,active,uuxjzw.hgnvuk@example.com,1234567890.0,UK,active,,
