In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
# Read the cleaned accounts dataset from the silver area of the bucket.
# The CSV files carry a header row; column types are inferred by Spark.
ACCOUNTS_SILVER_PATH = "s3://end-to-end-banking-data-pipeline/silver/cleaned_accounts"

accounts_df = (
    spark.read.format("csv")
    .option("header", True)
    .option("inferSchema", True)
    .load(ACCOUNTS_SILVER_PATH)
)

# Render the loaded accounts for a quick visual check.
accounts_df.display()

account_id,customer_id,account_open_date,balance,status,status_flag
ACC00000671,CUST002602,2025-12-13,850680.26,BLOCKED,N
ACC00001510,CUST008444,2018-01-29,634323.35,DORMANT,N
ACC00002822,CUST002438,2005-02-25,893248.19,CLOSED,N
ACC00002983,CUST004708,2015-12-03,202731.03,DORMANT,N
ACC00003107,CUST007951,2016-06-24,704485.26,CLOSED,N
ACC00008791,CUST001279,2019-03-04,347187.43,BLOCKED,N
ACC00009053,CUST008051,2020-08-30,163991.3,BLOCKED,N
ACC00009810,CUST005550,2018-10-11,199514.68,DORMANT,N
ACC00002274,CUST002350,2008-09-05,894025.64,BLOCKED,N
ACC00003463,CUST007510,2012-04-03,8000.78,CLOSED,N


In [0]:
# Read the cleaned transactions dataset from the silver area of the bucket.
# The CSV files carry a header row; column types are inferred by Spark.
TRANSACTIONS_SILVER_PATH = "s3://end-to-end-banking-data-pipeline/silver/cleaned_transactions"

transactions_df = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load(TRANSACTIONS_SILVER_PATH)
)

# Render the loaded transactions for a quick visual check.
display(transactions_df)

txn_id,account_id,txn_date,txn_type,amount,channel,date
TXN00000172,ACC00006104,2023-06-21T19:21:34.000Z,WITHDRAWAL,98965.19,ONLINE,2023-06-21
TXN00000468,ACC00000671,2023-01-11T19:54:12.000Z,PAYMENT,26205.34,POS,2023-01-11
TXN00000991,ACC00006443,2023-11-15T02:57:07.000Z,PAYMENT,92479.86,MOBILE,2023-11-15
TXN00001205,ACC00005445,2023-04-04T19:26:15.000Z,TRANSFER,62871.18,BRANCH,2023-04-04
TXN00001626,ACC00009428,2023-12-29T18:25:21.000Z,PAYMENT,62172.62,ATM,2023-12-29
TXN00004575,ACC00002269,2023-01-03T10:39:34.000Z,PAYMENT,35431.7,ONLINE,2023-01-03
TXN00005273,ACC00000455,2023-11-11T06:04:08.000Z,WITHDRAWAL,54419.66,ATM,2023-11-11
TXN00006761,ACC00005699,2023-06-19T01:45:22.000Z,TRANSFER,25597.56,ATM,2023-06-19
TXN00007762,ACC00002330,2023-06-30T20:42:44.000Z,WITHDRAWAL,27341.34,MOBILE,2023-06-30
TXN00007839,ACC00000026,2023-08-20T01:25:21.000Z,DEPOSIT,41769.25,MOBILE,2023-08-20


In [0]:
# 4. Identify dormant accounts (no transactions in the last 12 months).
#
# The look-back window is measured in calendar months via add_months(), so it
# really is "12 months" even when the span contains a leap day (a fixed
# 365-day window comes up one day short in that case).
DORMANCY_MONTHS = 12
dormancy_cutoff = add_months(current_date(), -DORMANCY_MONTHS)

# Account ids with at least one transaction on or after the cutoff date.
# txn_date is reduced to a date first so the comparison is date-vs-date,
# which mirrors how datediff() treated the column.
# NOTE(review): future-dated transactions also count as "recent" here, exactly
# as they did with the datediff() filter -- confirm that is intended.
recent_txn_df = (
    transactions_df
    .filter(to_date(col("txn_date")) >= dormancy_cutoff)
    .select("account_id")
    .distinct()
)

# left_anti keeps only the accounts whose account_id has no match among the
# recently active account ids.
dormant_accounts_df = accounts_df.join(
    recent_txn_df,
    on="account_id",
    how="left_anti",
)
display(dormant_accounts_df)

account_id,customer_id,account_open_date,balance,status,status_flag
ACC00000671,CUST002602,2025-12-13,850680.26,BLOCKED,N
ACC00001510,CUST008444,2018-01-29,634323.35,DORMANT,N
ACC00002822,CUST002438,2005-02-25,893248.19,CLOSED,N
ACC00002983,CUST004708,2015-12-03,202731.03,DORMANT,N
ACC00003107,CUST007951,2016-06-24,704485.26,CLOSED,N
ACC00008791,CUST001279,2019-03-04,347187.43,BLOCKED,N
ACC00009053,CUST008051,2020-08-30,163991.3,BLOCKED,N
ACC00009810,CUST005550,2018-10-11,199514.68,DORMANT,N
ACC00002274,CUST002350,2008-09-05,894025.64,BLOCKED,N
ACC00003463,CUST007510,2012-04-03,8000.78,CLOSED,N


In [0]:
# Persist the dormant-accounts result in two places: as CSV files (with a
# header row) under the gold area of the S3 bucket, and as a table.
# Both writes overwrite whatever already exists at the target.
GOLD_DORMANT_PATH = "s3://end-to-end-banking-data-pipeline/gold/dormant_accounts/"
DORMANT_TABLE = "workspace.banking_data.dormant_accounts"

(
    dormant_accounts_df.write
    .format("csv")
    .option("header", "true")
    .mode("overwrite")
    .save(GOLD_DORMANT_PATH)
)

dormant_accounts_df.write.mode("overwrite").saveAsTable(DORMANT_TABLE)