In [0]:
# Load Kafka messages (CDC data) into the Bronze layer
bronze_df = spark.read.json("/FileStore/tables/kafka_messages_checking_account.json")  # Adjust path as per your DBFS setup

# Save the raw CDC data to Bronze layer
bronze_df.write.format("delta").mode("overwrite").save("/mnt/bronze/checking_account_cdc")


In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number

# Load the raw CDC data from the Bronze layer
bronze_df = spark.read.format("delta").load("/mnt/bronze/checking_account_cdc")

# Flatten the data to extract relevant fields (handling 'after' for create/update events)
flattened_df = bronze_df.select(
    "payload.after.customer_id",
    "payload.after.scheduled_payment",
    "payload.after.txn_amount",
    "payload.after.debit_or_credit",
    "payload.after.updt_ts",
    "payload.after.initial_balance"
)

# Deduplicate based on customer_id and updt_ts (retain latest records)
window_spec = Window.partitionBy("customer_id", "updt_ts").orderBy(col("updt_ts").desc())
deduplicated_df = flattened_df.withColumn("row_num", row_number().over(window_spec)).filter(col("row_num") == 1).drop("row_num").dropna()

# Save the deduplicated data to Silver layer
deduplicated_df.write.format("delta").mode("overwrite").save("/mnt/silver/checking_account_consolidated")


In [0]:
display(deduplicated_df)

customer_id,scheduled_payment,txn_amount,debit_or_credit,updt_ts,initial_balance
1,901,817,1,2021-10-31 02:37:40,33873
2,8836,3642,0,2021-01-20 06:50:33,41092
2,4242,7806,0,2021-05-06 10:46:17,21093
2,4068,6796,0,2022-03-17 13:15:31,18719
2,6766,8790,1,2022-07-07 04:11:23,7163
3,159,1898,1,2021-01-07 17:03:03,5189
5,816,659,0,2020-04-03 12:22:52,4481
6,1543,6141,1,2020-02-01 08:32:51,30243
6,8641,2968,1,2021-03-30 17:05:39,48237
6,2587,2981,0,2021-11-23 14:31:54,33987


In [0]:
from pyspark.sql.functions import col, sum, avg, when

# Load the consolidated data from Silver layer
silver_df = spark.read.format("delta").load("/mnt/silver/checking_account_consolidated")

# Create transaction summary per customer
transaction_summary = silver_df.groupBy("customer_id").agg(
    sum("txn_amount").alias("total_transactions"),
    avg("txn_amount").alias("average_transaction"),
    sum(when(col("debit_or_credit") == 0, col("txn_amount")).otherwise(0)).alias("total_debits"),
    sum(when(col("debit_or_credit") == 1, col("txn_amount")).otherwise(0)).alias("total_credits")
)

# Save the transaction summary to Gold layer with overwriteSchema option
transaction_summary.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save("/mnt/gold/transaction_summary")


In [0]:
display(transaction_summary)

customer_id,total_transactions,average_transaction,total_debits,total_credits
26,2766,2766.0,2766,0
29,12503,4167.666666666667,0,12503
474,12570,4190.0,8304,4266
65,11350,5675.0,11350,0
191,23358,5839.5,17674,5684
418,17135,4283.75,10273,6862
222,19310,6436.666666666667,5408,13902
270,9640,4820.0,5734,3906
293,9712,4856.0,0,9712
243,26723,3817.571428571429,10772,15951


In [0]:
# Create a Gold table for customer balance over time
customer_balance = silver_df.groupBy("customer_id", "updt_ts").agg(
    last("initial_balance").alias("latest_balance")
)

# Save the customer balance data to Gold layer
customer_balance.write.format("delta").mode("overwrite").save("/mnt/gold/customer_balance")

display(customer_balance)

customer_id,updt_ts,latest_balance
24,2020-03-29 14:58:04,45898
118,2022-03-05 05:04:34,15145
131,2020-08-12 20:40:56,38154
150,2021-02-07 06:09:52,28025
152,2020-02-03 02:04:44,7843
164,2021-10-17 15:02:42,49756
166,2021-02-25 15:52:30,35460
472,2020-02-15 23:08:06,10609
485,2022-04-05 18:21:15,39123
66,2021-04-11 10:14:12,31512


In [0]:
# Create a Gold table for scheduled payments summary
scheduled_payment_summary = silver_df.groupBy("customer_id").agg(
    sum("scheduled_payment").alias("total_scheduled_payments")
)

# Save the scheduled payment summary to Gold layer
scheduled_payment_summary.write.format("delta").mode("overwrite").save("/mnt/gold/scheduled_payment_summary")
display(scheduled_payment_summary)

customer_id,total_scheduled_payments
26,5568
29,11080
474,8178
65,11422
191,23250
418,22951
222,13582
270,10968
293,8144
243,39825
