In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

df = spark.read.option("header", True).option("inferSchema", True).csv(
    "/Volumes/big_data1/big_d/banking_volume/dataset_5000.csv"
)

# --- THE SAFE VERSION ---
df = df.withColumn(
    "transaction_time",
    expr("try_to_timestamp(transaction_time, 'M/d/yyyy H:mm')")
)

df = df.withColumn("txn_date", to_date(col("transaction_time")))

display(df)

transaction_id,customer_id,amount,transaction_time,merchant_type,is_fraud,age,gender,income,account_balance,transaction_frequency,customer_segment,atm_id,atm_location,cash_withdrawal_amount,loan_id,loan_amount,credit_score,loan_status,news_headline,sentiment_score,stock_ticker,txn_date
1,4174,2520.8,2025-01-01T00:00:00.000Z,Medical,0,69,Female,85031,245868.64,59,High Value,39,Bengaluru,38291.65,5001,737073,753,Paid,Company faces regulatory issues,0.09,SBIN,2025-01-01
2,4507,3654.36,2025-01-01T00:01:00.000Z,Retail,0,63,Female,25047,499830.13,4,High Value,18,Chennai,7062.71,5002,2324877,778,In Progress,Tech stocks rally on innovation,0.68,INFY,2025-01-01
3,1860,20149.64,2025-01-01T00:02:00.000Z,Medical,0,31,Male,164111,388402.94,39,Medium,34,Hyderabad,23130.33,5003,1884361,558,Paid,Banking sector sees strong growth,-0.53,ICICIBANK,2025-01-01
4,2294,14771.57,2025-01-01T00:03:00.000Z,Medical,1,63,Female,181373,484492.02,7,High Value,34,Delhi,25224.88,5004,2332992,352,Paid,Oil prices surge on global conflict,0.75,ICICIBANK,2025-01-01
5,2130,11626.89,2025-01-01T00:04:00.000Z,Bill,0,61,Other,194779,171504.58,71,New,42,Chennai,15310.03,5005,2010943,742,Paid,Rupee weakens against Dollar,0.47,TCS,2025-01-01
6,2095,14057.41,2025-01-01T00:05:00.000Z,Travel,0,55,Female,191186,61963.57,49,New,3,Hyderabad,8297.34,5006,363466,719,Paid,Global markets fall due to uncertainty,0.35,ICICIBANK,2025-01-01
7,4772,40176.1,2025-01-01T00:06:00.000Z,Food,0,37,Other,36366,253194.95,89,High Value,19,Mumbai,35615.85,5007,1393074,548,Paid,Banking sector sees strong growth,-0.73,RELIANCE,2025-01-01
8,4092,46462.11,2025-01-01T00:07:00.000Z,Retail,0,62,Other,166817,62557.62,15,New,34,Kolkata,13946.98,5008,603737,592,In Progress,Global markets fall due to uncertainty,-0.05,SBIN,2025-01-01
9,2638,20261.08,2025-01-01T00:08:00.000Z,Fuel,0,65,Other,189154,246479.63,7,Medium,26,Chennai,31270.28,5009,1775893,506,Default,Company faces regulatory issues,-0.45,TCS,2025-01-01
10,3169,45306.49,2025-01-01T00:09:00.000Z,Travel,1,58,Other,157425,135335.54,47,High Value,35,Hyderabad,16197.23,5010,2797210,400,Paid,Oil prices surge on global conflict,0.18,SBIN,2025-01-01


In [0]:
# 0. INGEST & COMMON PREPROCESSING
from pyspark.sql.functions import col, expr, to_date
from pyspark.sql.types import DoubleType

path = "/Volumes/big_data1/big_d/banking_volume/dataset_5000.csv"

df = spark.read.option("header", True).option("inferSchema", True).csv(path)

# Robust timestamp parsing (safe expression)
df = df.withColumn("transaction_time", expr("try_to_timestamp(transaction_time, 'M/d/yyyy H:mm')"))
df = df.withColumn("txn_date", to_date(col("transaction_time")))

# Cast important numeric columns to double if present
num_cols = ["amount","income","account_balance","transaction_frequency",
            "cash_withdrawal_amount","loan_amount","credit_score","sentiment_score","age"]
for c in num_cols:
    if c in df.columns:
        df = df.withColumn(c, col(c).cast(DoubleType()))

# Quick preview
print("Rows:", df.count())
display(df.limit(10))


Rows: 5000


transaction_id,customer_id,amount,transaction_time,merchant_type,is_fraud,age,gender,income,account_balance,transaction_frequency,customer_segment,atm_id,atm_location,cash_withdrawal_amount,loan_id,loan_amount,credit_score,loan_status,news_headline,sentiment_score,stock_ticker,txn_date
1,4174,2520.8,2025-01-01T00:00:00.000Z,Medical,0,69.0,Female,85031.0,245868.64,59.0,High Value,39,Bengaluru,38291.65,5001,737073.0,753.0,Paid,Company faces regulatory issues,0.09,SBIN,2025-01-01
2,4507,3654.36,2025-01-01T00:01:00.000Z,Retail,0,63.0,Female,25047.0,499830.13,4.0,High Value,18,Chennai,7062.71,5002,2324877.0,778.0,In Progress,Tech stocks rally on innovation,0.68,INFY,2025-01-01
3,1860,20149.64,2025-01-01T00:02:00.000Z,Medical,0,31.0,Male,164111.0,388402.94,39.0,Medium,34,Hyderabad,23130.33,5003,1884361.0,558.0,Paid,Banking sector sees strong growth,-0.53,ICICIBANK,2025-01-01
4,2294,14771.57,2025-01-01T00:03:00.000Z,Medical,1,63.0,Female,181373.0,484492.02,7.0,High Value,34,Delhi,25224.88,5004,2332992.0,352.0,Paid,Oil prices surge on global conflict,0.75,ICICIBANK,2025-01-01
5,2130,11626.89,2025-01-01T00:04:00.000Z,Bill,0,61.0,Other,194779.0,171504.58,71.0,New,42,Chennai,15310.03,5005,2010943.0,742.0,Paid,Rupee weakens against Dollar,0.47,TCS,2025-01-01
6,2095,14057.41,2025-01-01T00:05:00.000Z,Travel,0,55.0,Female,191186.0,61963.57,49.0,New,3,Hyderabad,8297.34,5006,363466.0,719.0,Paid,Global markets fall due to uncertainty,0.35,ICICIBANK,2025-01-01
7,4772,40176.1,2025-01-01T00:06:00.000Z,Food,0,37.0,Other,36366.0,253194.95,89.0,High Value,19,Mumbai,35615.85,5007,1393074.0,548.0,Paid,Banking sector sees strong growth,-0.73,RELIANCE,2025-01-01
8,4092,46462.11,2025-01-01T00:07:00.000Z,Retail,0,62.0,Other,166817.0,62557.62,15.0,New,34,Kolkata,13946.98,5008,603737.0,592.0,In Progress,Global markets fall due to uncertainty,-0.05,SBIN,2025-01-01
9,2638,20261.08,2025-01-01T00:08:00.000Z,Fuel,0,65.0,Other,189154.0,246479.63,7.0,Medium,26,Chennai,31270.28,5009,1775893.0,506.0,Default,Company faces regulatory issues,-0.45,TCS,2025-01-01
10,3169,45306.49,2025-01-01T00:09:00.000Z,Travel,1,58.0,Other,157425.0,135335.54,47.0,High Value,35,Hyderabad,16197.23,5010,2797210.0,400.0,Paid,Oil prices surge on global conflict,0.18,SBIN,2025-01-01


In [0]:
# 1. FRAUD DETECTION
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.sql.functions import when

# Prepare dataset and label
fraud_df = df
if "is_fraud" not in fraud_df.columns:
    raise ValueError("Column 'is_fraud' not found.")
fraud_df = fraud_df.withColumn("label", col("is_fraud").cast("double"))

# Categorical & numeric feature selection (only keep existing cols)
categorical_cols = [c for c in ["merchant_type","gender","customer_segment","atm_location","loan_status"] if c in fraud_df.columns]
numeric_cols = [c for c in ["amount","age","income","account_balance","transaction_frequency","cash_withdrawal_amount","credit_score","sentiment_score"] if c in fraud_df.columns]

# Index + OneHot (named args)
indexers = [StringIndexer(inputCol=c, outputCol=f"{c}_idx", handleInvalid="keep") for c in categorical_cols]
encoders = [OneHotEncoder(inputCol=f"{c}_idx", outputCol=f"{c}_ohe") for c in categorical_cols]

assembler = VectorAssembler(inputCols=[f"{c}_ohe" for c in categorical_cols] + numeric_cols, outputCol="features_raw", handleInvalid="keep")
scaler = StandardScaler(inputCol="features_raw", outputCol="features", withMean=True, withStd=True)

rf = RandomForestClassifier(featuresCol="features", labelCol="label", numTrees=100)

pipeline = Pipeline(stages=indexers + encoders + [assembler, scaler, rf])

train, test = fraud_df.randomSplit([0.8, 0.2], seed=42)
model = pipeline.fit(train)
pred = model.transform(test)

# Evaluation
evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
auc = evaluator.evaluate(pred)
print(f"Fraud detection AUC: {auc:.4f}")

display(pred.select("transaction_id","customer_id","amount","label","prediction","probability").limit(50))

# save
model.write().overwrite().save("/Volumes/big_data1/big_d/banking_volume/models/fraud_rf_v1")

Fraud detection AUC: 0.5033


transaction_id,customer_id,amount,label,prediction,probability
9,2638,20261.08,0.0,0.0,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.9624937510631749"",""0.03750624893682515""]}"
12,2238,23827.08,0.0,0.0,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.9651193151467166"",""0.03488068485328337""]}"
14,2482,32027.4,0.0,0.0,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.9533332977356129"",""0.046666702264387114""]}"
22,4380,22472.99,0.0,0.0,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.9665358078959583"",""0.03346419210404174""]}"
24,3391,31242.95,0.0,0.0,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.9604395128652322"",""0.039560487134767695""]}"
25,2515,3598.07,0.0,0.0,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.948414962330418"",""0.05158503766958204""]}"
26,4485,34134.03,0.0,0.0,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.9596209513268176"",""0.040379048673182455""]}"
28,3433,35700.49,0.0,0.0,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.9641312706474287"",""0.03586872935257128""]}"
31,3324,27629.52,0.0,0.0,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.9585088056793449"",""0.04149119432065506""]}"
35,1021,13577.75,0.0,0.0,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.9605270052889798"",""0.03947299471102019""]}"


In [0]:
fraud_vis = (
    df.groupBy("merchant_type")
      .agg(sum("is_fraud").alias("fraud_count"))
      .orderBy("fraud_count", ascending=False)
)

display(fraud_vis)


merchant_type,fraud_count
Travel,34
Medical,30
Online,29
Bill,28
Food,27
Fuel,26
Retail,25


Databricks visualization. Run in Databricks to view.

In [0]:
# 2. CUSTOMER SEGMENTATION
from pyspark.sql.functions import avg, count, sum as _sum
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans
from pyspark.ml import Pipeline
from pyspark.ml.feature import StandardScaler

# Aggregate per customer (one row per customer)
agg = df.groupBy("customer_id").agg(
    _sum("amount").alias("total_amount"),
    count("transaction_id").alias("transaction_count"),
    avg("cash_withdrawal_amount").alias("avg_cash_withdrawal"),
    avg("account_balance").alias("avg_account_balance"),
    avg("income").alias("avg_income"),
    avg("age").alias("avg_age")
).na.fill(0)

# engineered feature
agg = agg.withColumn("avg_txn_value", col("total_amount") / (col("transaction_count") + 1))

feature_cols = [c for c in ["total_amount","transaction_count","avg_cash_withdrawal","avg_account_balance","avg_income","avg_age","avg_txn_value"] if c in agg.columns]

assembler = VectorAssembler(inputCols=feature_cols, outputCol="features_raw", handleInvalid="keep")
scaler = StandardScaler(inputCol="features_raw", outputCol="features", withMean=True, withStd=True)

kmeans = KMeans(featuresCol="features", k=4, seed=42)

pipeline = Pipeline(stages=[assembler, scaler, kmeans])
model = pipeline.fit(agg)
segmented = model.transform(agg)

display(segmented.select("customer_id","prediction").limit(100))
segmented.groupBy("prediction").agg({"total_amount":"avg","transaction_count":"avg","avg_income":"avg"}).show()

model.write().overwrite().save("/Volumes/big_data1/big_d/banking_volume/models/customer_kmeans_v1")


customer_id,prediction
1021,1
3139,1
2737,2
4885,3
4366,3
2936,3
4529,0
2332,0
1329,2
1007,0


+----------+------------------+----------------------+------------------+
|prediction| avg(total_amount)|avg(transaction_count)|   avg(avg_income)|
+----------+------------------+----------------------+------------------+
|         1| 82785.01656697002|    2.8803641092327696|107027.26365719235|
|         2|28493.244295774617|    1.3788732394366197|120251.74460093895|
|         3|30601.719479768806|    1.4104046242774566|100941.40944123313|
|         0| 29490.22054399999|                 1.328|101313.75066666666|
+----------+------------------+----------------------+------------------+



In [0]:
segment_vis = (
    df.groupBy("customer_segment")
      .agg(avg("income").alias("avg_income"))
      .orderBy("avg_income", ascending=False)
)

display(segment_vis)


customer_segment,avg_income
High Value,108347.905505341
Medium,107933.19937205652
New,107882.75404530745
Low,107200.70777690496


Databricks visualization. Run in Databricks to view.

In [0]:
# 3. ATM CASH DEMAND PREDICTION
from pyspark.sql.functions import dayofweek, month, lag
from pyspark.sql.window import Window
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline

# prepare daily ATM aggregates
if "cash_withdrawal_amount" not in df.columns:
    raise ValueError("Column 'cash_withdrawal_amount' not found.")
atm_daily = df.groupBy("atm_id","txn_date").agg(
    _sum("cash_withdrawal_amount").alias("total_cash_withdrawn")
).orderBy("atm_id","txn_date").na.fill(0)

atm_daily = atm_daily.withColumn("day_of_week", dayofweek(col("txn_date")))
atm_daily = atm_daily.withColumn("month", month(col("txn_date")))

w = Window.partitionBy("atm_id").orderBy("txn_date")
atm_daily = atm_daily.withColumn("prev_day_cash", lag("total_cash_withdrawn").over(w)).na.fill(0.0)

feature_cols = ["day_of_week","month","prev_day_cash"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features", handleInvalid="keep")

rf = RandomForestRegressor(featuresCol="features", labelCol="total_cash_withdrawn", numTrees=50)
pipeline = Pipeline(stages=[assembler, rf])

train, test = atm_daily.randomSplit([0.8,0.2], seed=42)
model = pipeline.fit(train)
pred = model.transform(test)

evaluator = RegressionEvaluator(labelCol="total_cash_withdrawn", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(pred)
print(f"ATM demand RMSE: {rmse:.2f}")

display(pred.select("atm_id","txn_date","total_cash_withdrawn","prediction").limit(50))
model.write().overwrite().save("/Volumes/big_data1/big_d/banking_volume/models/atm_demand_rf_v1")


ATM demand RMSE: 157884.97


atm_id,txn_date,total_cash_withdrawn,prediction
3,2025-01-01,927504.9799999996,729407.8728011181
3,2025-01-04,421447.91,549706.7831219668
4,2025-01-02,746559.2499999999,687373.2540826245
6,2025-01-02,687056.5300000001,680741.7474499911
6,2025-01-04,267731.71,441247.2029581467
7,2025-01-01,677083.74,729407.8728011181
7,2025-01-02,910116.26,681601.3287917414
7,2025-01-04,323693.72,589460.3901682022
8,2025-01-03,761745.8999999999,684728.7015594882
9,2025-01-03,669028.1699999998,674602.5994710713


In [0]:
atm_vis = (
    df.groupBy("atm_id")
      .agg(sum("cash_withdrawal_amount").alias("total_withdrawal"))
      .orderBy("total_withdrawal", ascending=False)
)

display(atm_vis)


atm_id,total_withdrawal
29,3253253.6700000004
17,3090747.5700000003
24,3063476.919999999
40,3010586.41
22,2958455.670000001
4,2862130.12
23,2819479.560000001
21,2818467.8600000017
32,2768766.0500000003
42,2736932.159999999


Databricks visualization. Run in Databricks to view.

In [0]:
# 4. LOAN DEFAULT EARLY WARNING
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import when
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Filter loan rows
loan_df = df.filter(col("loan_id").isNotNull())

# Create binary label: Default -> 1 else 0 (adjust if different wording)
loan_df = loan_df.withColumn("loan_label", when(col("loan_status") == "Default", 1.0).otherwise(0.0))

# Show distribution
print("Loan label distribution:")
loan_df.groupBy("loan_label").count().show()

# Features
feature_cols = [c for c in ["loan_amount","credit_score","income","account_balance","transaction_frequency","age"] if c in loan_df.columns]
loan_df = loan_df.fillna({c:0 for c in feature_cols})

assembler = VectorAssembler(inputCols=feature_cols, outputCol="features", handleInvalid="keep")
gbt = GBTClassifier(featuresCol="features", labelCol="loan_label", maxIter=100)

pipeline = Pipeline(stages=[assembler, gbt])

# Train only if there are positive and negative examples
counts = loan_df.groupBy("loan_label").count().collect()
labels = {r["loan_label"]: r["count"] for r in counts}
if labels.get(0.0,0) > 10 and labels.get(1.0,0) > 10:
    train, test = loan_df.randomSplit([0.8,0.2], seed=42)
    model = pipeline.fit(train)
    pred = model.transform(test)
    ev = BinaryClassificationEvaluator(labelCol="loan_label", rawPredictionCol="rawPrediction", metricName="areaUnderPR")
    aupr = ev.evaluate(pred)
    print(f"Loan default AUPR: {aupr:.4f}")
    display(pred.select("loan_id","loan_amount","credit_score","loan_label","prediction","probability").limit(50))
    model.write().overwrite().save("/Volumes/big_data1/big_d/banking_volume/models/loan_gbt_v1")
else:
    print("Not enough labeled defaults or non-defaults to reliably train. Provide historical loan outcome labels.")


Loan label distribution:
+----------+-----+
|loan_label|count|
+----------+-----+
|       0.0| 4189|
|       1.0|  811|
+----------+-----+

Loan default AUPR: 0.1555


loan_id,loan_amount,credit_score,loan_label,prediction,probability
5009,1775893.0,506.0,1.0,0.0,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.9361982523627632"",""0.06380174763723678""]}"
5012,516669.0,599.0,0.0,1.0,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.014805248473479984"",""0.98519475152652""]}"
5014,2741067.0,734.0,0.0,0.0,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.7615019464925801"",""0.23849805350741993""]}"
5022,2132215.0,835.0,0.0,0.0,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.8129230302736535"",""0.18707696972634646""]}"
5024,1288255.0,630.0,0.0,0.0,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.7627235068186345"",""0.23727649318136546""]}"
5025,2103497.0,490.0,0.0,0.0,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.8617923341099906"",""0.13820766589000943""]}"
5026,273136.0,833.0,1.0,0.0,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.9501929920924795"",""0.04980700790752046""]}"
5028,267470.0,793.0,1.0,0.0,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.8509935084970462"",""0.14900649150295375""]}"
5031,278943.0,336.0,0.0,0.0,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.8502021676667182"",""0.1497978323332818""]}"
5035,1902518.0,785.0,0.0,0.0,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.6730228757499144"",""0.3269771242500856""]}"


In [0]:
loan_vis = (
    df.groupBy("loan_status")
      .agg(avg("loan_amount").alias("avg_loan_amount"))
      .orderBy("avg_loan_amount", ascending=False)
)

display(loan_vis)


loan_status,avg_loan_amount
Paid,1523739.9182426534
In Progress,1503939.0265957448
Default,1498193.924784217


Databricks visualization. Run in Databricks to view.

In [0]:
# 5. MARKET SENTIMENT AGGREGATION
from pyspark.sql.functions import avg, count
from pyspark.sql.window import Window
from pyspark.sql.functions import avg as _avg

if "stock_ticker" in df.columns and "sentiment_score" in df.columns:
    news = df.withColumn("news_date", col("txn_date"))
    sent = news.groupBy("stock_ticker","news_date").agg(
        _avg("sentiment_score").alias("avg_sentiment"),
        count("news_headline").alias("news_count")
    ).orderBy("stock_ticker","news_date")

    # rolling 3-day sentiment per ticker
    w = Window.partitionBy("stock_ticker").orderBy("news_date").rowsBetween(-2,0)
    sent = sent.withColumn("rolling_sentiment", _avg("avg_sentiment").over(w))

    display(sent.limit(200))
else:
    print("Missing column 'stock_ticker' or 'sentiment_score' - cannot compute market sentiment aggregation.")


stock_ticker,news_date,avg_sentiment,news_count,rolling_sentiment
HDFCBANK,2025-01-01,-0.0559183673469387,196,-0.0559183673469387
HDFCBANK,2025-01-02,0.0229807692307692,208,-0.0164687990580847
HDFCBANK,2025-01-03,-0.0247867298578199,211,-0.0192414426579964
HDFCBANK,2025-01-04,-0.0261224489795918,98,-0.0093094698688808
ICICIBANK,2025-01-01,0.0163111111111111,225,0.0163111111111111
ICICIBANK,2025-01-02,0.0275728155339806,206,0.0219419633225458
ICICIBANK,2025-01-03,-0.0121938775510204,196,0.0105633496980237
ICICIBANK,2025-01-04,-0.005,94,0.00345964599432
INFY,2025-01-01,0.0299473684210526,190,0.0299473684210526
INFY,2025-01-02,0.0550777202072538,193,0.0425125443141532


In [0]:
sentiment_vis = (
    df.groupBy("stock_ticker")
      .agg(avg("sentiment_score").alias("avg_sentiment"))
      .orderBy("avg_sentiment", ascending=False)
)

display(sentiment_vis)


stock_ticker,avg_sentiment
INFY,0.0263860667634252
SBIN,0.0193801652892561
RELIANCE,0.0097424892703862
ICICIBANK,0.009001386962552
WIPRO,0.0064112903225806
TCS,-0.0029519774011299
HDFCBANK,-0.0195932678821879


Databricks visualization. Run in Databricks to view.