In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.feature import StandardScaler, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import LogisticRegressionModel
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.sql.functions import col
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql.streaming import StreamingQuery
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.metrics import roc_curve
# from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
# from pyspark.ml.classification import RandomForestClassifier
# from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
# import matplotlib.pyplot as plt
# import seaborn as sns
# import pandas as pd

In [3]:
# File path
file_path = "data/creditcard.csv"

In [4]:
# 1. Initialize Spark Session
spark = SparkSession.builder \
    .appName("FraduDetection") \
    .master("local[*]") \
    .getOrCreate()


25/04/26 23:41:51 WARN Utils: Your hostname, mbp.local resolves to a loopback address: 127.0.0.1; using 10.20.5.195 instead (on interface en0)
25/04/26 23:41:51 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/26 23:41:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/04/26 23:41:53 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [4]:
# Load the credit card dataset
def load_data(file_path):
    try:
        df = spark.read.csv(file_path, header=True, inferSchema=True)
        print("Dataset loaded successfully.")
        return df
    except Exception as e:
        print(f"Error loading dataset: {e}")
        return None
df = load_data(file_path)
df.show(5)

                                                                                

Dataset loaded successfully.


25/04/26 23:20:30 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+----+------------------+-------------------+----------------+------------------+-------------------+-------------------+-------------------+------------------+------------------+-------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-------------------+------------------+-------------------+--------------------+-------------------+------------------+------------------+------------------+------------------+--------------------+-------------------+------+-----+
|Time|                V1|                 V2|              V3|                V4|                 V5|                 V6|                 V7|                V8|                V9|                V10|               V11|               V12|               V13|               V14|               V15|               V16|               V17|                V18|               V19|                V20|                 V21|                V22|     

### Explore the data

In [5]:
# Check the schema
df.printSchema()

root
 |-- Time: double (nullable = true)
 |-- V1: double (nullable = true)
 |-- V2: double (nullable = true)
 |-- V3: double (nullable = true)
 |-- V4: double (nullable = true)
 |-- V5: double (nullable = true)
 |-- V6: double (nullable = true)
 |-- V7: double (nullable = true)
 |-- V8: double (nullable = true)
 |-- V9: double (nullable = true)
 |-- V10: double (nullable = true)
 |-- V11: double (nullable = true)
 |-- V12: double (nullable = true)
 |-- V13: double (nullable = true)
 |-- V14: double (nullable = true)
 |-- V15: double (nullable = true)
 |-- V16: double (nullable = true)
 |-- V17: double (nullable = true)
 |-- V18: double (nullable = true)
 |-- V19: double (nullable = true)
 |-- V20: double (nullable = true)
 |-- V21: double (nullable = true)
 |-- V22: double (nullable = true)
 |-- V23: double (nullable = true)
 |-- V24: double (nullable = true)
 |-- V25: double (nullable = true)
 |-- V26: double (nullable = true)
 |-- V27: double (nullable = true)
 |-- V28: double (nulla

### Data Preprocessing

In [6]:
# Check null
df.select([col(c).isNull().cast("int").alias(c) for c in df.columns]).groupBy().sum().show()

[Stage 3:>                                                          (0 + 8) / 8]

+---------+-------+-------+-------+-------+-------+-------+-------+-------+-------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+-----------+----------+
|sum(Time)|sum(V1)|sum(V2)|sum(V3)|sum(V4)|sum(V5)|sum(V6)|sum(V7)|sum(V8)|sum(V9)|sum(V10)|sum(V11)|sum(V12)|sum(V13)|sum(V14)|sum(V15)|sum(V16)|sum(V17)|sum(V18)|sum(V19)|sum(V20)|sum(V21)|sum(V22)|sum(V23)|sum(V24)|sum(V25)|sum(V26)|sum(V27)|sum(V28)|sum(Amount)|sum(Class)|
+---------+-------+-------+-------+-------+-------+-------+-------+-------+-------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+-----------+----------+
|        0|      0|      0|      0|      0|      0|      0|      0|      0|      0|       0|       0|       0|       0|       0|       0|       0|       0|       0|  

                                                                                

### Feature Engineering

In [7]:
# The dataset already has PCA-transformed features(V1-V28). Lets use Amount and time as additional features.
# Normalized the data
assembler = VectorAssembler(inputCols=["Amount", "Time"], outputCol="features_raw")
df = assembler.transform(df)

scaler = StandardScaler(inputCol="features_raw", outputCol = "scaled_features")
scaler_model = scaler.fit(df)
df = scaler_model.transform(df)


                                                                                

### Assemble Features

In [9]:
# Combine V1-V128 , scaled Amount, and Time into a feature vector
feature_cols = [f"V{i}" for i in range(1, 29)] + ["scaled_features"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
df = assembler.transform(df)

### Handle Imbalanced Data

In [10]:
# Use undersampling oroversampling(e.g, SMOTE is complex in Spark, undersampling is simpler)
fraud = df.filter(col("Class")==1)
non_fraud = df.filter(col("Class")==0).sample(fraction=0.02, seed=42)
balanced_data = fraud.union(non_fraud)

### Building machine learning model

In [11]:
# split the data
train_data , test_data = balanced_data.randomSplit([0.8,0.2], seed=2)

### Choose the model

In [14]:
lr = LogisticRegression(featuresCol="features", labelCol="Class")
lr_model = lr.fit(train_data)

25/04/26 23:26:56 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
                                                                                

### Make Predictions

In [16]:
predictions = lr_model.transform(test_data)
predictions.select("Class","prediction", "probability").show(5)

+-----+----------+--------------------+
|Class|prediction|         probability|
+-----+----------+--------------------+
|    1|       1.0|[0.00261492532300...|
|    1|       1.0|[2.56544436510587...|
|    1|       1.0|[2.48304697336230...|
|    1|       1.0|[3.54142340313879...|
|    1|       1.0|[0.00130468136242...|
+-----+----------+--------------------+
only showing top 5 rows



                                                                                

### Evaluate the Model

In [17]:
# AUC-ROC
auc_evaluator = BinaryClassificationEvaluator(labelCol="Class", metricName="areaUnderROC")
auc = auc_evaluator.evaluate(predictions)
print(f"AUC-ROC: {auc}")

# Precision, Recall, F1
multi_evaluator = MulticlassClassificationEvaluator(labelCol="Class", predictionCol="prediction")
precision = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedPrecision"})
recall = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedRecall"})
f1 = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "f1"})
print(f"Precision: {precision}, Recall: {recall}, F1-Score: {f1}")

                                                                                

AUC-ROC: 0.9769848917644629




Precision: 0.9865649101448073, Recall: 0.9867601246105919, F1-Score: 0.9863970645260234


                                                                                

### Confusion Matrix

In [19]:
predictions.groupBy("Class", "prediction").count().show()



+-----+----------+-----+
|Class|prediction|count|
+-----+----------+-----+
|    1|       0.0|   14|
|    1|       1.0|   84|
|    0|       0.0| 1183|
|    0|       1.0|    3|
+-----+----------+-----+



                                                                                

### Tune the model

In [21]:
param_grid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.01, 0.1]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5]) \
    .build()

cv = CrossValidator(estimator=lr, estimatorParamMaps=param_grid, evaluator=auc_evaluator, numFolds=3)
cv_model = cv.fit(train_data)
best_model = cv_model.bestModel

                                                                                

### Save and Deploy the Model

In [24]:
best_model.save("models/fraud_detection_model")

                                                                                

### Load the model

In [5]:
loaded_model = LogisticRegressionModel.load("models/fraud_detection_model")

                                                                                

### Deploy

In [35]:
# use the streaming data from stream_source folder
stream_source_path = "data/stream_source"
# read the streaming data from stream_source folder
streaming_data = spark.readStream.schema(df.schema).csv(stream_source_path)
# use the model to predict the streaming data
streaming_predictions = loaded_model.transform(streaming_data)
# write the predictions to the console
streaming_predictions.writeStream.outputMode("append").format("console").start()


25/04/26 23:48:47 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/hj/4_1_bgn11_dgm99bq0lnjxl80000gn/T/temporary-7f935fa7-d1c8-4efb-87c6-bd86bd287c5d. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/04/26 23:48:47 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


<pyspark.sql.streaming.query.StreamingQuery at 0x11439fe50>

25/04/26 23:48:47 ERROR MicroBatchExecution: Query [id = 6c88ea4b-2e4b-4371-a821-e4eca2057bc1, runId = bb9d451f-73a8-4a80-bedb-1795c56139c3] terminated with error
org.apache.spark.sql.AnalysisException: [UNSUPPORTED_DATA_TYPE_FOR_DATASOURCE] The CSV datasource doesn't support the column `features_raw` of the type "STRUCT<type: TINYINT, size: INT, indices: ARRAY<INT>, values: ARRAY<DOUBLE>>".
	at org.apache.spark.sql.errors.QueryCompilationErrors$.dataTypeUnsupportedByDataSourceError(QueryCompilationErrors.scala:1650)
	at org.apache.spark.sql.execution.datasources.DataSourceUtils$.$anonfun$verifySchema$1(DataSourceUtils.scala:92)
	at org.apache.spark.sql.execution.datasources.DataSourceUtils$.$anonfun$verifySchema$1$adapted(DataSourceUtils.scala:90)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.sc