---------------------------------------------
Notebook 02: business reporting + summary analytics on the cleaned data you saved in Parquet.
---------------------------------------------


In [0]:
from pyspark.sql.functions import sum as _sum, avg, countDistinct

# Load cleaned data from Parquet
df = spark.read.parquet("/FileStore/tables/bank_transactions_cleaned")

df.printSchema()
df.select("CUSTOMER_ID", "AMOUNT", "TXN_TYPE", "EMI_FLAG").display()


root
 |-- TRANSACTION_ID: string (nullable = true)
 |-- CUSTOMER_ID: string (nullable = true)
 |-- ACCOUNT_TYPE: string (nullable = true)
 |-- BRANCH_CODE: string (nullable = true)
 |-- TXN_DATE: date (nullable = true)
 |-- AMOUNT: double (nullable = true)
 |-- MERCHANT_CATEGORY: string (nullable = true)
 |-- CHANNEL: string (nullable = true)
 |-- CITY: string (nullable = true)
 |-- IS_INTERNATIONAL: boolean (nullable = true)
 |-- TXN_TIMESTAMP: timestamp (nullable = true)
 |-- AS_OF_DATE: date (nullable = true)
 |-- LCHG_TIME: timestamp (nullable = true)
 |-- IS_CREDITED: boolean (nullable = true)
 |-- EMI_FLAG: boolean (nullable = true)
 |-- LOAN_ACCOUNT: boolean (nullable = true)
 |-- LOAN_AMOUNT: double (nullable = true)
 |-- LOAN_BALANCE: double (nullable = true)
 |-- Txn_day: integer (nullable = true)
 |-- Txn_month: integer (nullable = true)
 |-- Txn_year: integer (nullable = true)
 |-- HIGH_VALUE_TXN: boolean (nullable = true)
 |-- Txn_group: string (nullable = true)
 |-- TXN_T

CUSTOMER_ID,AMOUNT,TXN_TYPE,EMI_FLAG
CUST2824,771.0,Credit,False
CUST4657,1314.32,Credit,False
CUST9928,289.35,Credit,False
CUST5552,7945.44,Credit,False
CUST2674,6643.78,Credit,False
CUST5333,8760.43,Credit,False
CUST7201,3085.9,Credit,False
CUST3664,8872.99,Credit,False
CUST3803,6338.3,Credit,False
CUST9751,20043.71,Credit,False


📊 Business Analytics

In [0]:
#Total Spending Per Customer
from pyspark.sql.functions import sum
spend_df=df.groupBy("CUSTOMER_ID").agg(sum("Amount").alias("Total_spent")).orderBy("Total_spent",ascending=False)
spend_df.display()

CUSTOMER_ID,Total_spent
CUST3832,169807.54000000004
CUST8222,161505.52
CUST8657,161287.44
CUST8062,155936.86000000002
CUST8779,153200.53999999998
CUST5757,151044.76
CUST7807,150854.36
CUST3532,148771.79
CUST5862,148003.34
CUST8488,146687.25999999998


In [0]:
#save
spend_df.write.mode("overwrite").parquet("/FileStore/tables/spend_per_customer")


In [0]:
#Top 5 Customers by Debit
top_debit_df=df.filter(df.TXN_TYPE=='Debit').groupBy("CUSTOMER_ID").agg(sum("Amount").alias("TOTAL_DEBIT")).orderBy("TOTAL_DEBIT",ascending=False).limit(5)
top_debit_df.display()

CUSTOMER_ID,TOTAL_DEBIT
CUST5028,83223.32
CUST6495,81834.5
CUST4532,81742.06999999999
CUST8488,81013.95000000001
CUST3665,76623.98


In [0]:
top_debit_df.write.mode("overwrite").parquet("/FileStore/tables/top_5_debit_customers")


In [0]:
#Monthly Average EMI, Bills, Credit Usage
monthly_emi_df = df.filter(df.TXN_TYPE.isin("EMI", "Loan_Repayment", "Bill_Payment")) \
                   .groupBy("TXN_MONTH", "TXN_TYPE") \
                   .agg(avg("AMOUNT").alias("AVG_AMOUNT")) \
                   .orderBy("TXN_MONTH", "TXN_TYPE")

monthly_emi_df.display()


TXN_MONTH,TXN_TYPE,AVG_AMOUNT
1,Bill_Payment,4953.56227705113
1,EMI,5048.8004402877705
1,Loan_Repayment,5202.199976105139
2,Bill_Payment,4905.295164698568
2,EMI,5013.878375341633
2,Loan_Repayment,5217.43763525836
3,Bill_Payment,5020.811250000001
3,EMI,5122.365463203467
3,Loan_Repayment,4997.691026239068
4,Bill_Payment,4855.067441721459


In [0]:
monthly_emi_df.write.mode("overwrite").parquet("/FileStore/tables/monthly_emi_avg")


In [0]:
# EMI vs Non-EMI User Count
emi_user_count = df.groupBy("EMI_FLAG") \
                   .agg(countDistinct("CUSTOMER_ID").alias("UNIQUE_USERS"))

emi_user_count.display()


EMI_FLAG,UNIQUE_USERS
True,8690
False,8994


In [0]:
emi_user_count.write.mode("overwrite").parquet("/FileStore/tables/emi_user_count")


In [0]:
#City-wise Avg Transaction Amount
city_avg_amt = df.groupBy("CITY") \
                 .agg(avg("AMOUNT").alias("AVG_TXN_AMOUNT")) \
                 .orderBy("AVG_TXN_AMOUNT", ascending=False)

city_avg_amt.display()


CITY,AVG_TXN_AMOUNT
Kolkata,5053.404612877581
Delhi,5045.293986199152
Bangalore,5028.1152543880935
Ahmedabad,5013.336830662691
Mumbai,4999.855605660829
Hyderabad,4989.799537930488
Pune,4955.832895287115
Chennai,4900.76509089453


In [0]:
city_avg_amt.write.mode("overwrite").parquet("/FileStore/tables/city_avg_txn")
