In [0]:
                                                                                                                                                                                                                                                                                                                                                                                                                                        from pyspark.sql import SparkSession
from pyspark.sql.functions import col, round, countDistinct, avg, sum as _sum, desc
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

In [0]:
# --- Pipeline locations -------------------------------------------------------
# Target table identifier (not referenced by the cells visible in this notebook).
gold_table = "uc_test.demo.gold_test_data"

# Output prefix for the gold-layer datasets. NOTE: there is no trailing slash,
# so any code appending a dataset name must supply its own "/" separator.
gold_path = "s3://dbr-tasks/gold/test_data"

# Delta location of the cleaned silver-layer input.
silver_path = "s3://dbr-tasks/silver/test_data_cleaned"

In [0]:
# Read the silver-layer Delta dataset that every aggregation below starts from.
df = spark.read.load(silver_path, format="delta")

In [0]:
# 1) Latest record per customer (by month_date)
# Rank each customer's rows newest-first and keep only the top-ranked row.
# NOTE(review): row_number() breaks ties on month_date arbitrarily — this is only
# deterministic if silver holds at most one row per customer per month; confirm.
w = Window.partitionBy("Customer_ID").orderBy(col("month_date").desc())
df_latest = (
    df.withColumn("_rn", row_number().over(w))
    .filter(col("_rn") == 1)
    .drop("_rn")
    .select(
        "Customer_ID", "Name", "Age", "Occupation", "Annual_Income", "Monthly_Inhand_Salary",
        "Num_Bank_Accounts", "Num_Credit_Card", "Interest_Rate", "loan_count", "Outstanding_Debt",
        "Credit_Utilization_Ratio", "credit_history_total_months", "Payment_Behaviour",
        "credit_history_years",
    )
)

# write customer_latest
# gold_path has no trailing slash, so plain concatenation produced
# ".../gold/test_datacustomer_latest" (a sibling of the gold prefix, not a child).
# Join with an explicit separator; rstrip keeps this correct if a slash is ever added.
cust_latest_path = f"{gold_path.rstrip('/')}/customer_latest"
df_latest.write.mode("overwrite").format("delta").save(cust_latest_path)

# Re-register the table so it points at the freshly written location.
spark.sql("DROP TABLE IF EXISTS uc_test.demo.gold_customer_latest")
spark.sql(f"CREATE TABLE uc_test.demo.gold_customer_latest USING DELTA LOCATION '{cust_latest_path}'")


DataFrame[]

In [0]:
# Preview the first ten rows of the latest-record-per-customer frame.
latest_preview = df_latest.limit(10)
display(latest_preview)

Customer_ID,Name,Age,Occupation,Annual_Income,Monthly_Inhand_Salary,Num_Bank_Accounts,Num_Credit_Card,Interest_Rate,loan_count,Outstanding_Debt,Credit_Utilization_Ratio,credit_history_total_months,Payment_Behaviour,credit_history_years
CUS_0x1000,Alistair Barrf,18,lawyer,30625.94,2706.1616666666664,6,5,27,2.0,1562.91,27.020942138647204,133.0,high spent small value payments,11.0
CUS_0x1009,Arunah,26,mechanic,52312.68,4250.39,6,5,17,3.0,202.68,39.38209872972783,376.0,high spent medium value payments,31.0
CUS_0x100b,Shirboni,19,,113781.38999999998,,1,4,1,,1030.2,41.88767932,194.0,high spent large value payments,16.0
CUS_0x1011,Schneyerh,44,doctor,58918.47,5208.8725,3,3,17,3.0,473.14,26.560936818707447,194.0,high spent medium value payments,16.0
CUS_0x1013,Cameront,44,mechanic,98620.98,7962.415000000001,3,3,6,3.0,1233.51,39.74657249590416,218.0,!@9#%8,18.0
CUS_0x1015,Holtono,28,journalist,46951.02,3725.585,7,4,16,,340.22,30.356054833296103,261.0,high spent medium value payments,21.0
CUS_0x1018,Felsenthalq,16,accountant,61194.81,5014.5675,7,7,840,7.0,2773.09,25.340918865493265,175.0,high spent small value payments,14.0
CUS_0x1026,Josephv,52,manager,170614.28,14463.856666666668,2,1320,9,2.0,849.69,39.14976364459192,,high spent medium value payments,
CUS_0x102d,Neil Chatterjeex,31,entrepreneur,89064.52,7256.043333333334,5,3,1,1.0,648.36,31.828242267790905,367.0,high spent large value payments,30.0
CUS_0x102e,Rhysn,26,scientist,50807.44,4197.953333333334,8,4,11,3.0,869.59,37.82838175144648,279.0,high spent small value payments,23.0


In [0]:
from pyspark.sql.functions import when, col, countDistinct, round, avg, sum as _sum

# 2) Monthly metrics: one row per (Month, month_date) with customer count,
#    average debt / utilization / income, and the share of delinquent customers.
monthly = df.groupBy("Month", "month_date").agg(
    countDistinct("Customer_ID").alias("num_customers"),
    round(avg("Outstanding_Debt"), 2).alias("avg_outstanding_debt"),
    round(avg("Credit_Utilization_Ratio"), 2).alias("avg_credit_utilization"),
    round(avg("Annual_Income"), 2).alias("avg_annual_income"),
    (
        # Count distinct delinquent customers (not rows) so numerator and
        # denominator use the same unit and the rate can never exceed 100%.
        # when() without otherwise() yields NULL for non-delinquent rows, and
        # countDistinct ignores NULLs. Identical to the row-based sum whenever
        # there is exactly one row per customer per month.
        100.0
        * countDistinct(when(col("Num_of_Delayed_Payment") > 0, col("Customer_ID")))
        / countDistinct("Customer_ID")
    ).alias("delinquency_rate_pct")
).orderBy("month_date")

# gold_path has no trailing slash, so plain concatenation produced
# ".../gold/test_datamonthly_metrics"; join with an explicit separator instead.
monthly_path = f"{gold_path.rstrip('/')}/monthly_metrics"
monthly.write.mode("overwrite").format("delta").save(monthly_path)

# Re-register the table so it points at the freshly written location.
spark.sql("DROP TABLE IF EXISTS uc_test.demo.gold_monthly_metrics")
spark.sql(f"CREATE TABLE uc_test.demo.gold_monthly_metrics USING DELTA LOCATION '{monthly_path}'")

DataFrame[]

In [0]:
# Preview the first five rows of the monthly metrics frame.
monthly_preview = monthly.limit(5)
display(monthly_preview)

Month,month_date,num_customers,avg_outstanding_debt,avg_credit_utilization,avg_annual_income,delinquency_rate_pct
September,2025-09-01,12500,1426.22,32.23,153943.58,87.992
October,2025-10-01,12500,1426.22,32.33,162200.49,87.608
November,2025-11-01,12500,1426.22,32.26,165527.79,88.576
December,2025-12-01,12500,1426.22,32.29,183664.96,87.952


In [0]:
# 3) Occupation aggregated metrics
#    (only Occupation is grouped here; no Payment_Behaviour breakdown is computed)
# Rows with a NULL Occupation form their own group, which shows up as the blank
# Occupation row in the displayed result.
occupation_metrics = (
    df.groupBy("Occupation")
    .agg(
        countDistinct("Customer_ID").alias("customers"),
        round(avg("Outstanding_Debt"), 2).alias("avg_outstanding_debt"),
        round(avg("Credit_Utilization_Ratio"), 2).alias("avg_credit_utilization"),
    )
    .orderBy(desc("customers"))
)

# gold_path has no trailing slash, so plain concatenation produced
# ".../gold/test_dataoccupation_metrics"; join with an explicit separator instead.
occ_path = f"{gold_path.rstrip('/')}/occupation_metrics"
occupation_metrics.write.mode("overwrite").format("delta").save(occ_path)

# Re-register the table so it points at the freshly written location.
spark.sql("DROP TABLE IF EXISTS uc_test.demo.gold_occupation_metrics")
spark.sql(f"CREATE TABLE uc_test.demo.gold_occupation_metrics USING DELTA LOCATION '{occ_path}'")


DataFrame[]

In [0]:
# Preview the five occupation groups with the most customers.
occupation_preview = occupation_metrics.limit(5)
display(occupation_preview)

Occupation,customers,avg_outstanding_debt,avg_credit_utilization
,3101,1455.38,32.31
lawyer,887,1352.06,32.22
engineer,858,1444.23,32.3
architect,853,1335.06,32.3
mechanic,847,1488.22,32.37


In [0]:
# 4) Top customers by outstanding debt (top 50)
# Built from df_latest, i.e. each customer's most recent record.
# Customer_ID is a secondary sort key so the 50-row cut is deterministic when
# several customers share the same Outstanding_Debt.
top_customers = (
    df_latest.select(
        "Customer_ID", "Name", "Outstanding_Debt", "Credit_Utilization_Ratio", "Annual_Income"
    )
    .orderBy(desc("Outstanding_Debt"), "Customer_ID")
    .limit(50)
)

# gold_path has no trailing slash, so plain concatenation produced
# ".../gold/test_datatop_customers"; join with an explicit separator instead.
top_path = f"{gold_path.rstrip('/')}/top_customers"
top_customers.write.mode("overwrite").format("delta").save(top_path)

# Re-register the table so it points at the freshly written location.
spark.sql("DROP TABLE IF EXISTS uc_test.demo.gold_top_customers")
spark.sql(f"CREATE TABLE uc_test.demo.gold_top_customers USING DELTA LOCATION '{top_path}'")

DataFrame[]