In [0]:
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.sql.functions import *
from delta.tables import DeltaTable

In [0]:
bronze_customers_tbl = "my_catalog.bronze_retailx.customers"
silver_customers_tbl = "my_catalog.silver_retailx.customers"

In [0]:
# we usually persist watermarks in control tables, For simplicity using 'effective_from' from Silver table

# get max processed ingestion time
last_processed_ts = (
    spark.table(silver_customers_tbl)
    .select(
        max("effective_from").alias("max_ts")
    )
    .collect()[0]["max_ts"]
)

if last_processed_ts is None:
    last_processed_ts = "1900-01-01"

In [0]:
df = (
    spark.table(bronze_customers_tbl)
    .filter(col("_ingested_at") > lit(last_processed_ts))
)


In [0]:
display(df)

CUSTOMER_ID,NAME,EMAIL,CREATED_DATE,_rescued_data,_ingested_at,_source_file
5,Anil Sharama,vikram.singh@gmail.com,2025-02-15,,2025-12-14T05:58:57.340Z,/Volumes/my_catalog/raw_retailx/customers/2025/12/14/Customers%202.csv
6,Ananya Iyer,ananya.iyer@gmail.com,2025-02-18,,2025-12-14T05:58:57.340Z,/Volumes/my_catalog/raw_retailx/customers/2025/12/14/Customers%202.csv
7,Rahul Verma,rahul.verma@gmail.com,2025-03-05,,2025-12-14T05:58:57.340Z,/Volumes/my_catalog/raw_retailx/customers/2025/12/14/Customers%202.csv
8,Neha Gupta,neha.gupta@gmail.com,2025-03-12,,2025-12-14T05:58:57.340Z,/Volumes/my_catalog/raw_retailx/customers/2025/12/14/Customers%202.csv
9,Suresh Naidu,suresh.naidu@gmail.com,2025-03-20,,2025-12-14T05:58:57.340Z,/Volumes/my_catalog/raw_retailx/customers/2025/12/14/Customers%202.csv
10,Kavya Rao,kavya.rao@gmail.com,2025-03-28,,2025-12-14T05:58:57.340Z,/Volumes/my_catalog/raw_retailx/customers/2025/12/14/Customers%202.csv


In [0]:
df = df.drop("_rescued_data", "_source_file", "_ingested_at")
display(df)

CUSTOMER_ID,NAME,EMAIL,CREATED_DATE
5,Anil Sharama,vikram.singh@gmail.com,2025-02-15
6,Ananya Iyer,ananya.iyer@gmail.com,2025-02-18
7,Rahul Verma,rahul.verma@gmail.com,2025-03-05
8,Neha Gupta,neha.gupta@gmail.com,2025-03-12
9,Suresh Naidu,suresh.naidu@gmail.com,2025-03-20
10,Kavya Rao,kavya.rao@gmail.com,2025-03-28


In [0]:
df = (
    df
    .withColumn("customer_id", col("CUSTOMER_ID").cast("bigint"))
    .withColumn("name", col("NAME").cast("string"))
    .withColumn("email", col("EMAIL").cast("string"))
    .withColumn("created_date", to_timestamp(col("CREATED_DATE")))
)

In [0]:
df = (
    df
    .filter(
    col("CUSTOMER_ID").isNotNull() &
    col("NAME").isNotNull() &
    col("EMAIL").rlike("^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\\.[A-Za-z]{2,}$")
    )
)
display(df)

customer_id,name,email,created_date
5,Anil Sharama,vikram.singh@gmail.com,2025-02-15T00:00:00.000Z
6,Ananya Iyer,ananya.iyer@gmail.com,2025-02-18T00:00:00.000Z
7,Rahul Verma,rahul.verma@gmail.com,2025-03-05T00:00:00.000Z
8,Neha Gupta,neha.gupta@gmail.com,2025-03-12T00:00:00.000Z
9,Suresh Naidu,suresh.naidu@gmail.com,2025-03-20T00:00:00.000Z
10,Kavya Rao,kavya.rao@gmail.com,2025-03-28T00:00:00.000Z


In [0]:
df = (
    df
    .withColumn("email", lower(trim("EMAIL")))
    .withColumn("name", initcap(trim("NAME")))
)

In [0]:
df = (
    df
    .withColumn(
        "rn",
        row_number().over(
            Window.partitionBy("customer_id")
            .orderBy(col("customer_id").asc())
        )
    )
    .filter("rn = 1")
    .drop("rn")
)


In [0]:
# To detect changes efficiently and avoid column-by-column comparisons
final_df = (
    df
    .withColumn(
        "record_hash",
        sha2(
            concat_ws("||", "name", "email", "created_date"),
            256
        )
    )
    .withColumn("effective_from", current_timestamp())
    .withColumn("effective_to", lit("9999-12-31").cast("timestamp"))
    .withColumn("is_current", lit(True))
)
display(final_df)

customer_id,name,email,created_date,record_hash,effective_from,effective_to,is_current
5,Anil Sharama,vikram.singh@gmail.com,2025-02-15T00:00:00.000Z,34e3a84e3bdc59c00ae5a300b7230a4c82b530e9a0bccc0f749eb91609c99720,2025-12-14T06:02:33.171Z,9999-12-31T00:00:00.000Z,True
6,Ananya Iyer,ananya.iyer@gmail.com,2025-02-18T00:00:00.000Z,6a43ce442ceb3fc5f3b554ee962f33281a577dff4e9b479120b9113c615c3bea,2025-12-14T06:02:33.171Z,9999-12-31T00:00:00.000Z,True
7,Rahul Verma,rahul.verma@gmail.com,2025-03-05T00:00:00.000Z,dd2db64a514dbbd83f9497ddb8d38574c6699519afc548f5804466e232690b9e,2025-12-14T06:02:33.171Z,9999-12-31T00:00:00.000Z,True
8,Neha Gupta,neha.gupta@gmail.com,2025-03-12T00:00:00.000Z,0adccb1185054be5564d3e53b7bb88c1fef3db31327ee6e6bea166ef18be3065,2025-12-14T06:02:33.171Z,9999-12-31T00:00:00.000Z,True
9,Suresh Naidu,suresh.naidu@gmail.com,2025-03-20T00:00:00.000Z,df0dfa777c39919511f69be2246467f9402633f96a3221f3024b8aaa2f451f18,2025-12-14T06:02:33.171Z,9999-12-31T00:00:00.000Z,True
10,Kavya Rao,kavya.rao@gmail.com,2025-03-28T00:00:00.000Z,ec5463bfaadb23d10dacbe5843b168317bd5c2a0a3d4bd49f36d425f4d910323,2025-12-14T06:02:33.171Z,9999-12-31T00:00:00.000Z,True


In [0]:
final_df.createOrReplaceTempView("temp_customers")

In [0]:
%sql

MERGE INTO my_catalog.silver_retailx.customers tgt
USING temp_customers src
ON tgt.customer_id = src.customer_id
AND tgt.is_current = true

-- Expire old record if data changed
WHEN MATCHED AND tgt.record_hash <> src.record_hash THEN
  UPDATE SET
    tgt.is_current = false,
    tgt.effective_to = current_timestamp()

-- Insert new record (new customer OR changed customer)
WHEN NOT MATCHED THEN
  INSERT (
    customer_id,
    name,
    email,
    created_date,
    effective_from,
    effective_to,
    is_current,
    record_hash
  )
  VALUES (
    src.customer_id,
    src.name,
    src.email,
    src.created_date,
    src.effective_from,
    src.effective_to,
    src.is_current,
    src.record_hash
  );


num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
6,1,0,5


In [0]:
df = spark.table("my_catalog.silver_retailx.customers")
display(df)

customer_id,name,email,created_date,effective_from,effective_to,is_current,record_hash
5,Vikram Singh,vikram.singh@gmail.com,2025-02-10,2025-12-14T05:43:53.215Z,2025-12-14T06:02:34.523Z,False,74de00213a0fa88933b0327b50663b39d7732e2fd96850aa28d1d033deb577f2
6,Ananya Iyer,ananya.iyer@gmail.com,2025-02-18,2025-12-14T06:02:34.523Z,9999-12-31T00:00:00.000Z,True,6a43ce442ceb3fc5f3b554ee962f33281a577dff4e9b479120b9113c615c3bea
7,Rahul Verma,rahul.verma@gmail.com,2025-03-05,2025-12-14T06:02:34.523Z,9999-12-31T00:00:00.000Z,True,dd2db64a514dbbd83f9497ddb8d38574c6699519afc548f5804466e232690b9e
8,Neha Gupta,neha.gupta@gmail.com,2025-03-12,2025-12-14T06:02:34.523Z,9999-12-31T00:00:00.000Z,True,0adccb1185054be5564d3e53b7bb88c1fef3db31327ee6e6bea166ef18be3065
9,Suresh Naidu,suresh.naidu@gmail.com,2025-03-20,2025-12-14T06:02:34.523Z,9999-12-31T00:00:00.000Z,True,df0dfa777c39919511f69be2246467f9402633f96a3221f3024b8aaa2f451f18
10,Kavya Rao,kavya.rao@gmail.com,2025-03-28,2025-12-14T06:02:34.523Z,9999-12-31T00:00:00.000Z,True,ec5463bfaadb23d10dacbe5843b168317bd5c2a0a3d4bd49f36d425f4d910323
1,Ravi Kumar,ravi.kumar@gmail.com,2025-01-05,2025-12-14T05:43:53.215Z,9999-12-31T00:00:00.000Z,True,233aff011eef2f0eefc9e4bab4c51e9ab7620a53ad6f9b919d6d32c970b41802
2,Sneha Reddy,sneha.reddy@gmail.com,2025-01-10,2025-12-14T05:43:53.215Z,9999-12-31T00:00:00.000Z,True,f43d5fe78399601a8c70448736ff20d70a404a261dfd0a7fe69aa3cd4c933ae8
3,Arjun Mehta,arjun.mehta@gmail.com,2025-01-15,2025-12-14T05:43:53.215Z,9999-12-31T00:00:00.000Z,True,b593d6cdcc21736cd145f24b9aca3e0e9739aff50a78e12cf2b78afb3917d23f
4,Priya Sharma,priya.sharma@gmail.com,2025-02-01,2025-12-14T05:43:53.215Z,9999-12-31T00:00:00.000Z,True,43c76d1c4235a7ecb3d8f543dd11f2ffd71c1fd1d51c6a9e8fa59d1e066d858d


In [0]:
%sql

SELECT
  COUNT(*) AS total_rows,
  COUNT(DISTINCT customer_id) AS distinct_customers,
  SUM(CASE WHEN is_current THEN 1 ELSE 0 END) AS active_records
FROM my_catalog.silver_retailx.customers;

total_rows,distinct_customers,active_records
10,10,9


In [0]:
print('Done')

Done
