### **Import packages**

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
from delta.tables import DeltaTable

### **Define paths**

In [0]:
silver_path = "abfss://silver@stfinancedev.dfs.core.windows.net/s_loans"
gold_path = "abfss://gold@stfinancedev.dfs.core.windows.net/dim_loans"

### **Create delta table schema**

In [0]:
spark.sql(f"""
          CREATE TABLE IF NOT EXISTS finance_cata.gold.dim_loans(
            Loankey BIGINT GENERATED ALWAYS AS IDENTITY(START WITH 1 INCREMENT BY 1),
            LoanID STRING,
            CustomerID STRING,
            AccountID STRING,
            LoanType STRING,
            PrincipalAmount DECIMAL(10,2),
            InterestRate DECIMAL(10,2),
            LoanStartDateTime TIMESTAMP,
            LoanEndDateTime TIMESTAMP,
            LoanStatus STRING,
            TenureMonths INT,
            StartDate TIMESTAMP,
            EndDate TIMESTAMP,
            IsActive BOOLEAN           
          )USING DELTA 
          LOCATION "{gold_path}"
          """)

### **Load silver data**

In [0]:
silver_df = spark.read.format("delta").load(silver_path)

### **Add SCD 2 columns**

In [0]:
silver_df = silver_df.withColumn("StartDate", current_timestamp())\
                     .withColumn("EndDate", lit(None).cast("timestamp"))\
                     .withColumn("IsActive", lit(True))

In [0]:
silver_df.display()

### **Load gold data**

In [0]:
if DeltaTable.isDeltaTable(spark, gold_path):
    dim_loans = DeltaTable.forPath(spark, gold_path)
else:
    # First-time load 
    silver_df.write.format("delta").mode("append").save(gold_path)
    dim_loans = DeltaTable.forPath(spark, gold_path)

### **Delta merge & Load data in gold layer**

In [0]:
(
    dim_loans.alias("t")
    .merge(
        silver_df.alias("s"),
        "t.LoanID = s.LoanID AND t.IsActive = true"
    )
    .whenMatchedUpdate(
        condition="""
            t.LoanID <> s.LoanID OR
            t.CustomerID <> s.CustomerID OR
            t.AccountID <> s.AccountID OR
            t.LoanType <> s.LoanType OR
            t.PrincipalAmount <> s.PrincipalAmount OR
            t.InterestRate <> s.InterestRate OR
            t.LoanStartDateTime <> s.LoanStartDateTime OR
            t.LoanEndDateTime <> s.LoanEndDateTime OR
            t.LoanStatus <> s.LoanStatus OR
            t.TenureMonths <> s.TenureMonths
        """,
        set={
            "EndDate": current_timestamp(),
            "IsActive": 'False'
        }
    )
    .whenNotMatchedInsert(
        values={
            "LoanID": "s.LoanID",
            "CustomerID": "s.CustomerID",
            "AccountID": "s.AccountID",
            "LoanType": "s.LoanType",
            "PrincipalAmount": "s.PrincipalAmount",
            "InterestRate": "s.InterestRate",
            "LoanStartDateTime": "s.LoanStartDateTime",
            "LoanEndDateTime": "s.LoanEndDateTime",
            "LoanStatus": "s.LoanStatus",
            "TenureMonths": "s.TenureMonths",
            "StartDate": "s.StartDate",
            "EndDate": "s.EndDate",
            "IsActive": "s.IsActive"
        }
    )
    .execute()
)

In [0]:
%sql
select * from finance_cata.gold.dim_loans