### Import the necessary libraries

In [1]:
import os
os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3.10"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3.10"

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, when, sum, avg, max, count,
    countDistinct
)
from pyspark.ml.feature import (
    StringIndexer,
    OneHotEncoder,
    VectorAssembler,
    StandardScaler
)
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [3]:
spark = SparkSession.builder \
    .appName("FraudDetection") \
    .getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/01/27 22:02:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Load data from CSV into a Spark DataFrame

In [4]:
transactions = spark.read.csv(
    "data/transactions.csv",
    header=True,
    inferSchema=True
)

loan_apps = spark.read.csv(
    "data/loan_applications.csv",
    header=True,
    inferSchema=True
)

### Check dataframes schemas 

In [5]:
transactions.printSchema()

root
 |-- transaction_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- transaction_date: timestamp (nullable = true)
 |-- transaction_type: string (nullable = true)
 |-- transaction_amount: double (nullable = true)
 |-- merchant_category: string (nullable = true)
 |-- merchant_name: string (nullable = true)
 |-- transaction_location: string (nullable = true)
 |-- account_balance_after_transaction: double (nullable = true)
 |-- is_international_transaction: integer (nullable = true)
 |-- device_used: string (nullable = true)
 |-- ip_address: string (nullable = true)
 |-- transaction_status: string (nullable = true)
 |-- transaction_source_destination: string (nullable = true)
 |-- transaction_notes: string (nullable = true)
 |-- fraud_flag: integer (nullable = true)



In [6]:
loan_apps.printSchema()

root
 |-- application_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- application_date: date (nullable = true)
 |-- loan_type: string (nullable = true)
 |-- loan_amount_requested: double (nullable = true)
 |-- loan_tenure_months: integer (nullable = true)
 |-- interest_rate_offered: double (nullable = true)
 |-- purpose_of_loan: string (nullable = true)
 |-- employment_status: string (nullable = true)
 |-- monthly_income: double (nullable = true)
 |-- cibil_score: integer (nullable = true)
 |-- existing_emis_monthly: double (nullable = true)
 |-- debt_to_income_ratio: double (nullable = true)
 |-- property_ownership_status: string (nullable = true)
 |-- residential_address: string (nullable = true)
 |-- applicant_age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- number_of_dependents: integer (nullable = true)
 |-- loan_status: string (nullable = true)
 |-- fraud_flag: integer (nullable = true)
 |-- fraud_type: string (nullable = true

### Drop Unecessary columns from transactions and loan_apps dataframes

In [7]:
transactions = transactions.drop(
    "ip_address",
    "transaction_notes",
    "transaction_location",
    "transaction_source_destination"
)

loan_apps = loan_apps.drop(

    "residential_address",
    "fraud_type"
)

In [8]:
loan_txn_df = loan_apps.select(
    "customer_id",
    "application_date"
).join(
    transactions,
    on="customer_id",
    how="inner"
)

### Filter the transactions (only keep the ones made before the loan application

In [9]:
txn_df = loan_txn_df.filter(
    col("transaction_date") < col("application_date")
)

### Feature Engineering on transactions 

In [10]:
txn_agg = txn_df.groupBy("customer_id").agg(
    count("*").alias("txn_count"),
    sum("transaction_amount").alias("total_txn_amount"),
    avg("transaction_amount").alias("avg_txn_amount"),
    max("transaction_amount").alias("max_txn_amount"),

    sum(when(col("is_international_transaction") == 1, 1).otherwise(0))
        .alias("international_txn_count"),

    countDistinct("merchant_name").alias("unique_merchants"),
    countDistinct("merchant_category").alias("unique_merchant_categories"),

    sum(when(col("transaction_status") == "Failed", 1).otherwise(0))
        .alias("failed_txn_count"),
    
    sum(when(col("fraud_flag") == 1, 1).otherwise(0))
        .alias("fraud_flag_txn_count"),

    avg("account_balance_after_transaction").alias("avg_post_balance")
)


In [11]:
txn_agg = txn_agg.withColumn(
    "international_txn_ratio",
    when(col("txn_count") > 0, col("international_txn_count") / col("txn_count"))
    .otherwise(0)
).withColumn(
    "failed_txn_ratio",
    when(col("txn_count") > 0, col("failed_txn_count") / col("txn_count"))
    .otherwise(0)
)


### Merge the dataframes to get advantage of transactions history of customers for loan fraud detection

In [12]:
full_df = loan_apps.join(
    txn_agg,
    on="customer_id",
    how="inner"
)


### Feature engineering on loan applications 

In [13]:
full_df = full_df.withColumn(
    "loan_to_income_ratio",
    col("loan_amount_requested") / (col("monthly_income") + 1)
).withColumn(
    "emi_burden_ratio",
    col("existing_emis_monthly") / (col("monthly_income") + 1)
).withColumn(
    "is_high_risk_age",
    when((col("applicant_age") < 21) | (col("applicant_age") > 65), 1).otherwise(0)
)


In [14]:
full_df = full_df.withColumn(
    "age_bucket",
    when(col("applicant_age") < 21, "very_young")
    .when((col("applicant_age") >= 21) & (col("applicant_age") <= 25), "young")
    .when((col("applicant_age") >= 26) & (col("applicant_age") <= 35), "early_career")
    .when((col("applicant_age") >= 36) & (col("applicant_age") <= 50), "mid_career")
    .when((col("applicant_age") >= 51) & (col("applicant_age") <= 65), "senior")
    .otherwise("elderly")
)

In [15]:
full_df.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- application_id: string (nullable = true)
 |-- application_date: date (nullable = true)
 |-- loan_type: string (nullable = true)
 |-- loan_amount_requested: double (nullable = true)
 |-- loan_tenure_months: integer (nullable = true)
 |-- interest_rate_offered: double (nullable = true)
 |-- purpose_of_loan: string (nullable = true)
 |-- employment_status: string (nullable = true)
 |-- monthly_income: double (nullable = true)
 |-- cibil_score: integer (nullable = true)
 |-- existing_emis_monthly: double (nullable = true)
 |-- debt_to_income_ratio: double (nullable = true)
 |-- property_ownership_status: string (nullable = true)
 |-- applicant_age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- number_of_dependents: integer (nullable = true)
 |-- loan_status: string (nullable = true)
 |-- fraud_flag: integer (nullable = true)
 |-- txn_count: long (nullable = false)
 |-- total_txn_amount: double (nullable = true)
 |-

### Drop Uncessary columns for training 

In [16]:
full_df = full_df.drop(

    "customer_id",
    "application_id",
     "application_date",
    "applicant_age"
)

In [17]:
categorical_cols = ["loan_type", 
                    "purpose_of_loan",
                    "employment_status",
                    "property_ownership_status",
                    "gender",
                    "loan_status",
                    "age_bucket"
                    
                    
                    
                   ]
numeric_cols = ["loan_amount_requested", 
                "loan_tenure_months",
                "interest_rate_offered",
                "monthly_income",
                "cibil_score",
                "existing_emis_monthly",
                "debt_to_income_ratio",
                "number_of_dependents",
                "txn_count",
                "total_txn_amount",
                "avg_txn_amount",
                "max_txn_amount",
                "international_txn_count",
                "unique_merchants",
                "unique_merchant_categories",
                "failed_txn_count",
                "fraud_flag_txn_count",
                "avg_post_balance",
                "international_txn_ratio",
                "failed_txn_ratio",
                "loan_to_income_ratio",
                "emi_burden_ratio",
                "is_high_risk_age"
                
                          
                
                
               ]


In [18]:
indexers = [
    StringIndexer(
        inputCol=col,
        outputCol=f"{col}_idx",
        handleInvalid="keep"
    )
    for col in categorical_cols
]


### Encode categorical columns 

In [19]:
assembler = VectorAssembler(
    inputCols=[f"{col}_idx" for col in categorical_cols] + numeric_cols,
    outputCol="features"
)

### Perform Stratified split since the dataset is unbalanced

In [20]:
label_col = "fraud_flag"

fractions = {
    0.0: 0.8,
    1.0: 0.8
}

train_df = full_df.sampleBy(
    col=label_col,
    fractions=fractions,
    seed=42
)

test_df = full_df.subtract(train_df)

In [21]:
train_df.groupBy(label_col).count().show()
test_df.groupBy(label_col).count().show()

+----------+-----+
|fraud_flag|count|
+----------+-----+
|         1|  749|
|         0|35876|
+----------+-----+



26/01/27 22:02:41 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 25:>                                                       (0 + 11) / 11]

+----------+-----+
|fraud_flag|count|
+----------+-----+
|         1|  181|
|         0| 8991|
+----------+-----+



                                                                                

## Train and Evaluate random forest

In [22]:
rf = RandomForestClassifier(
    featuresCol="features",
    labelCol="fraud_flag",
    numTrees=100,
    maxDepth=10,
    seed=42
)

In [23]:
pipeline = Pipeline(
    stages=indexers + [assembler, rf]
)

model = pipeline.fit(train_df)

In [24]:
predictions = model.transform(test_df)

In [25]:
evaluator = BinaryClassificationEvaluator(labelCol="fraud_flag")

In [26]:
auc = evaluator.evaluate(predictions)
print("AUC:", auc)

AUC: 0.9999978492918947


In [27]:
cm = (
    predictions
    .groupBy("fraud_flag", "prediction")
    .count()
)

cm.show()

+----------+----------+-----+
|fraud_flag|prediction|count|
+----------+----------+-----+
|         0|       0.0| 8991|
|         1|       1.0|  181|
+----------+----------+-----+



In [28]:
from pyspark.sql.functions import col, sum as _sum, when, coalesce, lit

stats = (
    cm
    .groupBy("fraud_flag")
    .agg(
        coalesce(
            _sum(when(col("fraud_flag") == col("prediction"), col("count"))),
            lit(0)
        ).alias("TP"),
        coalesce(
            _sum(when(col("fraud_flag") != col("prediction"), col("count"))),
            lit(0)
        ).alias("FN")
    )
)

In [29]:
fp = (
    cm
    .groupBy("prediction")
    .agg(_sum("count").alias("predicted_total"))
    .withColumnRenamed("prediction", "fraud_flag")
)

stats = stats.join(fp, "fraud_flag")
stats = stats.withColumn("FP", col("predicted_total") - col("TP"))


In [30]:
stats = (
    stats
    .withColumn("precision", col("TP") / (col("TP") + col("FP")))
    .withColumn("recall", col("TP") / (col("TP") + col("FN")))
    .withColumn("f1", 2 * col("precision") * col("recall") /
                        (col("precision") + col("recall")))
)

stats.select("fraud_flag", "precision", "recall", "f1").show()


26/01/27 22:03:01 WARN DAGScheduler: Broadcasting large task binary with size 1014.9 KiB


+----------+---------+------+---+
|fraud_flag|precision|recall| f1|
+----------+---------+------+---+
|         1|      1.0|   1.0|1.0|
|         0|      1.0|   1.0|1.0|
+----------+---------+------+---+



26/01/27 22:03:02 WARN DAGScheduler: Broadcasting large task binary with size 1253.3 KiB


## Train and Evaluate Logistic Regression Classifier 

In [31]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(
    featuresCol="features",
    labelCol="fraud_flag",
    maxIter=100,
    regParam=0.0,
    elasticNetParam=0.0
)


In [32]:
pipeline = Pipeline(
    stages=indexers + [assembler, lr]
)

model = pipeline.fit(train_df)

In [33]:
predictions = model.transform(test_df)

In [34]:
evaluator = BinaryClassificationEvaluator(labelCol="fraud_flag")

In [35]:
auc = evaluator.evaluate(predictions)
print("AUC:", auc)

AUC: 0.9999975420478796


In [36]:
cm = (
    predictions
    .groupBy("fraud_flag", "prediction")
    .count()
)

cm.show()

+----------+----------+-----+
|fraud_flag|prediction|count|
+----------+----------+-----+
|         0|       0.0| 8991|
|         1|       1.0|  181|
+----------+----------+-----+



In [37]:
from pyspark.sql.functions import col, sum as _sum, when, coalesce, lit

stats = (
    cm
    .groupBy("fraud_flag")
    .agg(
        coalesce(
            _sum(when(col("fraud_flag") == col("prediction"), col("count"))),
            lit(0)
        ).alias("TP"),
        coalesce(
            _sum(when(col("fraud_flag") != col("prediction"), col("count"))),
            lit(0)
        ).alias("FN")
    )
)

In [38]:
fp = (
    cm
    .groupBy("prediction")
    .agg(_sum("count").alias("predicted_total"))
    .withColumnRenamed("prediction", "fraud_flag")
)

stats = stats.join(fp, "fraud_flag")
stats = stats.withColumn("FP", col("predicted_total") - col("TP"))


In [39]:
stats = (
    stats
    .withColumn("precision", col("TP") / (col("TP") + col("FP")))
    .withColumn("recall", col("TP") / (col("TP") + col("FN")))
    .withColumn("f1", 2 * col("precision") * col("recall") /
                        (col("precision") + col("recall")))
)

stats.select("fraud_flag", "precision", "recall", "f1").show()



+----------+---------+------+---+
|fraud_flag|precision|recall| f1|
+----------+---------+------+---+
|         1|      1.0|   1.0|1.0|
|         0|      1.0|   1.0|1.0|
+----------+---------+------+---+



                                                                                

### Train a Gradient-Boosted Tree classifier 

In [70]:
from pyspark.ml.classification import GBTClassifier

gbt = GBTClassifier(
    featuresCol="features",
    labelCol="fraud_flag",
    maxIter=100,
    maxDepth=5,
    seed=42
)


In [71]:
pipeline = Pipeline(
    stages=indexers + [assembler, gbt]
)

model = pipeline.fit(train_df)

In [72]:
predictions = model.transform(test_df)

In [73]:
evaluator = BinaryClassificationEvaluator(labelCol="fraud_flag")

In [74]:
auc = evaluator.evaluate(predictions)
print("AUC:", auc)

AUC: 1.0


In [75]:
cm = (
    predictions
    .groupBy("fraud_flag", "prediction")
    .count()
)

cm.show()

+----------+----------+-----+
|fraud_flag|prediction|count|
+----------+----------+-----+
|         0|       0.0| 8991|
|         1|       1.0|  181|
+----------+----------+-----+



In [76]:
from pyspark.sql.functions import col, sum as _sum, when, coalesce, lit

stats = (
    cm
    .groupBy("fraud_flag")
    .agg(
        coalesce(
            _sum(when(col("fraud_flag") == col("prediction"), col("count"))),
            lit(0)
        ).alias("TP"),
        coalesce(
            _sum(when(col("fraud_flag") != col("prediction"), col("count"))),
            lit(0)
        ).alias("FN")
    )
)

In [77]:
fp = (
    cm
    .groupBy("prediction")
    .agg(_sum("count").alias("predicted_total"))
    .withColumnRenamed("prediction", "fraud_flag")
)

stats = stats.join(fp, "fraud_flag")
stats = stats.withColumn("FP", col("predicted_total") - col("TP"))


In [78]:
stats = (
    stats
    .withColumn("precision", col("TP") / (col("TP") + col("FP")))
    .withColumn("recall", col("TP") / (col("TP") + col("FN")))
    .withColumn("f1", 2 * col("precision") * col("recall") /
                        (col("precision") + col("recall")))
)

stats.select("fraud_flag", "precision", "recall", "f1").show()



+----------+---------+------+---+
|fraud_flag|precision|recall| f1|
+----------+---------+------+---+
|         1|      1.0|   1.0|1.0|
|         0|      1.0|   1.0|1.0|
+----------+---------+------+---+



                                                                                

## Optimized version â€” CrossValidator + ParamGrid

In [79]:
from pyspark.ml.tuning import ParamGridBuilder

paramGrid = ParamGridBuilder() \
    .addGrid(gbt.maxIter, [50, 100]) \
    .addGrid(gbt.maxDepth, [3, 5]) \
    .addGrid(gbt.stepSize, [0.05, 0.1]) \
    .build()


In [80]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(
    labelCol="fraud_flag",
    predictionCol="prediction",
    metricName="f1"
)


In [81]:
from pyspark.ml.tuning import CrossValidator

cv_gbt = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=5,
    parallelism=4  
)


In [82]:
cv_gbt_model = cv_gbt.fit(train_df)

gbt_cv_preds = cv_gbt_model.transform(test_df)

gbt_cv_f1 = evaluator.evaluate(gbt_cv_preds)
gbt_cv_f1


26/01/27 22:50:32 WARN BlockManager: Asked to remove block broadcast_10803_piece0, which does not exist
26/01/27 22:50:32 WARN BlockManager: Asked to remove block broadcast_10803, which does not exist
26/01/27 23:01:57 WARN BlockManager: Asked to remove block broadcast_13385, which does not exist
26/01/27 23:20:56 WARN CacheManager: Asked to cache already cached data.        200]]
26/01/27 23:20:56 WARN CacheManager: Asked to cache already cached data.
26/01/27 23:30:10 WARN BlockManager: Asked to remove block broadcast_19208, which does not exist
                                                                                200]]

1.0

In [83]:
best_model = cv_gbt_model.bestModel
best_gbt = best_model.stages[-1]

print("Best maxIter:", best_gbt.getMaxIter())
print("Best maxDepth:", best_gbt.getMaxDepth())
print("Best stepSize:", best_gbt.getStepSize())

Best maxIter: 50
Best maxDepth: 3
Best stepSize: 0.05


### SVM

In [49]:
from pyspark.ml.classification import LinearSVC

svm = LinearSVC(
    featuresCol="features",
    labelCol="fraud_flag",
    maxIter=100
)

In [50]:
pipeline = Pipeline(
    stages=indexers + [assembler, svm]
)

model = pipeline.fit(train_df)

In [51]:
predictions = model.transform(test_df)

In [52]:
evaluator = BinaryClassificationEvaluator(labelCol="fraud_flag")

In [53]:
auc = evaluator.evaluate(predictions)
print("AUC:", auc)

AUC: 0.9999975420478796


In [54]:
cm = (
    predictions
    .groupBy("fraud_flag", "prediction")
    .count()
)

cm.show()

+----------+----------+-----+
|fraud_flag|prediction|count|
+----------+----------+-----+
|         0|       0.0| 8991|
|         1|       1.0|  181|
+----------+----------+-----+



In [55]:
from pyspark.sql.functions import col, sum as _sum, when, coalesce, lit

stats = (
    cm
    .groupBy("fraud_flag")
    .agg(
        coalesce(
            _sum(when(col("fraud_flag") == col("prediction"), col("count"))),
            lit(0)
        ).alias("TP"),
        coalesce(
            _sum(when(col("fraud_flag") != col("prediction"), col("count"))),
            lit(0)
        ).alias("FN")
    )
)

In [56]:
fp = (
    cm
    .groupBy("prediction")
    .agg(_sum("count").alias("predicted_total"))
    .withColumnRenamed("prediction", "fraud_flag")
)

stats = stats.join(fp, "fraud_flag")
stats = stats.withColumn("FP", col("predicted_total") - col("TP"))


In [57]:
stats = (
    stats
    .withColumn("precision", col("TP") / (col("TP") + col("FP")))
    .withColumn("recall", col("TP") / (col("TP") + col("FN")))
    .withColumn("f1", 2 * col("precision") * col("recall") /
                        (col("precision") + col("recall")))
)

stats.select("fraud_flag", "precision", "recall", "f1").show()



+----------+---------+------+---+
|fraud_flag|precision|recall| f1|
+----------+---------+------+---+
|         0|      1.0|   1.0|1.0|
|         1|      1.0|   1.0|1.0|
+----------+---------+------+---+



                                                                                

### Naive Bayes

In [58]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

nb = NaiveBayes(
    featuresCol="features",
    labelCol="fraud_flag"
)


In [59]:
pipeline = Pipeline(
    stages=indexers + [assembler, nb]
)

model = pipeline.fit(train_df)

In [60]:
evaluator = BinaryClassificationEvaluator(labelCol="fraud_flag")

In [61]:
auc = evaluator.evaluate(predictions)
print("AUC:", auc)

AUC: 0.9999975420478796


In [62]:
cm = (
    predictions
    .groupBy("fraud_flag", "prediction")
    .count()
)

cm.show()

+----------+----------+-----+
|fraud_flag|prediction|count|
+----------+----------+-----+
|         0|       0.0| 8991|
|         1|       1.0|  181|
+----------+----------+-----+



In [63]:
from pyspark.sql.functions import col, sum as _sum, when, coalesce, lit

stats = (
    cm
    .groupBy("fraud_flag")
    .agg(
        coalesce(
            _sum(when(col("fraud_flag") == col("prediction"), col("count"))),
            lit(0)
        ).alias("TP"),
        coalesce(
            _sum(when(col("fraud_flag") != col("prediction"), col("count"))),
            lit(0)
        ).alias("FN")
    )
)

In [64]:
fp = (
    cm
    .groupBy("prediction")
    .agg(_sum("count").alias("predicted_total"))
    .withColumnRenamed("prediction", "fraud_flag")
)

stats = stats.join(fp, "fraud_flag")
stats = stats.withColumn("FP", col("predicted_total") - col("TP"))


In [65]:
stats = (
    stats
    .withColumn("precision", col("TP") / (col("TP") + col("FP")))
    .withColumn("recall", col("TP") / (col("TP") + col("FN")))
    .withColumn("f1", 2 * col("precision") * col("recall") /
                        (col("precision") + col("recall")))
)

stats.select("fraud_flag", "precision", "recall", "f1").show()

+----------+---------+------+---+
|fraud_flag|precision|recall| f1|
+----------+---------+------+---+
|         1|      1.0|   1.0|1.0|
|         0|      1.0|   1.0|1.0|
+----------+---------+------+---+

