In [0]:
from pyspark.sql import functions as F

# Load the raw banking CSV into a DataFrame. The first row supplies the column
# names and Spark is asked to infer the column types from the data, so no
# explicit schema is declared here.
df = (
    spark.read.format("csv")
        .option("inferSchema", "true")
        .option("header", "true")
        .load("/Volumes/raw-data/banking/csv/Banking_Database.csv")
)

# Show the column names and the types Spark inferred for them.
df.printSchema()

# Preview only the first five rows. The row limit is applied explicitly with
# limit() instead of being passed to display() as a positional argument, which
# is not a documented way to cap the number of rendered rows.
df.limit(5).display()

In [0]:
# Narrow the raw frame to the four columns used by the transaction pivot and
# rename each one to a snake_case identifier.
pivot_df = df.select(
    *[
        F.col(source_name).alias(target_name)
        for source_name, target_name in (
            ("Customer ID", "customer_id"),
            ("Account Type", "account_type"),
            ("Transaction Type", "txn_type"),
            ("Transaction Amount", "txn_amount"),
        )
    ]
)

### Pivot Functions

In [0]:
# Transaction pivot: one row per (customer_id, account_type) pair and one
# column per distinct txn_type value, each cell holding the summed txn_amount.
total_amount = F.sum("txn_amount")
per_customer_account = pivot_df.groupBy("customer_id", "account_type")
pivot_txn = per_customer_account.pivot("txn_type").agg(total_amount)

pivot_txn.display()

In [0]:
# Monthly transaction pivot: one row per customer_id and one column per
# "yyyy-MM" month label, each cell holding the summed transaction amount.
# NOTE(review): assumes "Transaction Date" was inferred as (or casts cleanly to)
# a date/timestamp; otherwise the month label may come out null - confirm
# against the printed schema.
month_label = F.date_format("Transaction Date", "yyyy-MM").alias("txn_month")

monthly_df = df.select(
    F.col("Customer ID").alias("customer_id"),
    month_label,
    F.col("Transaction Amount").alias("txn_amount"),
)

monthly_total = F.sum("txn_amount")
pivot_month = monthly_df.groupBy("customer_id").pivot("txn_month").agg(monthly_total)

pivot_month.display()

In [0]:

# Loan status pivot: one row per loan_type and one column per distinct
# loan_status value, each cell holding the count of loan_status entries.
loan_df = df.select(
    *[
        F.col(source_name).alias(target_name)
        for source_name, target_name in (
            ("Loan Type", "loan_type"),
            ("Loan Status", "loan_status"),
        )
    ]
)

status_count = F.count("loan_status")
pivot_loan = loan_df.groupBy("loan_type").pivot("loan_status").agg(status_count)

display(pivot_loan)

In [0]:
### Card Type Pivot – Average Credit Card Balance
# Keep only the card type and its credit card balance.
card_df = df.select(
    F.col("Card Type").alias("card_type"),
    F.col("Credit Card Balance").alias("cc_balance")
)

# Pivot over an empty grouping so the result is a single row with one column
# per card type holding that type's average balance. Grouping by card_type AND
# pivoting on card_type (as before) produced a diagonal matrix: every row had
# exactly one populated cell and nulls everywhere else.
pivot_card = (
    card_df
    .groupBy()
    .pivot("card_type")
    .agg(F.avg("cc_balance"))
)

pivot_card.display()

In [0]:
# Branch-wise anomaly pivot: one row per branch_id and one column per distinct
# anomaly_flag value, each cell holding the count of anomaly_flag entries.
anomaly_df = df.select(
    *[
        F.col(source_name).alias(target_name)
        for source_name, target_name in (
            ("Branch ID", "branch_id"),
            ("Anomaly", "anomaly_flag"),
        )
    ]
)

flag_count = F.count("anomaly_flag")
pivot_anomaly = anomaly_df.groupBy("branch_id").pivot("anomaly_flag").agg(flag_count)

display(pivot_anomaly)

### When Otherwise
Transaction categorisation – labelling withdrawals by amount with `when` / `otherwise`

In [0]:
# Label each row with a withdrawal category based on its amount. Branches are
# evaluated top to bottom, so the first matching threshold wins; rows that are
# not an exact "Withdrawal" match get a null txn_cat.
exact_withdrawal = F.col("Transaction Type") == "Withdrawal"
raw_amount = F.col("Transaction Amount")

withdrawal_category = (
    F.when(exact_withdrawal & (raw_amount >= 50000), "HIGH Value Withdrawal")
    .when(exact_withdrawal & (raw_amount >= 10000), "MID Value Withdrawal")
    .when(exact_withdrawal, "Very low Value Withdrawal")
    .otherwise(None)
)

function_test = df.select(
    F.col("Customer ID").alias("customer_id"),
    raw_amount.alias("txn_amount"),
    withdrawal_category.alias("txn_cat"),
)
display(function_test)

In [0]:
# Same categorisation idea, but the transaction type is lower-cased and trimmed
# before comparison, the thresholds are 500 / 100, and every row that is not a
# withdrawal is labelled "UPI Transaction".
normalised_is_withdrawal = F.trim(F.lower(F.col("Transaction Type"))) == "withdrawal"
amount_value = F.col("Transaction Amount")

category_label = (
    F.when(normalised_is_withdrawal & (amount_value > 500), "HIGH Value Withdrawal")
    .when(normalised_is_withdrawal & (amount_value > 100), "MID Value Withdrawal")
    .when(normalised_is_withdrawal, "Very low Value Withdrawal")
    .otherwise("UPI Transaction")
)

function_test = df.select(
    F.col("Customer ID").alias("customer_id"),
    amount_value.alias("txn_amount"),
    category_label.alias("txn_cat"),
)
display(function_test)

In [0]:
from pyspark.sql import functions as F

# Categorise withdrawal rows by amount.
#
# The transaction type is normalised (trimmed, lower-cased) once and used for
# the row filter, so the filter agrees with the case/whitespace-insensitive
# matching the category branches were written for. Previously the filter was
# an exact "Withdrawal" match, which made the normalised checks in every
# branch redundant and dropped rows such as " withdrawal ".
withdrawal_type = F.lower(F.trim(F.col("Transaction Type")))
withdrawal_amount = F.col("Transaction Amount")

function_test = (
    df.filter(withdrawal_type == "withdrawal")  # keep only withdrawal rows
      .select(
        F.col("Customer ID").alias("customer_id"),
        withdrawal_amount.alias("txn_amount"),
        # Branches are evaluated top to bottom, so each lower bound implicitly
        # caps the band above it. Using ordered >= thresholds instead of
        # between(3000, 5000) / between(1000, 2999) / between(500, 999) closes
        # the gaps that sent fractional amounts (e.g. 2999.50, 999.25) to the
        # fallback label. Whole-number amounts land in the same bands as before.
        F.when(withdrawal_amount > 5000, "BANK TRANSFER")
         .when(withdrawal_amount >= 3000, "HIGH Value Withdrawal")
         .when(withdrawal_amount >= 1000, "MID Value Withdrawal")
         .when(withdrawal_amount >= 500, "LOW Value Withdrawal")
         # Amounts below 500 and null amounts fall through to this label.
         .otherwise("UPI Transaction")
         .alias("txn_cat")
    )
)

display(function_test)