### **Reading raw ingested data from the Bronze layer**

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

In [0]:
# Raw customer records landed in the Bronze container, stored as Parquet.
df_customer = (
    spark.read
    .format("parquet")
    .load("abfss://bronze@stretailenvdev.dfs.core.windows.net/f_Customers")
)


In [0]:
# Preview up to 10 raw Bronze rows rather than rendering the whole table:
# the output is large and carries customer PII (names, email, phone, address).
df_customer.limit(10).display()

### **Incremental loading into the Silver layer**

In [0]:


from delta.tables import DeltaTable

silver_path = "abfss://silver@stretailenvdev.dfs.core.windows.net/s_Customers"

# High-water mark for incremental loads: the latest RegistrationDate already in
# Silver. It stays None when no Delta table exists at the Silver path yet (or the
# column has no non-null values), and the next cell then does a full load.
# Note: `max` here is pyspark.sql.functions.max (from the star import), not the builtin.
last_max_ts = None
if DeltaTable.isDeltaTable(spark, silver_path):
    last_max_ts = (
        spark.read.format("delta")
        .load(silver_path)
        .agg(max("RegistrationDate").alias("max_ts"))
        .first()["max_ts"]
    )


In [0]:
# Select the Bronze rows to process in this run.
# `>=` (not `>`) on the high-water mark: rows sharing the max RegistrationDate that
# reach Bronze after the previous run would otherwise be skipped forever. Re-reading
# rows already in Silver is harmless because the Silver MERGE upserts on CustomerID.
# NOTE(review): rows with a NULL RegistrationDate never pass this comparison, so
# they are only picked up on the initial full load.
if last_max_ts is not None:
    df_inc = df_customer.filter(col("RegistrationDate") >= lit(last_max_ts))
else:
    df_inc = df_customer  # no watermark yet -> full load


### **Data Transformation**

In [0]:
# Inspect column names/types of the batch before cleaning.
df_inc.printSchema()

**Data Cleaning**

**Handling Duplicates**

In [0]:
# Keep a single row per CustomerID (the one with the latest RegistrationDate), so
# the Silver MERGE never sees two source rows for the same key.
# The spec is named `dedup_window`, not `window`, so it does not shadow
# pyspark.sql.functions.window brought in by the star import.
# NOTE(review): rows tied on RegistrationDate are picked arbitrarily; add a
# deterministic tie-breaker column if Bronze provides one.
dedup_window = Window.partitionBy("CustomerID").orderBy(col("RegistrationDate").desc())

df_inc = (
    df_inc
    .withColumn("rn", row_number().over(dedup_window))
    .filter(col("rn") == 1)
    .drop("rn")
)


**Handling Nulls**

In [0]:
# CustomerID is the business key for the MERGE; rows without it cannot be upserted.
df_inc = df_inc.where(col("CustomerID").isNotNull())

In [0]:


# Replace NULLs in descriptive columns with explicit placeholders.
# DateOfBirth is left NULL when missing; there is no meaningful default for it.
# NOTE(review): defaulting a missing RegistrationDate to the run date also feeds the
# incremental watermark (max RegistrationDate in Silver). That can push the watermark
# forward so later Bronze rows with earlier dates are filtered out. Confirm this
# default is intended.
df_cust = (
    df_inc
    .fillna({
        "FirstName": "Unknown",
        "LastName": "Unknown",
        "email": "N/A",
        "Phone": "N/A",
        "Address": "Unknown",
        "City": "Unknown",
        "State": "Unknown",
        "Pincode": 0,
    })
    .withColumn("RegistrationDate", coalesce("RegistrationDate", current_date()))
)


**Replace line breaks (`\r`, `\n`) in `Address` with a single space so values do not span multiple lines.**

In [0]:
# Preview up to 10 rows before the Address line-break cleanup.
df_cust.limit(10).display()

In [0]:


# Collapse every run of CR/LF characters in Address into one space,
# so an address value never spans multiple lines.
df_cust = df_cust.withColumn("Address", regexp_replace(col("Address"), r'[\r\n]+', ' '))


In [0]:
# Preview up to 10 rows after the Address line-break cleanup.
df_cust.limit(10).display()

**Standardizing Schema**

In [0]:
# Normalize types/formats of the batch that is written to Silver:
#   - RegistrationDate -> timestamp
#   - email            -> lower case
#   - CustomerID       -> string (the MERGE business key)
silver_ready = (
    df_cust
    .withColumn("RegistrationDate", to_timestamp(col("RegistrationDate")))
    .withColumn("email", lower(col("email")))
    .withColumn("CustomerID", col("CustomerID").cast("string"))
)


### **Implementing Upsert (MERGE) logic in the Silver Delta table**

In [0]:
if not DeltaTable.isDeltaTable(spark, silver_path):
    # First run: no Delta table at the Silver path yet, so create it from this batch.
    silver_ready.write.format("delta").mode("overwrite").save(silver_path)
else:
    # Upsert keyed on CustomerID: matched rows get all columns updated from the
    # batch, unmatched rows are inserted.
    silver_tbl = DeltaTable.forPath(spark, silver_path)
    (silver_tbl.alias("tgt")
        .merge(silver_ready.alias("src"), "tgt.CustomerID = src.CustomerID")  # Business key
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())
